Source code for dvid.fetch

# This code is part of dvid-tools (https://github.com/flyconnectome/dvid_tools)
# and is released under GNU GPL3

import getpass
import inspect
import os
import re
import requests
import threading
import urllib
import warnings

import trimesh as tm
import numpy as np
import pandas as pd

from io import StringIO
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from functools import lru_cache, partial
from requests.exceptions import HTTPError
from scipy.spatial.distance import cdist
from tqdm.auto import tqdm

from . import decode, meshing, utils, config

# See if navis is available
try:
    import navis
except ImportError:
    navis = None
except BaseException:
    raise

DVID_SESSIONS = {}
DEFAULT_APPNAME = "dvidtools"


__all__ = ['add_bookmarks', 'edit_annotation', 'get_adjacency', 'get_annotation',
           'get_assignment_status', 'get_available_rois',
           'get_body_position', 'get_connections',
           'get_connectivity', 'get_labels_in_area', 'get_last_mod',
           'locs_to_ids', 'get_n_synapses', 'get_roi', 'get_sparsevol',
           'get_segmentation_info', 'get_skeletons', 'get_skeleton_mutation',
           'get_synapses', 'get_user_bookmarks', 'setup', 'snap_to_body',
           'get_meshes', 'list_projects', 'get_master_node', 'get_sparsevol_size',
           'get_sizes', 'ids_exist', 'skeletonize_neuron', 'mesh_neuron']


def dvid_session(appname=DEFAULT_APPNAME, user=None):
    """Return a default requests.Session() object.

    Automatically appends the 'u' and 'app' query string parameters to every
    request. The Session object is cached, so this function will return the same
    Session object if called again from the same thread with the same arguments.
    """
    # Technically, request sessions are not threadsafe,
    # so we keep one for each thread.
    thread_id = threading.current_thread().ident
    pid = os.getpid()

    # If user not explicitly provided
    if not user:
        # Get globally defined user or fall back to system user
        user = globals().get('user', getpass.getuser())

    try:
        s = DVID_SESSIONS[(appname, user, thread_id, pid)]
    except KeyError:
        s = requests.Session()
        s.params = {'u': user, 'app': appname}
        DVID_SESSIONS[(appname, user, thread_id, pid)] = s

    return s


[docs]def setup(server=None, node=None, user=None): """Set default server, node and/or user. Parameters ---------- server : str URL to the dvid server. node : str UUID of the node to query. user : str User name. Relevant for e.g. bookmarks. """ for p, n in zip([server, node, user], ['server', 'node', 'user']): if not isinstance(p, type(None)): globals()[n] = p
def eval_param(server=None, node=None, user=None, raise_missing=True): """Parse parameters and fall back to globally defined values.""" parsed = [] for p, n in zip([server, node, user], ['server', 'node', 'user']): if isinstance(p, type(None)): parsed.append(globals().get(n, None)) else: parsed.append(p) if raise_missing and not parsed[0]: raise ValueError('Must provide `server` (and probably `node`) either ' 'explicitly or globally via `dvid.setup()`') return parsed
[docs]def get_meshes(x, fix=True, output='auto', on_error='warn', max_threads=5, progress=True, server=None, node=None): """Fetch precomputed meshes for given body ID(s). Parameters ---------- x : int | str | list thereof ID(s) of bodies for which to download meshes. Also accepts pandas DataFrames if they have a body ID column. output : "auto" | "navis" | "trimesh" | None Determines the output of this function: - auto = ``navis.MeshNeuron`` if navis is installed else ``trimesh.Trimesh`` - navis = ``navis.MeshNeuron`` - raises error if ``navis`` not installed - trimesh on_error : "warn" | "skip" | "raise" What to do if fetching a mesh throws an error. Typically this is because there is no mesh for a given body ID but it could also be a more general connection error. max_threads : int Max number of parallel queries to the dvid server. progress : bool Whether to show a progress bar or not. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- mesh trimesh.Trimesh | navis.MeshNeuron Mutation ID is attached as `mutation_id` property (is ``None`` if not available.) See Also -------- :func:`~dvid.fetch.mesh_neuron` Use this to create a mesh from scratch - e.g. if there is no precomputed mesh for a given body. """ if output == 'navis' and not navis: raise ImportError('Please install `navis`: pip3 install navis') if on_error not in ("warn", "skip", "raise"): raise ValueError('`on_error` must be either "warn", "skip" or "raise"') if max_threads < 1: raise ValueError('`max_threads` must be >= 1') if isinstance(x, pd.DataFrame): if 'bodyId' in x.columns: x = x['bodyId'].values elif 'bodyid' in x.columns: x = x['bodyid'].values else: raise ValueError('DataFrame must have "bodyId" column.') elif isinstance(x, int): x = [x] elif isinstance(x, str): x = [int(x)] # At this point we expect a list, set or array if not isinstance(x, (list, set, np.ndarray)): raise TypeError(f'Unexpected data type for body ID(s): "{type(x)}"') out = [] with ThreadPoolExecutor(max_workers=max_threads) as executor: futures = {} for bid in x: f = executor.submit(__get_ngmesh, bid, output=output, on_error=on_error, server=server, node=node) futures[f] = bid with tqdm(desc='Fetching', total=len(x), leave=False, disable=not progress) as pbar: for f in as_completed(futures): res = f.result() # Skip neurons that caused an error if res is not None: out.append(res) pbar.update(1) if (output == 'auto' and navis) or (output == 'navis'): out = navis.NeuronList(out) return out
def __get_ngmesh(bodyid, output='auto', on_error='raise', check_mutation=True, server=None, node=None): """Load a single mesh.""" bodyid = utils.parse_bid(bodyid) server, node, user = eval_param(server, node) url = utils.make_url(server, 'api/node', node, f'{config.segmentation}_meshes', f'key/{bodyid}.ngmesh') r = dvid_session().get(url) try: r.raise_for_status() except BaseException: if on_error == 'raise': raise elif on_error == 'warn': warnings.warn(f'{bodyid}: {r.text.strip()}') return # Decode mesh m = decode.read_ngmesh(r.content) # Grab mutation ID if check_mutation: url = utils.make_url(server, 'api/node/', node, f'{config.segmentation}_meshes_mutid', f'key/{bodyid}') r = dvid_session().get(url) # This query won't work with older nodes # -> hence wrapping it in a try/except block try: r.raise_for_status() m.mutation_id = int(r.json()) except HTTPError: m.mutation_id = None except BaseException: raise if (output == 'auto' and navis) or (output == 'navis'): n = navis.MeshNeuron(m, id=bodyid, mutation_id=m.mutation_id) return n return m
[docs]def get_skeletons(x, save_to=None, output='auto', on_error='warn', check_mutation=False, max_threads=5, progress=True, server=None, node=None): """Fetch skeleton for given body ID. Parameters ---------- x : int | str | list thereof ID(s) of bodies for which to download skeletons. Also accepts pandas DataFrames if they have a body ID column. save_to : str | None, optional If provided, will save raw SWC to file. If str must be file or path. output : "auto" | "navis" | "swc" | None Determines the output of this function: - auto = ``navis.TreeNeuron`` if navis is installed else SWC table as ``pandas.DataFrame`` - navis = ``navis.TreeNeuron`` - raises error if ``navis`` not installed - swc = SWC table as ``pandas.DataFrame`` - None = no direct output - really only relevant if you want to only save the SWC to a file on_error : "warn" | "skip" | "raise" What to do if fetching a skeleton throws an error. Typically this is because there is no skeleton for a given body ID but it could also be a more general connection error. check_mutation : bool, optional If True, will check if skeleton and body are still in-sync using the mutation IDs. Will warn if mismatch found. max_threads : int Max number of parallel queries to the dvid server. progress : bool Whether to show a progress bar or not. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- SWC : pandas.DataFrame Only if ``save_to=None`` else ``True``. None If no skeleton found. See Also -------- :func:`dvid.get_skeleton_mutation` If you want to create a skeleton based on the current voxels yourself. :func:`~dvid.fetch.skeletonize_neuron` Use this to create a skeleton from scratch - e.g. if there is no precomputed skeleton for a given body. Examples -------- Fetch neuron as navis skeleton >>> dt.get_skeleton(485775679) Grab a neuron and save it directly to a file >>> dt.get_skeleton(485775679, save_to='~/Downloads/', output=None) """ if output == 'navis' and not navis: raise ImportError('Please install `navis`: pip3 install navis') if on_error not in ("warn", "skip", "raise"): raise ValueError('`on_error` must be either "warn", "skip" or "raise"') if max_threads < 1: raise ValueError('`max_threads` must be >= 1') if isinstance(x, pd.DataFrame): if 'bodyId' in x.columns: x = x['bodyId'].values elif 'bodyid' in x.columns: x = x['bodyid'].values else: raise ValueError('DataFrame must have "bodyId" column.') elif isinstance(x, int): x = [x] elif isinstance(x, str): x = [int(x)] # At this point we expect a list, set or array if not isinstance(x, (list, set, np.ndarray)): raise TypeError(f'Unexpected data type for body ID(s): "{type(x)}"') if len(x) > 1: if save_to and not os.path.isdir(save_to): raise ValueError('"save_to" must be path when loading multiple' 'multiple skeletons') out = [] with ThreadPoolExecutor(max_workers=max_threads) as executor: futures = {} for bid in x: f = executor.submit(__get_skeleton, bid, save_to=save_to, output=output, on_error=on_error, check_mutation=check_mutation, server=server, node=node) futures[f] = bid with tqdm(desc='Fetching', total=len(x), leave=False, disable=not progress) as pbar: for f in as_completed(futures): res = f.result() pbar.update(1) if on_error == "skip" and res is None: continue out.append(res) if (output == 'auto' and navis) or (output == 'navis'): out = navis.NeuronList(out) return out
def __get_skeleton(bodyid, save_to=None, output='auto', on_error='raise', check_mutation=False, server=None, node=None): """Load a single skeleton.""" bodyid = utils.parse_bid(bodyid) server, node, user = eval_param(server, node) url = urllib.parse.urljoin(server, 'api/node/{}/{}_skeletons/key/{}_swc'.format(node, config.segmentation, bodyid)) try: r = dvid_session().get(url) r.raise_for_status() except BaseException: if on_error == 'raise': raise elif on_error == 'warn': warnings.warn(f'{bodyid}: {r.text.strip()}') return # Save raw SWC before making any changes if save_to: # Generate proper filename if necessary if os.path.isdir(save_to): save_raw_to = os.path.join(save_to, '{}.swc'.format(bodyid)) else: # ... else assume it's a file save_raw_to = save_to with open(save_raw_to, 'w') as f: f.write(r.text) # Stop here is no further output required if not output: return # Parse SWC string swc, header = utils.parse_swc_str(r.text) swc.header = header if 'mutation id' in header: swc.mutation_id = int(re.search('"mutation id": (.*?)}', header).group(1)) else: swc.mutation_id = None if check_mutation: if not getattr(swc, 'mutation_id', None): print('{} - Unable to check mutation: mutation ID not in ' 'SWC header'.format(bodyid)) else: body_mut = get_last_mod(bodyid, server=server, node=node).get('mutation id') if swc.mutation_id != body_mut: print("{}: mutation IDs of skeleton and mesh don't match. " "The skeleton might not be up-to-date.".format(bodyid)) if (output == 'auto' and navis) or (output == 'navis'): n = navis.TreeNeuron(swc, id=bodyid) n.header = header n.mutation_id = getattr(swc, 'mutation_id', None) return n return swc def __old_get_skeleton(bodyid, save_to=None, xform=None, root=None, soma=None, heal=False, check_mutation=True, server=None, node=None, verbose=True, **kwargs): """Download skeleton as SWC file. Parameters ---------- bodyid : int | str ID(s) of body for which to download skeleton. save_to : str | None, optional If provided, will save SWC to file. If str must be file or path. Please note that using ``heal`` or ``reroot`` will require the SWC table to be cleaned-up using ``dvidtools.refurbish_table`` before saving to keep it in line with SWC format. This will change node IDs! xform : function, optional If provided will run this function to transform coordinates before saving/returning the SWC file. Function must accept ``(N, 3)`` numpy array. Nodes that don't transform properly will be removed and disconnected piece will be healed. soma : array-like | function, optional Use to label ("1") and reroot to soma node: - array-like is interpreted as x/y/z position and will be mapped to the closest node - ``function`` must accept ``bodyid`` and return x/y/z array-like root : array-like | function, optional Use to reroot the neuron to given node: - array-like is interpreted as x/y/z position and will be mapped to the closest node - ``function`` must accept ``bodyid`` and return x/y/z array-like This will override ``soma``. heal : bool, optional If True, will heal fragmented neurons using ``dvidtools.heal_skeleton``. check_mutation : bool, optional If True, will check if skeleton and body are still in-sync using the mutation IDs. Will warn if mismatch found. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- SWC : pandas.DataFrame Only if ``save_to=None`` else ``True``. None If no skeleton found. Examples -------- Easy: grab a neuron and save it to a file >>> dt.get_skeleton(485775679, save_to='~/Downloads/') """ if isinstance(bodyid, (list, np.ndarray)): if save_to and not os.path.isdir(save_to): raise ValueError('"save_to" must be path when loading multiple' 'multiple bodies') resp = {x: __old_get_skeleton(x, save_to=save_to, check_mutation=check_mutation, verbose=verbose, heal=heal, soma=soma, xform=xform, server=server, node=node, **kwargs) for x in tqdm(bodyid, desc='Loading')} # Give summary missing = [str(k) for k, v in resp.items() if isinstance(v, type(None))] print('{}/{} skeletons successfully downloaded.'.format(len(resp) - len(missing), len(resp))) if missing: print('Missing skeletons for: {}'.format(','.join(missing))) if not save_to: return resp else: return bodyid = utils.parse_bid(bodyid) server, node, user = eval_param(server, node) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}_skeletons/key/{}_swc'.format(node, config.segmentation, bodyid))) if 'not found' in r.text: if verbose: print(r.text) return None # Save raw SWC before making any changes save_raw_to = kwargs.get('save_raw_to', False) if save_raw_to: # Generate proper filename if necessary if os.path.isdir(save_raw_to): save_raw_to = os.path.join(save_raw_to, '{}.swc'.format(bodyid)) with open(save_raw_to, 'w') as f: f.write(r.text) # Parse SWC string df, header = utils.parse_swc_str(r.text) if 'mutation id' in header: df.mutation_id = int(re.search('"mutation id": (.*?)}', header).group(1)) if check_mutation: if not getattr(df, 'mutation_id', None): print('{} - Unable to check mutation: mutation ID not in ' 'SWC header'.format(bodyid)) else: body_mut = get_last_mod(bodyid, server=server, node=node).get('mutation id') if df.mutation_id != body_mut: print("{}: mutation IDs of skeleton and mesh don't match. " "The skeleton might not be up-to-date.".format(bodyid)) # Heal first as this might change node IDs if heal: utils.heal_skeleton(df, inplace=True) # If soma is function, call it if callable(soma): soma = soma(bodyid) # If root is function, call it if callable(root): root = root(bodyid) # If we have a soma if isinstance(soma, (list, tuple, np.ndarray)): # Get soma node soma_node = utils._snap_to_skeleton(df, soma) # Set label df.loc[df.node_id == soma_node, 'label'] = 1 # If root is not explicitly provided, reroot to soma if not isinstance(root, (list, tuple, np.ndarray)): root = soma # If we have a root if isinstance(root, (list, tuple, np.ndarray)): # Get soma node root_node = utils._snap_to_skeleton(df, root) # Reroot utils.reroot_skeleton(df, root_node, inplace=True) if callable(xform): # Add xform function to header for documentation header += '#x/y/z coordinates transformed by dvidtools using this:\n' header += '\n'.join(['#' + l for l in inspect.getsource(xform).split('\n') if l]) # Transform coordinates df.iloc[:, 2:5] = xform(df.iloc[:, 2:5].values) # Check if any coordinates got messed up nans = df[np.any(df.iloc[:, 2:5].isnull(), axis=1)] if not nans.empty: if verbose: print('{} nodes did not xform - removing & stitching...'.format(nans.shape[0])) # Drop nans df.drop(nans.index, inplace=True) # Keep track of existing root (if any left) root = df.loc[df.parent_id < 0, 'node_id'] root = root.values[0] if not root.empty else None # Set orphan nodes to roots df.loc[~df.parent_id.isin(df.node_id), 'parent_id'] = -1 # Heal fragments utils.heal_skeleton(df, root=root, inplace=True) elif not isinstance(xform, type(None)): raise TypeError('"xform" must be a function, not "{}"'.format(type(xform))) if save_to: # Make sure table is still conform with SWC format if heal or not isinstance(root, type(None)): df = utils.refurbish_table(df) # Generate proper filename if necessary if os.path.isdir(save_to): save_to = os.path.join(save_to, '{}.swc'.format(bodyid)) # Save SWC file utils.save_swc(df, filename=save_to, header=header) return True else: return df
[docs]def get_user_bookmarks(server=None, node=None, user=None, return_dataframe=True): """Get user bookmarks. Parameters ---------- server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. user : str, optional If not provided, will try reading from global. return_dataframe : bool, optional If True, will return pandas.DataFrame. If False, returns original json. Returns ------- bookmarks : pandas.DataFrame or json """ server, node, user = eval_param(server, node, user) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/bookmark_annotations/tag/user:{}'.format(node, user))) if return_dataframe: data = r.json() for d in data: d.update(d.pop('Prop')) return pd.DataFrame.from_records(data) else: return r.json()
[docs]def add_bookmarks(data, verify=True, server=None, node=None): """Add or edit user bookmarks. Please note that you will have to restart neutu to see the changes to your user bookmarks. Parameters ---------- data : list of dicts Must be list of dicts. See example:: [{'Pos': [21344, 21048, 22824], 'Kind': 'Note', 'Tags': ['user:schlegelp'], 'Prop': {'body ID': '1671952694', 'comment': 'mALT', 'custom': '1', 'status': '', 'time': '', 'type': 'Other', 'user': 'schlegelp'}}, ... ] verify : bool, optional If True, will sanity check ``data`` against above example. Do not skip unless you know exactly what you're doing! server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- Nothing """ server, node, user = eval_param(server, node) # Sanity check data if not isinstance(data, list): raise TypeError('Data must be list of dicts. ' 'See help(dvidtools.add_bookmarks)') if verify: required = {'Pos': list, 'Kind': str, 'Tags': [str], 'Prop': {'body ID': str, 'comment': str, 'custom': str, 'status': str, 'time': str, 'type': str, 'user': str}} utils.verify_payload(data, required=required, required_only=True) r = dvid_session().post(urllib.parse.urljoin(server, 'api/node/{}/bookmark_annotations/elements'.format(node)), json=data) r.raise_for_status() return
[docs]def get_annotation(bodyid, server=None, node=None, verbose=True): """Fetch annotations for given body. Parameters ---------- bodyid : int | str ID of body for which to get annotations.. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. verbose : bool, optional If True, will print error if no annotation for body found. Returns ------- annotations : dict """ server, node, user = eval_param(server, node) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}_annotations/key/{}'.format(node, config.body_labels, bodyid))) try: return r.json() except BaseException: if verbose: print(r.text) return {}
[docs]def edit_annotation(bodyid, annotation, server=None, node=None, verbose=True): """Edit annotations for given body. Parameters ---------- bodyid : int | str ID of body for which to edit annotations. annotation : dict Dictionary of new annotations. Possible fields are:: { "status": str, "comment": str, "body ID": int, "name": str, "class": str, "user": str, "naming user": str } Fields other than the above will be ignored! server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. verbose : bool, optional If True, will warn if entirely new annotations are added (as opposed to just updating existing annotations) Returns ------- None Examples -------- >>> # Get annotations for given body >>> an = dvidtools.get_annotation('1700937093') >>> # Edit field >>> an['name'] = 'New Name' >>> # Update annotation >>> dvidtools.edit_annotation('1700937093', an) """ if not isinstance(annotation, dict): raise TypeError('Annotation must be dictionary, not "{}"'.format(type(annotation))) server, node, user = eval_param(server, node) bodyid = utils.parse_bid(bodyid) # Get existing annotations old_an = get_annotation(bodyid, server=server, node=node, verbose=False) # Raise non-standard payload new_an = [k for k in annotation if k not in old_an] if new_an and verbose: warnings.warn('Adding new annotation(s) to {}: {}'.format(bodyid, ', '.join(new_an))) # Update annotations old_an.update(annotation) r = dvid_session().post(urllib.parse.urljoin(server, 'api/node/{}/{}_annotations/key/{}'.format(node, config.body_labels, bodyid)), json=old_an) # Check if it worked r.raise_for_status() return None
def __old_get_body_id(pos, server=None, node=None): """Get body ID at given position. Parameters ---------- pos : iterable [x, y, z] position to query. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- body_id : str """ server, node, user = eval_param(server, node) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}/label/{}_{}_{}'.format(node, config.segmentation, pos[0], pos[1], pos[2]))) return r.json()['Label']
[docs]def locs_to_ids(pos, chunk_size=10e3, progress=True, server=None, node=None): """Get body IDs at given positions. Parameters ---------- pos : iterable [[x1, y1, z1], [x2, y2, z2], ..] positions in voxel space. Must be integers! chunk_size : int, optional Splits query into chunks of a given size to reduce strain on server. progress : bool If True, show progress bar. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- body_ids : np.ndarray """ server, node, user = eval_param(server, node) pos = np.asarray(pos) if pos.ndim == 1: pos.reshape(1, 3) if pos.ndim != 2 or pos.shape[1] != 3: raise ValueError(f'Expected (N, 3) array of positions, got {pos.shape}') # Make sure we are working on integers pos = pos.astype(int) data = [] with tqdm(desc='Querying positions', total=len(pos), disable=not progress, leave=False) as pbar: for ix in range(0, len(pos), int(chunk_size)): chunk = pos[ix: ix + int(chunk_size)] url = utils.make_url(server, f'api/node/{node}/{config.segmentation}/labels') r = dvid_session().get(url, json=chunk.tolist()) r.raise_for_status() data += r.json() pbar.update(len(chunk)) return np.array(data)
[docs]def get_sizes(ids, chunk_size=1e3, progress=True, server=None, node=None): """Get sizes (in supervoxels) for given body IDs. Parameters ---------- ids : iterable [12345, 455677, ...] IDs. Must be integers! chunk_size : int, optional Splits query into chunks of a given size to reduce strain on server. progress : bool If True, show progress bar. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- sizes : np.ndarray For IDs that do not exist the size will be 0. """ server, node, user = eval_param(server, node) ids = np.asarray(ids) if ids.ndim != 1: raise ValueError(f'Expected (N, ) array of IDs, got {ids.shape}') # Make sure we are working on integers ids = ids.astype(int) data = [] with tqdm(desc='Querying sizes', total=len(ids), disable=not progress, leave=False) as pbar: for ix in range(0, len(ids), int(chunk_size)): chunk = ids[ix: ix + int(chunk_size)] url = utils.make_url(server, f'api/node/{node}/{config.segmentation}/sizes') r = dvid_session().get(url, json=chunk.tolist()) r.raise_for_status() data += r.json() pbar.update(len(chunk)) return np.array(data)
[docs]def ids_exist(ids, chunk_size=1e3, progress=True, server=None, node=None): """Check if given IDs exist. Parameters ---------- ids : iterable [12345, 455677, ...] IDs. Must be integers! chunk_size : int, optional Splits query into chunks of a given size to reduce strain on server. progress : bool If True, show progress bar. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- exists : np.ndarray Array of booleans. """ server, node, user = eval_param(server, node) ids = np.asarray(ids) if ids.ndim != 1: raise ValueError(f'Expected (N, ) array of IDs, got {ids.shape}') # Make sure we are working on integers ids = ids.astype(int) data = [] with tqdm(desc='Querying', total=len(ids), disable=not progress, leave=False) as pbar: for ix in range(0, len(ids), int(chunk_size)): chunk = ids[ix: ix + int(chunk_size)] url = utils.make_url(server, f'api/node/{node}/{config.segmentation}/sizes') r = dvid_session().get(url, json=chunk.tolist()) r.raise_for_status() data += r.json() pbar.update(len(chunk)) sizes = np.array(data) return sizes != 0
[docs]def get_body_position(bodyid, server=None, node=None): """Get a single position for given body ID. This will (like neutu) use the skeleton. If body has no skeleton, will use mesh as fallback. Parameters ---------- bodyid : int | str Body ID for which to find a position. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- (x, y, z) """ bodyid = utils.parse_bid(bodyid) s = get_skeletons(bodyid, server=server, node=node) if isinstance(s, pd.DataFrame) and not s.empty: # Return the root (more likely to be actually within the mesh?) return s.loc[0, ['x', 'y', 'z']].values else: # First get voxels of the coarse neuron voxels = get_sparsevol(bodyid, scale='coarse', ret_type='INDEX', server=server, node=node) # Erode surface voxels to make sure we get a central position while True: eroded = meshing.remove_surface_voxels(voxels) # Stop before no more voxels left if eroded.size == 0: break voxels = eroded # Get voxel sizes based on scale info = get_segmentation_info(server, node)['Extended'] # Now query the more precise mesh for this coarse voxel # Pick a random voxel v = voxels[0] * info['BlockSize'] # turn into locs for scale 0 # Generate a bounding bbox bbox = np.vstack([v, v]).T bbox[:, 1] += info['BlockSize'] voxels = get_sparsevol(bodyid, scale=0, ret_type='INDEX', bbox=bbox.ravel(), server=server, node=node) # Erode surface voxels again to make sure we get a central position while True: eroded = meshing.remove_surface_voxels(voxels) # Stop before no more voxels left if eroded.size == 0: break voxels = eroded return voxels[0]
[docs]def get_assignment_status(pos, window=None, bodyid=None, server=None, node=None): """Return assignment status at given position. Checking/unchecking assigments leaves invisible "bookmarks" at the given position. These can be queried using this endpoint. Parameters ---------- pos : tuple X/Y/Z Coordinates to query. window : array-like | None, optional If provided, will return assigments in bounding box with ``pos`` in the center and ``window`` as size in x/y/z. bodyid : int | list, optional If provided, will only return assignments that are within the given body ID(s). Only relevant if ``window!=None``. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- dict E.g. ``{'checked': True}`` if assignment(s) were found at given position/in given bounding box. None If no assigments found. list If ``window!=None`` will return a list of of dicts. """ server, node, user = eval_param(server, node) if isinstance(window, (list, np.ndarray, tuple)): pos = pos if isinstance(pos, np.ndarray) else np.array(pos) pos = pos.astype(int) window = window if isinstance(window, np.ndarray) else np.array(window) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/bookmarks/keyrange/' '{}_{}_{}/{}_{}_{}'.format(node, int(pos[0]-window[0]/2), int(pos[1]-window[1]/2), int(pos[2]-window[2]/2), int(pos[0]+window[0]/2), int(pos[1]+window[1]/2), int(pos[2]+window[2]/2),))) r.raise_for_status() # Above query returns coordinates that are in lexicographically # between key1 and key2 -> we have to filter for those inside the # bounding box ourselves coords = np.array([c.split('_') for c in r.json()]).astype(int) # If provided, make sure all coordinates in window are from given # body ID(s) if not isinstance(bodyid, type(None)): if not isinstance(bodyid, (list, np.ndarray)): bodyid = [bodyid] bids = np.array(locs_to_ids(coords, server=server, node=node )) coords = coords[np.in1d(bids, bodyid)] if coords.size == 0: return [] coords = coords[(coords > (pos - window / 2)).all(axis=1)] coords = coords[(coords < (pos + window / 2)).all(axis=1)] return [get_assignment_status(c, window=None, bodyid=bodyid, server=server, node=node) for c in coords] r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/bookmarks/key/{}_{}_{}'.format(node, int(pos[0]), int(pos[1]), int(pos[2])))) # Will raise if key not found -> so just don't # r.raise_for_status() return r.json() if r.text and 'not found' not in r.text else None
[docs]def get_labels_in_area(offset, size, server=None, node=None): """Get labels (todo, to split, etc.) in given bounding box. Parameters ---------- offset : iterable [x, y, z] position of top left corner of area. size : iterable [x, y, z] dimensions of area. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- todo tags : pandas.DataFrame """ server, node, user = eval_param(server, node) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}_todo/elements/' '{}_{}_{}/{}_{}_{}'.format(node, config.segmentation, int(size[0]), int(size[1]), int(size[2]), int(offset[0]), int(offset[1]), int(offset[2])))) r.raise_for_status() j = r.json() if j: return pd.DataFrame.from_records(r.json()) else: return None
[docs]def get_available_rois(server=None, node=None, step_size=2): """Get a list of all available ROIs in given node. Parameters ---------- server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- list """ server, node, user = eval_param(server, node) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/rois/keys'.format(node))) r.raise_for_status() return r.json()
[docs]def get_roi(roi, step_size=2, form='MESH', save_to=None, server=None, node=None): """Get ROI as mesh or voxels. Uses marching cube algorithm to extract surface model of ROI voxels if no precomputed mesh available. Note that some ROIs exist only as voxels or as mesh. If voxels are requested but not available, will raise "Bad Request" HttpError. Parameters ---------- roi : str Name of ROI. form : "MESH" | "VOXELS"| "BLOCKS", optional Returned format - see ``Returns``. step_size : int, optional Step size for marching cube algorithm. Only relevant for ``form="MESH"``. Smaller values = higher resolution but slower. save_to : filename If provided will also write mesh straight to file. Should end with `.obj`. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- mesh : trimesh.Trimesh If ``format=='MESH'``. Coordinates in nm. voxels : numpy array If ``format=='VOXELS'``. blocks : numpy array If ``format=='BLOCKS'``. Encode blocks of voxels as 4 coordinates: ``[z, y, x_start, x_end]`` """ if form.upper() not in ('MESH', 'VOXELS', 'BLOCKS'): raise ValueError('Unknown return format "{}"'.format(form)) server, node, user = eval_param(server, node) if form.upper() == 'MESH': # Check if we can get the OBJ file directly try: # Get the key for this roi r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/rois/key/{}'.format(node, roi))) r.raise_for_status() key = r.json()['->']['key'] # Get the obj string r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/roi_data/key/{}'.format(node, key))) r.raise_for_status() if save_to: with open(save_to, 'w') as f: f.write(r.text) return # The data returned is in .obj format return tm.load(StringIO(r.text), file_type='obj') except BaseException: if r.status_code != 400: raise # Get the voxels r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}/roi'.format(node, roi))) r.raise_for_status() # The data returned are block coordinates: [z, y, x_start, x_end] blocks = np.array(r.json()) if form.upper() == 'BLOCKS': return blocks voxels = meshing._blocks_to_voxels(blocks) if form.upper() == 'VOXELS': return voxels # Try getting voxel size meta = get_segmentation_info(node=node, server=server) if 'Extended' in meta and 'BlockSize' in meta['Extended']: voxel_size = tuple(meta['Extended']['BlockSize']) else: print('No voxel size found. Mesh returned in raw voxels.') voxel_size = (1, 1, 1) mesh = meshing.mesh_from_voxels(voxels, spacing=voxel_size, step_size=step_size) if save_to: mesh.export(save_to) return mesh
[docs]def skeletonize_neuron(bodyid, scale=4, server=None, node=None, progress=True, **kwargs): """Skeletonize given body. Fetches voxels from DVID, creates a mesh (via `mesh_neuron`) and then skeletonizes it. This can be useful if the precomputed skeletons are not up-to-date or have incorrect topology. This function requires `skeletor` to be installed:: pip3 install skeletor Parameters ---------- bodyid : int | str | trimesh ID of body for which to generate skeleton. scale : int | "COARSE" Resolution of sparse volume to use for skeletonization. Lower = higher res. Higher resolutions tend to produce more accurate but also more noisy (e.g. tiny free-floating fragments) skeletons. In my experience, `scale="COARSE"` for quick & dirty and `scale=4` for high-quality skeletons make the most sense. Scales 5 and 6 are too coarse, and below 3 becomes prohibitively slow. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. progress : bool Whether to show progress bars. **kwargs Keyword arguments are passed through to `mesh_from_voxels`. Returns ------- skeletor.Skeleton In "nm" coordinates. See Also -------- :func:`dvid.get_skeletons` To download precomputed skeletons. """ try: import skeletor as sk except ImportError: raise ImportError('`skeletonize_neuron` requires `skeletor` to be installed') except BaseException: raise defaults = dict(step_size=1) defaults.update(kwargs) # Get the sparse-vol mesh if isinstance(bodyid, tm.Trimesh): mesh = bodyid else: mesh = mesh_neuron(bodyid, scale=scale, server=server, node=node, progress=progress, **defaults) # Skeletonize return sk.skeletonize.by_wavefront(mesh, radius_agg='median', progress=progress)
[docs]def get_sparsevol_size(bodyid, server=None, node=None): """Fetch sparsevol (voxel) info for given neuron. Parameters ---------- bodyid : int | str ID of body for which to download mesh. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- dict Dict with number of voxels and coarse bounding box in voxel space. """ server, node, user = eval_param(server, node) bodyid = utils.parse_bid(bodyid) url = urllib.parse.urljoin(server, 'api/node/{}/{}/sparsevol-size/{}'.format(node, config.segmentation, bodyid)) r = dvid_session().get(url) r.raise_for_status() return r.json()
[docs]def get_sparsevol(bodyid, scale='COARSE', ret_type='INDEX', voxels=True, save_to=None, bbox=None, server=None, node=None): """Fetch sparsevol (voxel) representation for given neuron. Parameters ---------- bodyid : int | str ID of body for which to download mesh. scale : int | "COARSE", optional Resolution of sparse volume starting with 0 where each level beyond 0 has 1/2 resolution of previous level. "COARSE" will return the volume in block coordinates. save_to : str | None, optional If provided, will save response from server as binary file. ret_type : "INDEX" | "COORDS" | "RAW" "INDEX" returns x/y/z indices. "COORDS" returns x/y/z coordinates. "RAW" will return server response as raw bytes. voxels : bool If False, will return x/y/z/x_run_length instead of x/y/z voxels. bbox : list | None, optional Bounding box to which to restrict the query to. Must be in `scale=0` index coordinates. Format: ``[x_min, x_max, y_min, y_max, z_min, z_max]``. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- voxels : (N, 3) np.array If ``voxels=True``: array with x/y/z coordinates/indices rles : (N, 4) np.array If ``voxels=False``: array with x/y/z/x_run_length coordinates/indices. raw : bytes string If ``ret_type='RAW'``: server response as raw bytes. """ if ret_type.upper() not in ('COORDS', 'INDEX', 'RAW'): raise ValueError('"ret_type" must be "COORDS", "INDEX" or "RAW"') server, node, user = eval_param(server, node) bodyid = utils.parse_bid(bodyid) # Get voxel sizes based on scale info = get_segmentation_info(server, node)['Extended'] vsize = {'COARSE': [s * 8 for s in info['BlockSize']]} vsize.update({i: np.array(info['VoxelSize']) * 2**i for i in range(info['MaxDownresLevel'])}) if isinstance(scale, int) and scale > info['MaxDownresLevel']: raise ValueError('Scale greater than MaxDownresLevel') elif isinstance(scale, str): scale = scale.upper() options = {} if scale == 'COARSE': url = urllib.parse.urljoin(server, 'api/node/{}/{}/sparsevol-coarse/{}'.format(node, config.segmentation, bodyid)) elif isinstance(scale, (int, np.number)): url = urllib.parse.urljoin(server, 'api/node/{}/{}/sparsevol/{}'.format(node, config.segmentation, bodyid)) options['scale'] = scale else: raise TypeError(f'scale must be "COARSE" or integer, not "{scale}"') if not isinstance(bbox, type(None)): bbox = np.asarray(bbox).astype(int) if bbox.shape != (6, ): raise ValueError(f'Bounding box must be shape (6, ), got {bbox.shape}') for key, co in zip(['minx', 'maxx', 'miny', 'maxy', 'minz', 'maxz'], bbox): options[key] = co if options: url += '?' + '&'.join([f'{k}={v}' for k, v in options.items()]) if not isinstance(bbox, type(None)): check = dvid_session().head(url) if check.status_code != 200: raise ValueError(f'There is no sparse volume for {bodyid} ' 'within the specified bounds.') r = dvid_session().get(url) r.raise_for_status() b = r.content if save_to: with open(save_to, 'wb') as f: f.write(b) return if ret_type.upper() == 'RAW': return b # Decode binary format header, voxels = decode.decode_sparsevol(b, format='rles', voxels=voxels) if ret_type.upper() == 'COORDS': voxels = voxels[:, :3] * vsize[scale] if voxels.shape[1] == 4: voxels[:, 4] *= vsize[scale][0] return voxels
[docs]def mesh_neuron(bodyid, scale='COARSE', step_size=1, bbox=None, parallel=False, progress=True, server=None, node=None, **kwargs): """Create mesh for given neuron(s). Parameters ---------- bodyid : int | str | list-like Body ID(s) for which to generate mesh. scale : int | "COARSE", optional Resolution of sparse volume starting with 0 where each level beyond 0 has 1/2 resolution of previous level. "COARSE" will return the volume in block coordinates. step_size : int, optional Step size for marching cube algorithm. Higher values = faster but coarser. bbox : list | None, optional Bounding box to which to restrict the meshing to. Must be in `scale=0` coordinates. Format: ``[x_min, x_max, y_min, y_max, z_min, z_max]``. parallel : bool | int Whether to run meshing in parallel on multiple cores if `bodyid` is more than one neuron. If `parallel` is integer will use that many cores. Otherwise defaults to half the available cores. progress : bool Whether to show a progress bar when meshing multiple neurons. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. **kwargs Keyword arguments are passed through to `dv.meshing.mesh_from_voxels`. Returns ------- trimesh.Trimesh See Also -------- :func:`~dvid.fetch.get_meshes` Use this to fetch precomputed meshes instead of making our own. """ server, node, user = eval_param(server, node) if isinstance(bodyid, (list, tuple, np.ndarray)): if len(bodyid) == 1: bodyid = bodyid[0] else: func = partial(mesh_neuron, scale=scale, step_size=step_size, bbox=bbox, server=server, node=node, **kwargs) if not parallel: return [func(b) for b in tqdm(bodyid, desc='Meshing', disable=not progress, leave=False)] else: meshes = [] n_cores = parallel if isinstance(parallel, int) else max(1, int(os.n_cores() // 2)) with ProcessPoolExecutor(max_workers=n_cores) as executor: futures = {} for bid in bodyid: f = executor.submit(func, bid) futures[f] = bid with tqdm(desc='Meshing', total=len(bodyid), leave=False, disable=not progress) as pbar: for f in as_completed(futures): meshes.append(f.result()) pbar.update(1) return meshes bodyid = utils.parse_bid(bodyid) voxels = get_sparsevol(bodyid, scale=scale, ret_type='INDEX', save_to=None, bbox=bbox, server=server, node=node) defaults = dict(chunk_size=200 if scale in (0, 1, 2, 3, 4) else None, merge_fragments=True, pad_chunks=True) defaults.update(kwargs) # Get voxel sizes based on scale info = get_segmentation_info(server, node)['Extended'] vsize = {'COARSE': [s * 8 for s in info['BlockSize']]} vsize.update({i: np.array(info['VoxelSize']) * 2**i for i in range(info['MaxDownresLevel'])}) mesh = meshing.mesh_from_voxels(voxels, spacing=vsize[scale], step_size=step_size, **defaults) # Track the ID just in case mesh.id = bodyid return mesh
[docs]@lru_cache(None) def get_segmentation_info(server=None, node=None): """Return segmentation info as dictionary (cached). Parameters ---------- server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. """ server, node, user = eval_param(server, node) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}/info'.format(node, config.segmentation))) return r.json()
[docs]def get_n_synapses(bodyid, server=None, node=None): """Return number of pre- and postsynapses associated with given body. Parameters ---------- bodyid : int | str ID of body for which to get number of synapses. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- dict ``{'PreSyn': int, 'PostSyn': int}`` """ server, node, user = eval_param(server, node) bodyid = utils.parse_bid(bodyid) if isinstance(bodyid, (list, np.ndarray)): syn = {b: get_n_synapses(b, server, node) for b in bodyid} return pd.DataFrame.from_records(syn).T r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}_labelsz/count/{}/PreSyn'.format(node, config.synapses, bodyid))) r.raise_for_status() pre = r.json() r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}_labelsz/count/{}/PostSyn'.format(node, config.synapses, bodyid))) r.raise_for_status() post = r.json() return {'pre': pre.get('PreSyn', None), 'post': post.get('PostSyn', None)}
[docs]def get_synapses(bodyid, pos_filter=None, with_details=False, server=None, node=None): """Return table of pre- and postsynapses associated with given body. Parameters ---------- bodyid : int | str ID of body for which to get synapses. pos_filter : function, optional Function to filter synapses by position. Must accept numpy array (N, 3) and return array of [True, False, ...] with_details : bool, optional If True, will include more detailed information about connector links. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- pandas.DataFrame Examples -------- Get synapses only in the LH (requires navis) >>> import navis >>> lh = navis.Volume(*dvidtools.get_roi('LH')) >>> lh_syn = dvidtools.get_synapses(329566174, ... pos_filter=lambda x: navis.in_volume(x, lh)) """ if isinstance(bodyid, (list, np.ndarray)): tables = [get_synapses(b, pos_filter, server, node) for b in tqdm(bodyid, desc='Fetching')] for b, tbl in zip(bodyid, tables): tbl['bodyid'] = b return pd.concat(tables, axis=0) server, node, user = eval_param(server, node) bodyid = utils.parse_bid(bodyid) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}/label/{}?relationships={}'.format(node, config.synapses, bodyid, str(with_details).lower()))) syn = r.json() if pos_filter: # Get filter filtered = pos_filter(np.array([s['Pos'] for s in syn])) if not any(filtered): raise ValueError('No synapses left after filtering.') syn = np.array(syn)[filtered] return pd.DataFrame.from_records(syn)
[docs]def get_connections(source, target, pos_filter=None, server=None, node=None): """Return list of connections between source(s) and target(s). Parameters ---------- source : int | str Body ID(s) of sources. target : int | str Body ID(s) of targets. pos_filter : function, optional Function to filter synapses by position. Must accept numpy array (N, 3) and return array of [True, False, ...] server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- pandas.DataFrame DataFrame containing "bodyid_pre", "tbar_position", "tbar_confidence", "psd_position", "bodyid_post". """ if not isinstance(source, (list, np.ndarray)): source = [source] if not isinstance(target, (list, np.ndarray)): target = [target] server, node, user = eval_param(server, node) if len(source) <= len(target): to_query = source query_rel = 'PreSyn' else: to_query = target query_rel = 'PostSyn' cn_data = [] for q in to_query: r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}/label/{}?relationships=true'.format(node, config.synapses, q))) # Raise r.raise_for_status() # Extract synapses syn = r.json() if not syn: continue # Find arbitrary properties props = list(set([k for s in syn for k in s['Prop'].keys()])) # Collect downstream connections this_cn = [[s['Pos'], r['To']] + [s['Prop'].get(p, None) for p in props] for s in syn if s['Kind'] == query_rel and s['Rels'] for r in s['Rels']] df = pd.DataFrame(this_cn) # Add columns if query_rel == 'PreSyn': df.columns = ['tbar_position', 'psd_position'] + props # If we queried sources, we now the identity of presynaptic neuron df['bodyid_pre'] = q else: df.columns = ['psd_position', 'tbar_position'] + props cn_data.append(df) cn_data = pd.concat(cn_data, axis=0, sort=True) if pos_filter: # Get filter filtered = pos_filter(np.vstack(cn_data.tbar_position.values)) if not any(filtered): raise ValueError('No synapses left after filtering.') # Filter synapses cn_data = cn_data.loc[filtered, :] # Add body positions if 'bodyid_pre' not in cn_data.columns: # Get positions of PSDs pos = np.vstack(cn_data.tbar_position.values) # Get postsynaptic body IDs bodies = locs_to_ids(pos, server=server, node=node) cn_data['bodyid_pre'] = bodies # Filter to sources of interest cn_data = cn_data[cn_data.bodyid_pre.isin(source)] if 'bodyid_post' not in cn_data.columns: # Get positions of PSDs pos = np.vstack(cn_data.psd_position.values) # Get presynaptic body IDs bodies = locs_to_ids(pos, server=server, node=node) cn_data['bodyid_post'] = bodies # Filter to targets of interest cn_data = cn_data[cn_data.bodyid_post.isin(target)] return cn_data
[docs]def get_connectivity(bodyid, pos_filter=None, ignore_autapses=True, server=None, node=None): """Return connectivity table for given body. Parameters ---------- bodyid : int | str ID of body for which to get connectivity. pos_filter : function, optional Function to filter synapses by position. Must accept numpy array (N, 3) and return array of [True, False, ...] ignore_autapses : bool, optional If True, will ignore autapses. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- pandas.DataFrame """ if isinstance(bodyid, (list, np.ndarray)): bodyid = np.array(bodyid).astype(str) cn = [get_connectivity(b, pos_filter=pos_filter, ignore_autapses=ignore_autapses, server=server, node=node) for b in tqdm(bodyid)] # Concatenate the DataFrames conc = [] for r in ['upstream', 'downstream']: this_r = [d[d.relation == r].set_index('bodyid').drop('relation', axis=1) for d in cn] this_r = pd.concat(this_r, axis=1) this_r.columns = bodyid this_r['relation'] = r this_r = this_r[np.append('relation', bodyid)] conc.append(this_r.reset_index(drop=False)) cn = pd.concat(conc, axis=0).reset_index(drop=True) cn = cn.fillna(0) cn['total'] = cn[bodyid].sum(axis=1) return cn.sort_values(['relation', 'total'], ascending=False).reset_index(drop=True) server, node, user = eval_param(server, node) bodyid = utils.parse_bid(bodyid) # Get synapses r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}/label/{}?relationships=true'.format(node, config.synapses, bodyid))) # Raise r.raise_for_status() syn = r.json() if pos_filter: # Get filter filtered = pos_filter(np.array([s['Pos'] for s in syn])) if not any(filtered): pass #raise ValueError('No synapses left after filtering.') # Filter synapses syn = np.array(syn)[filtered] # Collect positions and query the body IDs of pre-/postsynaptic neurons pos = [cn['To'] for s in syn for cn in s['Rels']] bodies = locs_to_ids(pos, server=server, node=node) # Compile connector table by counting # of synapses between neurons connections = {'PreSynTo': {}, 'PostSynTo': {}} i = 0 for s in syn: # Connections point to positions -> we have to map this to body IDs for k, cn in enumerate(s['Rels']): b = bodies[i+k] connections[cn['Rel']][b] = connections[cn['Rel']].get(b, 0) + 1 i += k + 1 if connections['PreSynTo']: # Generate connection table pre = pd.DataFrame.from_dict(connections['PreSynTo'], orient='index') pre.columns = ['n_synapses'] pre['relation'] = 'downstream' else: pre = pd.DataFrame([], columns=['n_synapses', 'relation']) pre.index = pre.index.astype(np.int64) pre.index.name = 'bodyid' if connections['PostSynTo']: post = pd.DataFrame.from_dict(connections['PostSynTo'], orient='index') post.columns = ['n_synapses'] post['relation'] = 'upstream' else: post = pd.DataFrame([], columns=['n_synapses', 'relation']) post.index = post.index.astype(np.int64) post.index.name = 'bodyid' # Combine up- and downstream cn_table = pd.concat([pre.reset_index(), post.reset_index()], axis=0) cn_table.sort_values(['relation', 'n_synapses'], inplace=True, ascending=False) cn_table.reset_index(drop=True, inplace=True) if ignore_autapses: to_drop = cn_table.index[cn_table.bodyid == int(bodyid)] cn_table = cn_table.drop(index=to_drop).reset_index() return cn_table[['bodyid', 'relation', 'n_synapses']]
[docs]def get_adjacency(sources, targets=None, pos_filter=None, ignore_autapses=True, server=None, node=None): """Get adjacency between sources and targets. Parameters ---------- sources : iterable Body IDs of sources. targets : iterable, optional Body IDs of targets. If not provided, targets = sources. pos_filter : function, optional Function to filter synapses by position. Must accept numpy array (N, 3) and return array of [True, False, ...] server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- adjacency matrix : pandas.DataFrame Sources = rows; targets = columns """ server, node, user = eval_param(server, node) if not isinstance(sources, (list, tuple, np.ndarray)): sources = [sources] if isinstance(targets, type(None)): targets = sources elif not isinstance(targets, (list, tuple, np.ndarray)): targets = [targets] # Make sure we don't have any duplicates sources = np.array(list(set(sources))).astype(str) targets = np.array(list(set(targets))).astype(str) # Make sure we query the smaller population from the server if len(targets) <= len(sources): columns, index, relation, to_transpose = targets, sources, 'upstream', False else: columns, index, relation, to_transpose = sources, targets, 'downstream', True # Get connectivity cn = get_connectivity(columns, pos_filter=pos_filter, ignore_autapses=ignore_autapses, server=server, node=node) # Subset connectivity to source -> target cn = cn[cn.relation == relation].set_index('bodyid') cn.index = cn.index.astype(str) cn = cn.reindex(index=index, columns=columns, fill_value=0) if to_transpose: cn = cn.T return cn
[docs]def snap_to_body(bodyid, positions, server=None, node=None): """Snap a set of positions to the closest voxels on a given body. Parameters ---------- bodyid : body ID Body for which to find positions. positions : array-like List/Array of (x, y, z) raw(!) coordinates. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- (x, y, z) """ # Parse body ID bodyid = utils.parse_bid(bodyid) if isinstance(positions, pd.DataFrame): positions = positions[['x', 'y', 'z']].values elif not isinstance(positions, np.ndarray): positions = np.array(positions) positions = positions.astype(int) # Find those that are not already within the body bids = locs_to_ids(positions, server=server, node=node) mask = np.array(bids) != int(bodyid) to_snap = positions[mask] # First get voxels of the coarse neuron voxels = get_sparsevol(bodyid, scale='coarse', ret_type='INDEX', server=server, node=node) # Get voxel sizes based on scale info = get_segmentation_info(server, node)['Extended'] voxels = voxels * info['BlockSize'] # For each position find a corresponding coarse voxel dist = cdist(to_snap, voxels) closest = voxels[np.argmin(dist, axis=1)] # Now query the more precise mesh for these coarse voxels snapped = [] for v in tqdm(closest, leave=False, desc='Snapping'): # Generate a bounding bbox to only fetch the voxels we actually need bbox = np.vstack([v, v]).T bbox[:, 1] += info['BlockSize'] fine = get_sparsevol(bodyid, scale=0, ret_type='INDEX', bbox=bbox.ravel(), server=server, node=node) dist = cdist([v], fine) snapped.append(fine[np.argmin(dist, axis=1)][0]) positions[mask] = np.vstack(snapped) return positions
[docs]def get_last_mod(bodyid, server=None, node=None): """Fetch details on the last modification to given body. Parameters ---------- bodyid : body ID Body for which to find positions. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- dict {'mutation id': int, 'last mod user': str, 'last mod app': str, 'last mod time': timestamp isoformat str} """ # Parse body ID bodyid = utils.parse_bid(bodyid) server, node, user = eval_param(server, node) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}/lastmod/{}'.format(node, config.segmentation, bodyid))) r.raise_for_status() return r.json()
[docs]def get_skeleton_mutation(bodyid, server=None, node=None): """Fetch mutation ID of given body. Works by downloading the SWC file and parsing only the header. Parameters ---------- bodyid : int | str ID(s) of body for which to download skeleton. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Returns ------- int Mutation ID. Returns ``None`` if no skeleton available. """ if isinstance(bodyid, (list, np.ndarray)): resp = {x: get_skeleton_mutation(x, server=server, node=node) for x in tqdm(bodyid, desc='Loading')} return resp def split_iter(string): # Returns iterator for line split -> memory efficient # Attention: this will not fetch the last line! return (x.group(0) for x in re.finditer('.*?\n', string)) bodyid = utils.parse_bid(bodyid) server, node, user = eval_param(server, node) r = dvid_session().get(urllib.parse.urljoin(server, 'api/node/{}/{}_skeletons/key/{}_swc'.format(node, config.segmentation, bodyid))) if 'not found' in r.text: print(r.text) return None # Extract header using a generator -> this way we don't have to iterate # over all lines lines = split_iter(r.text) header = [] for l in lines: if l.startswith('#'): header.append(l) else: break # Turn header back into string header = '\n'.join(header) if 'mutation id' not in header: print('{} - Unable to check mutation: mutation ID not in SWC header'.format(bodyid)) else: swc_mut = re.search('"mutation id": (.*?)}', header).group(1) return int(swc_mut)
[docs]def list_projects(server=None): """List available projects on the server. Parameters ---------- server : str If not provided will fall back to globally defined server. Returns ------- pandas.DataFrame """ server, _, _ = eval_param(server) url = utils.make_url(server, 'api/repos/info') r = dvid_session().get(url) r.raise_for_status() return pd.DataFrame.from_records(list(r.json().values()))
[docs]def get_master_node(id, server=None): """Get UUID of the current master node. Parameters ---------- id : str UUID of a node or a project you want the current master node for. You can get use `list_projects` to get the root ID for a project and then use this function to get the master. server : str, optional If not provided will fall back to globally defined server. Returns ------- master : str ID of master node. """ server, _, _ = eval_param(server) url = utils.make_url(server, f'api/repo/{id}/branch-versions/master') r = dvid_session().get(url) try: r.raise_for_status() except HTTPError: raise HTTPError(f'{r.status_code} {r.reason}: {r.text}') except BaseException: raise return r.json()[0]