Source code for dvid.tip

# This code is part of dvid-tools (https://github.com/flyconnectome/dvid_tools)
# and is released under GNU GPL3

from . import utils
from . import fetch

import datetime as dt
import networkx as nx
import numpy as np
import os
import pandas as pd
import warnings

from collections import OrderedDict
from scipy.spatial.distance import cdist, pdist, squareform
from tqdm import tqdm


__all__ = ['detect_tips']


[docs]def detect_tips(x, use_clf=False, psd_dist=False, done_dist=False, checked_dist=50, tip_dist=False, pos_filter=None, save_to=None, verbose=True, snap=True, server=None, node=None): """Detect potential open ends on a given neuron. In brief, the workflow is as follows: 1. Extract tips from the neuron's skeleton 2. Snap tip positions back to mesh (see ``snap``) 3. Apply filters (see ``_dist`` parameters) 4. Remove tips close to a previously checked assignment (see ``checked_dist``) 5. (Optionally) Try to prioritize tips (``use_clf``) 6. Save to a json file (see ``save_to``) that can be opened in neuTu Parameters ---------- x : single body ID use_clf : bool, optional If True, will use a pre-trained classifier to try to predict whether a tip needs human interaction or not. The returned list of tips will contain and be ordered by "confidence" values from -1 (unimportant) to +1 (important). THIS IS NOT A MAGIC BULLET! If you need to be certain you have completed a neuron, you will actually have to look at all tips regardless of confidence! psd_dist : int | None, optional Minimum distance (in raw units) to a postsynaptic density (PSD) for a tip to be considered "done". done_dist : int | None, Minimum distance (in raw units) to a DONE tag for a tip to be considered "done". checked_dist : int | None, optional Minimum distance (in raw units) to a bookmark that has previously been "Set Checked" in the "Assigned bookmarks" table in Neutu. tip_dist : int | None, optional If a given pair of tips is closer than this distance they will be considered duplicates and one of them will be dropped. pos_filter : function, optional Function to fitler tips by position. Must accept numpy array (N, 3) and return array of [True, False, ...]. save_to : filepath, optional If provided will save open ends to JSON file that can be imported as assigmnents. snap : bool, optional If True, will make sure that tips positions are within the mesh. server : str, optional If not provided, will try reading from global. node : str, optional If not provided, will try reading from global. Return ------ pandas.DataFrame List of potential open ends. Examples -------- The usual setting up: >>> import dvidtools as dt >>> dt.set_param('https://your.server.com:8000', 'node', 'user') By default, ``detect_tips`` will simply return all end nodes of the skeleton that have not previously been "Set checked" in NeuTu: >>> # Generate list of tips and save to json file >>> tips = dt.detect_tips(883338122, save_to='~/Documents/883338122.json') A more advanced example is using a pre-trained classifier to include "confidence" values. These confidence range from -1 to +1 and give some indication whether a tip needs to be extended or not. This requires `sciki-learn <https://scikit-learn.org>`_ to be installed. In a terminal run:: pip install scikit-learn Once scikit-learn is installed, you can run the tip detector with classifier confidences: >>> tips = dt.detect_tips(883338122, use_clf=True, ... save_to='~/Documents/883338122.json') """ server, node, user = fetch.eval_param(server, node) # Get the skeleton n = fetch.get_skeletons(x, save_to=None, server=server, node=node)[0] if isinstance(n, type(None)): raise ValueError('{} appears to not have a skeleton. Please double' ' check the body ID.') # Find leaf and root nodes leafs = n[(~n.node_id.isin(n.parent_id.values)) | (n.parent_id <= 0)].copy() # Remove potential duplicated leafs if tip_dist: # Get all by all distance dist = squareform(pdist(leafs[['x', 'y', 'z']].values)) # Set upper triangle (including self dist) to infinite so that we only # get (A->B) and not (B->A) distances dist[np.triu_indices(dist.shape[0])] = float('inf') # Extract those that are too close too_close = list(set(np.where(dist < tip_dist)[0])) # Drop 'em leafs = leafs.reset_index().drop(too_close, axis=0).reset_index() # Skeletons can end up outside the body's voxels - let's snap 'em back if snap: leafs.loc[:, ['x', 'y', 'z']] = fetch.snap_to_body(x, leafs[['x', 'y', 'z']].values, server=server, node=node) if pos_filter: # Get filter filtered = pos_filter(leafs[['x', 'y', 'z']].values) if not any(filtered): raise ValueError('No tips left after filtering!') leafs = leafs.loc[filtered, :] n_leafs = leafs.shape[0] if psd_dist: # Get synapses syn = fetch.get_synapses(x, pos_filter=None, with_details=False, server=server, node=node) post = syn[syn.Kind == 'PostSyn'] # Get distances dist = cdist(leafs[['x', 'y', 'z']].values, np.vstack(post.Pos.values)) # Is tip close to PSD? at_psd = np.min(dist, axis=1) < psd_dist leafs = leafs.loc[~at_psd] psd_filtered = n_leafs - leafs.shape[0] if done_dist: # Check for DONE tags in vicinity at_done = [] for pos in tqdm(leafs[['x', 'y', 'z']].values, desc='Check DONE', leave=False): # We are cheating here b/c we don't actually calculate the # distance! labels = fetch.get_labels_in_area(pos - done_dist/2, [done_dist] * 3, server=server, node=node) if isinstance(labels, type(None)): at_done.append(False) continue # DONE tags have no "action" and "checked" = 1 if any([p.get('checked', False) and not p.get('action', False) for p in labels.Prop.values]): at_done.append(True) else: at_done.append(False) leafs = leafs.loc[~np.array(at_done, dtype=bool)] done_filtered = n_leafs - leafs.shape[0] if checked_dist: # Check if position has been "Set Checked" in the past checked = [] for pos in tqdm(leafs[['x', 'y', 'z']].values, desc='Test Checked', leave=False): # We will look for the assigment in a small window in case the # tip has moved slightly between iterations ass = fetch.get_assignment_status(pos, window=[checked_dist] * 3, bodyid=x if snap else None, server=server, node=node) if any([l.get('checked', False) for l in ass]): checked.append(True) else: checked.append(False) leafs = leafs.loc[~np.array(checked, dtype=bool)] checked_filtered = n_leafs - leafs.shape[0] # Make a copy before we wrap up to prevent any data-on-copy warning leafs = leafs.copy() # Assuming larger radii indicate more likely continuations leafs.sort_values('radius', ascending=False, inplace=True) if use_clf: try: import sklearn from sklearn.externals import joblib except ImportError: print('Must have scikit-learn (https://scikit-learn.org) library ' 'installed. Skipping classification.') sklearn = None except BaseException: raise if use_clf and sklearn: fp = os.path.dirname(__file__) model_file = os.path.join(fp, 'model/model.pkl') scaler_file = os.path.join(fp, 'model/scaler.pkl') try: clf = joblib.load(model_file) scaler = joblib.load(scaler_file) except BaseException: print('Unable to load classifier model. Skipping classification.') clf = None if use_clf and clf: # Get synapse data syn = fetch.get_synapses(x, pos_filter=None, with_details=False) features = _generate_features(n, leafs, syn) # Fit Data with warnings.catch_warnings(): warnings.simplefilter("ignore") features = scaler.fit_transform(features.values) # Predict probability = clf.predict_proba(features) confidence = np.round((probability[:, 1] - probability[:, 0]) / (probability[:, 1] + probability[:, 0]), 3) # Add confidence leafs['confidence'] = confidence leafs['text'] = leafs.confidence.astype(str) leafs['comment'] = leafs['text'] # Sort by confidence leafs.sort_values('confidence', inplace=True, ascending=False), else: leafs['text'] = '' if verbose: d = OrderedDict({ 'Total tips': n_leafs, 'PSD filtered': psd_filtered, 'Done tag filtered': done_filtered, 'Checked assignment filtered': checked_filtered, 'Tips left': leafs.shape[0], }) print(pd.DataFrame.from_dict(d, orient='index', columns=[x])) if save_to: leafs['body ID'] = x meta = {'description': 'Generated by dvidtools.detect_tips', 'date': dt.date.today().isoformat(), 'url': 'https://github.com/flyconnectome/dvid_tools', 'parameters': {'psd_dist': psd_dist, 'done_dist': done_dist, 'checked_dist': checked_dist, 'snap': snap, 'tip_dist': tip_dist, 'node': node, 'use_clf': use_clf, 'pos_filter': not isinstance(pos_filter, (type(None), bool)), 'swc_mutation_id': getattr(n, 'mutation_id', 'NA'), 'server': server}} _ = utils.gen_assignments(leafs, save_to=save_to, meta=meta) return leafs.reset_index(drop=True)
def _generate_features(swc, tips, syn): """Generate features.""" # Generate graph g = utils.swc_to_graph(swc) # Collect features f = pd.DataFrame([]) f['surround_radius'] = tips.node_id.map(lambda x: _surround_radius(x, g)) f['parent_radius'] = tips.node_id.map(lambda x: g.nodes[_first_branch_point(x, g)]['radius']) f['parent_vector'] = tips.node_id.map(lambda x: _dot_product(x, g)) f['hk_dist'] = [_dist_to_hk(x) for x in tips[['x', 'y', 'z']].values] f['branch_length'] = tips.node_id.map(lambda x: _branch_length(x, g)) f['tortuosity'] = tips.node_id.map(lambda x: _tortuosity(x, g)) f['psd_dist'] = _dist_to_psd(tips, syn) f['tbar_dist'] = _dist_to_tbar(tips, syn) f['tip_dist'] = _dist_to_tip(tips) f['io_ratio'] = _in_out_ratio(tips, syn) # Fill NaNs f.fillna(0, inplace=True) return f def _first_branch_point(x, g): """Return first branch point upstream of x.""" while True: if g.in_degree(x) > 1 or g.out_degree(x) == 0: return x x = next(g.successors(x)) def _branch_length(x, g): """Return length of path from node x to the first branch point.""" return nx.shortest_path_length(g, x, _first_branch_point(x, g), weight='weight') def _tortuosity(x, g): """Return tortuosity for path from node x to the first branch point.""" L = _branch_length(x, g) R = np.sqrt(np.sum((g.nodes[x]['location'] - g.nodes[_first_branch_point(x, g)]['location'])**2)) # If no subsequent prior branch point return 0 if R: return L / R else: return 0 def _dist_to_hk(x): """Return distance to closest hot knife section. """ # x positions of hot knife sections hk = np.array([5249, 7920, 10599, 13229, 15894, 18489, 21204, 24040, 26854, 29743, 32359]) hk = hk.reshape((len(hk), 1)) d = cdist(np.reshape(x[0], (1, 1)), hk) return np.min(d) def _dot_product(x, g, normal=True): """Return dot product of (normalised) vector of x and the first downstream branch point. """ if g.out_degree(x) == 0: return None v1 = np.array(g.nodes[x]['location']) - np.array(g.nodes[next(g.successors(x))]['location']) bp = _first_branch_point(x, g) if g.out_degree(bp) == 0: bpp = next(g.predecessors(bp)) else: bpp = next(g.successors(bp)) v2 = np.array(g.nodes[bp]['location']) - np.array(g.nodes[bpp]['location']) if normal: v1 /= np.linalg.norm(v1) v2 /= np.linalg.norm(v2) return np.dot(v1, v2) def _surround_radius(x, g, n=5): """Return mean radius in the surrounding of a node.""" r = [] while g.out_degree(x) and len(r) < n: r.append(g.nodes[x]['radius']) x = next(g.successors(x)) if r: return np.mean(r) else: return g.nodes[x]['radius'] def _dist_to_psd(swc, syn): """Return distance to next PSD for all rows in a SWC table.""" post = syn[syn.Kind == 'PostSyn'] dist = cdist(swc[['x', 'y', 'z']].values, np.vstack(post.Pos.values)) return np.min(dist, axis=1) def _dist_to_tbar(swc, syn): """Return distance to next presynapse for all rows in a SWC table.""" tbar = syn[syn.Kind == 'PreSyn'] dist = cdist(swc[['x', 'y', 'z']].values, np.vstack(tbar.Pos.values)) return np.min(dist, axis=1) def _dist_to_tip(tips): """Return distance to next closest tip for all rows in tips table.""" dist = squareform(pdist(tips[['x', 'y', 'z']].values)) dist[dist == 0] = float('inf') return np.min(dist, axis=1) def _in_out_ratio(swc, syn, r=1000): """Return ratio of PSDs vs presynapses within given radius.""" post = syn[syn.Kind == 'PostSyn'] pre = syn[syn.Kind == 'PreSyn'] post_dist = cdist(swc[['x', 'y', 'z']].values, np.vstack(post.Pos.values)) pre_dist = cdist(swc[['x', 'y', 'z']].values, np.vstack(pre.Pos.values)) n_post = np.sum(post_dist <= r, axis=1) n_pre = np.sum(pre_dist <= r, axis=1) # Make sure there are no zeros total = n_pre + n_post total[total == 0] = 1 # Return ratio return (n_post-n_pre) / total