Skip to content

nblast_funcs

Implements version 2 of the NBLAST algorithm.

Please note that some properties are computed on initialization and changing parameters (e.g. use_alpha) at a later stage will mess things up!

PARAMETER DESCRIPTION
use_alpha
        Whether or not to use alpha values for the scoring.
        If True, the dotproduct of nearest neighbor vectors will
        be scaled by `sqrt(alpha1 * alpha2)`.

TYPE: bool DEFAULT: False

normalized
        If True, will normalize scores by the best possible score
        (i.e. self-self) of the query neuron.

TYPE: bool DEFAULT: True

smat
        How to convert the point match pairs into an NBLAST score,
        usually by a lookup table:
         - if 'auto' (default), will use the "official" NBLAST scoring
           matrices based on FCWB data. Same behaviour as in R's
           nat.nblast implementation.
         - if `smat='v1'` it uses the analytic formulation of the
           NBLAST scoring from Kohl et. al (2013). You can adjust parameter
           `sigma_scaling` (default to 10) using `smat_kwargs`.
         - DataFrames will be used to build a `Lookup2d`
         - if `Callable` given, it passes distance and dot products as
           first and second argument respectively
         - if `smat=None` the scores will be generated as the
           product of the distances and the dotproduct of the vectors
           of nearest-neighbor pairs

TYPE: navis.nbl.smat.Lookup2d | pd.DataFrame | str | Callable DEFAULT: 'auto'

smat_kwargs
        functions. For example: `smat_kwargs["sigma_scoring"] = 10`.

DEFAULT: dict()

limit_dist
        Sets the max distance for the nearest neighbor search
        (`distance_upper_bound`). Typically this should be the
        highest distance considered by the scoring function. If
        "auto", will extract that value from the first axis of the
        scoring matrix.

TYPE: float | "auto" | None DEFAULT: None

progress
        If True, will show a progress bar.

TYPE: bool DEFAULT: True

Source code in navis/nbl/nblast_funcs.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class NBlaster(Blaster):
    """Implements version 2 of the NBLAST algorithm.

    Please note that some properties are computed on initialization and
    changing parameters (e.g. `use_alpha`) at a later stage will mess things
    up!

    Parameters
    ----------
    use_alpha :     bool
                    Whether or not to use alpha values for the scoring.
                    If True, the dotproduct of nearest neighbor vectors will
                    be scaled by `sqrt(alpha1 * alpha2)`.
    normalized :    bool
                    If True, will normalize scores by the best possible score
                    (i.e. self-self) of the query neuron.
    smat :          navis.nbl.smat.Lookup2d | pd.DataFrame | str | Callable
                    How to convert the point match pairs into an NBLAST score,
                    usually by a lookup table:
                     - if 'auto' (default), will use the "official" NBLAST scoring
                       matrices based on FCWB data. Same behaviour as in R's
                       nat.nblast implementation.
                     - if `smat='v1'` it uses the analytic formulation of the
                       NBLAST scoring from Kohl et. al (2013). You can adjust parameter
                       `sigma_scaling` (default to 10) using `smat_kwargs`.
                     - DataFrames will be used to build a `Lookup2d`
                     - if `Callable` given, it passes distance and dot products as
                       first and second argument respectively
                     - if `smat=None` the scores will be generated as the
                       product of the distances and the dotproduct of the vectors
                       of nearest-neighbor pairs
    smat_kwargs:    Dictionary with additional parameters passed to scoring
                    functions. For example: `smat_kwargs["sigma_scoring"] = 10`.
    limit_dist :    float | "auto" | None
                    Sets the max distance for the nearest neighbor search
                    (`distance_upper_bound`). Typically this should be the
                    highest distance considered by the scoring function. If
                    "auto", will extract that value from the first axis of the
                    scoring matrix.
    progress :      bool
                    If True, will show a progress bar.

    """

    def __init__(self, use_alpha=False, normalized=True, smat='auto',
                 limit_dist=None, approx_nn=False, dtype=np.float64,
                 progress=True, smat_kwargs=dict()):
        """Initialize class."""
        super().__init__(progress=progress, dtype=dtype)
        self.use_alpha = use_alpha
        self.normalized = normalized
        self.approx_nn = approx_nn
        self.desc = "NBlasting"

        if smat is None:
            self.score_fn = operator.mul
        elif smat == 'auto':
            self.score_fn = smat_fcwb(self.use_alpha)
        elif smat == 'v1':
            self.score_fn = partial(
                _nblast_v1_scoring, sigma_scoring = smat_kwargs.get('sigma_scoring', 10)
            )
        elif isinstance(smat, pd.DataFrame):
            self.score_fn = Lookup2d.from_dataframe(smat)
        else:
            self.score_fn = smat

        if limit_dist == "auto":
            try:
                if self.score_fn.axes[0].boundaries[-1] != np.inf:
                    self.distance_upper_bound = self.score_fn.axes[0].boundaries[-1]
                else:
                    # If the right boundary is open (i.e. infinity), we will use
                    # the second highest boundary plus a 5% offset
                    self.distance_upper_bound = self.score_fn.axes[0].boundaries[-2] * 1.05
            except AttributeError:
                logger.warning("Could not infer distance upper bound from scoring function")
                self.distance_upper_bound = None
        else:
            self.distance_upper_bound = limit_dist

    def append(self, dotprops: Dotprops, self_hit: Optional[float] = None) -> NestedIndices:
        """Append dotprops.

        Returns the numerical index appended dotprops.
        If dotprops is a (possibly nested) sequence of dotprops,
        return a (possibly nested) list of indices.

        Note that `self_hit` is ignored (and hence calculated from scratch)
        when `dotprops` is a nested list of dotprops.
        """
        if isinstance(dotprops, Dotprops):
            return self._append_dotprops(dotprops, self_hit=self_hit)

        try:
            return [self.append(n) for n in dotprops]
        except TypeError:  # i.e. not iterable
            raise ValueError(f"Expected Dotprops or iterable thereof; got {type(dotprops)}")

    def _append_dotprops(self, dotprops: Dotprops, self_hit: Optional[float] = None) -> int:
        next_id = len(self)
        self.neurons.append(dotprops)
        self.ids.append(dotprops.id)
        # Calculate score for self hit
        if not self_hit:
            self.self_hits.append(self.calc_self_hit(dotprops))
        else:
            self.self_hits.append(self_hit)
        return next_id

    def calc_self_hit(self, dotprops):
        """Non-normalized value for self hit."""
        if not self.use_alpha:
            return len(dotprops.points) * self.score_fn(0, 1.0)
        else:
            dists = np.repeat(0, len(dotprops.points))
            alpha = dotprops.alpha * dotprops.alpha
            dots = np.repeat(1, len(dotprops.points)) * np.sqrt(alpha)
            return self.score_fn(dists, dots).sum()

    def single_query_target(self, q_idx: int, t_idx: int, scores='forward'):
        """Query single target against single target."""
        # Take a short-cut if this is a self-self comparison
        if q_idx == t_idx:
            if self.normalized:
                return 1
            return self.self_hits[q_idx]

        # Run nearest-neighbor search for query against target
        data = self.neurons[q_idx].dist_dots(self.neurons[t_idx],
                                             alpha=self.use_alpha,
                                             # eps=0.1 means we accept 10% inaccuracy
                                             eps=.1 if self.approx_nn else 0,
                                             distance_upper_bound=self.distance_upper_bound)
        if self.use_alpha:
            dists, dots, alpha = data
            dots *= np.sqrt(alpha)
        else:
            dists, dots = data

        scr = self.score_fn(dists, dots).sum()

        # Normalize against best hit
        if self.normalized:
            scr /= self.self_hits[q_idx]

        # For the mean score we also have to produce the reverse score
        if scores in ('mean', 'min', 'max', 'both'):
            reverse = self.single_query_target(t_idx, q_idx, scores='forward')
            if scores == 'mean':
                scr = (scr + reverse) / 2
            elif scores == 'min':
                scr = min(scr, reverse)
            elif scores == 'max':
                scr = max(scr, reverse)
            elif scores == 'both':
                # If both scores are requested
                scr = [scr, reverse]

        return scr

Initialize class.

Source code in navis/nbl/nblast_funcs.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def __init__(self, use_alpha=False, normalized=True, smat='auto',
             limit_dist=None, approx_nn=False, dtype=np.float64,
             progress=True, smat_kwargs=dict()):
    """Initialize class."""
    super().__init__(progress=progress, dtype=dtype)
    self.use_alpha = use_alpha
    self.normalized = normalized
    self.approx_nn = approx_nn
    self.desc = "NBlasting"

    if smat is None:
        self.score_fn = operator.mul
    elif smat == 'auto':
        self.score_fn = smat_fcwb(self.use_alpha)
    elif smat == 'v1':
        self.score_fn = partial(
            _nblast_v1_scoring, sigma_scoring = smat_kwargs.get('sigma_scoring', 10)
        )
    elif isinstance(smat, pd.DataFrame):
        self.score_fn = Lookup2d.from_dataframe(smat)
    else:
        self.score_fn = smat

    if limit_dist == "auto":
        try:
            if self.score_fn.axes[0].boundaries[-1] != np.inf:
                self.distance_upper_bound = self.score_fn.axes[0].boundaries[-1]
            else:
                # If the right boundary is open (i.e. infinity), we will use
                # the second highest boundary plus a 5% offset
                self.distance_upper_bound = self.score_fn.axes[0].boundaries[-2] * 1.05
        except AttributeError:
            logger.warning("Could not infer distance upper bound from scoring function")
            self.distance_upper_bound = None
    else:
        self.distance_upper_bound = limit_dist

Append dotprops.

Returns the numerical index appended dotprops. If dotprops is a (possibly nested) sequence of dotprops, return a (possibly nested) list of indices.

Note that self_hit is ignored (and hence calculated from scratch) when dotprops is a nested list of dotprops.

Source code in navis/nbl/nblast_funcs.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def append(self, dotprops: Dotprops, self_hit: Optional[float] = None) -> NestedIndices:
    """Append dotprops.

    Returns the numerical index appended dotprops.
    If dotprops is a (possibly nested) sequence of dotprops,
    return a (possibly nested) list of indices.

    Note that `self_hit` is ignored (and hence calculated from scratch)
    when `dotprops` is a nested list of dotprops.
    """
    if isinstance(dotprops, Dotprops):
        return self._append_dotprops(dotprops, self_hit=self_hit)

    try:
        return [self.append(n) for n in dotprops]
    except TypeError:  # i.e. not iterable
        raise ValueError(f"Expected Dotprops or iterable thereof; got {type(dotprops)}")

Non-normalized value for self hit.

Source code in navis/nbl/nblast_funcs.py
168
169
170
171
172
173
174
175
176
def calc_self_hit(self, dotprops):
    """Non-normalized value for self hit."""
    if not self.use_alpha:
        return len(dotprops.points) * self.score_fn(0, 1.0)
    else:
        dists = np.repeat(0, len(dotprops.points))
        alpha = dotprops.alpha * dotprops.alpha
        dots = np.repeat(1, len(dotprops.points)) * np.sqrt(alpha)
        return self.score_fn(dists, dots).sum()

Query single target against single target.

Source code in navis/nbl/nblast_funcs.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def single_query_target(self, q_idx: int, t_idx: int, scores='forward'):
    """Query single target against single target."""
    # Take a short-cut if this is a self-self comparison
    if q_idx == t_idx:
        if self.normalized:
            return 1
        return self.self_hits[q_idx]

    # Run nearest-neighbor search for query against target
    data = self.neurons[q_idx].dist_dots(self.neurons[t_idx],
                                         alpha=self.use_alpha,
                                         # eps=0.1 means we accept 10% inaccuracy
                                         eps=.1 if self.approx_nn else 0,
                                         distance_upper_bound=self.distance_upper_bound)
    if self.use_alpha:
        dists, dots, alpha = data
        dots *= np.sqrt(alpha)
    else:
        dists, dots = data

    scr = self.score_fn(dists, dots).sum()

    # Normalize against best hit
    if self.normalized:
        scr /= self.self_hits[q_idx]

    # For the mean score we also have to produce the reverse score
    if scores in ('mean', 'min', 'max', 'both'):
        reverse = self.single_query_target(t_idx, q_idx, scores='forward')
        if scores == 'mean':
            scr = (scr + reverse) / 2
        elif scores == 'min':
            scr = min(scr, reverse)
        elif scores == 'max':
            scr = max(scr, reverse)
        elif scores == 'both':
            # If both scores are requested
            scr = [scr, reverse]

    return scr

Align data types of dotprops.

PARAMETER DESCRIPTION
*x

TYPE: Dotprops | NeuronLists thereof DEFAULT: ()

downcast
    If True, will downcast all points to the lowest precision
    dtype.

TYPE: bool DEFAULT: False

inplace
    If True, will modify the original neuron objects. If False, will
    make a copy before changing dtypes.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
*x

Input data with aligned dtypes.

Source code in navis/nbl/nblast_funcs.py
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
def align_dtypes(*x, downcast=False, inplace=True):
    """Align data types of dotprops.

    Parameters
    ----------
    *x :        Dotprops | NeuronLists thereof
    downcast :  bool
                If True, will downcast all points to the lowest precision
                dtype.
    inplace :   bool
                If True, will modify the original neuron objects. If False, will
                make a copy before changing dtypes.

    Returns
    -------
    *x
                Input data with aligned dtypes.

    """
    dtypes = get_dtypes(x)

    if len(dtypes) == 1:
        return x

    if not inplace:
        for i in range(x):
            x[i] = x[i].copy()

    if downcast:
        target = lowest_type(*x)
    else:
        target = np.result_type(*dtypes)

    for n in x:
        if isinstance(n, NeuronList):
            for i in range(len(n)):
                n[i].points = n[i].points.astype(target, copy=False)
        elif isinstance(n, Dotprops):
            n.points = n.points.astype(target, copy=False)
        else:
            raise TypeError(f'Unable to process "{type(n)}"')

    return x

Check if neuron data is in microns.

Returns either [True, None (=unclear), False]

Source code in navis/nbl/nblast_funcs.py
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
def check_microns(x):
    """Check if neuron data is in microns.

    Returns either [True, None (=unclear), False]
    """
    microns = [config.ureg.Unit('microns'),
               config.ureg.Unit('um'),
               config.ureg.Unit('micrometer'),
               config.ureg.Unit('dimensionless')]
    if not isinstance(x, NeuronList):
        x = NeuronList(x)

    # For very large NeuronLists converting the unit string to pint units is
    # the time consuming step. Here we will first reduce to unique units:
    unit_str = []
    for n in x:
        if isinstance(n._unit_str, str):
            unit_str.append(n._unit_str)
        else:
            unit_str += list(n._unit_str)
    unit_str = np.unique(unit_str)

    any_not_microns = False
    all_units = True
    for u in unit_str:
        # If not a unit (i.e. `None`)
        if not u:
            all_units = False
            continue

        # Convert to proper unit
        u = config.ureg(u).to_compact().units

        if u not in microns:
            any_not_microns = True

    if any_not_microns:
        return False
    elif all_units:
        return True
    return None

Check if pykdtree is used and if the OMP_NUM_THREADS flag is set.

The issue is that on Linux pykdtree uses threads by default which causes issues when we're also using multiple cores (= multiple layers of concurrency).

Source code in navis/nbl/nblast_funcs.py
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
def check_pykdtree_flag():
    """Check if pykdtree is used and if the OMP_NUM_THREADS flag is set.

    The issue is that on Linux pykdtree uses threads by default which causes
    issues when we're also using multiple cores (= multiple layers of concurrency).
    """
    # This is only relevant for Linux (unless someone compiled pykdtree
    # themselves using a compiler that supports openmp)
    from sys import platform
    if platform not in ("linux", "linux2"):
        return

    # See if pykdtree is present
    try:
        import pykdtree
    except ModuleNotFoundError:
        # If not present, just return
        return

    import os
    if os.environ.get('OMP_NUM_THREADS', None) != "1":
        msg = ('`OMP_NUM_THREADS` environment variable not set to 1. This may '
               'result in multiple layers of concurrency which in turn will '
               'slow down NBLAST when using multiple cores. '
               'See also https://github.com/navis-org/navis/issues/49')
        logger.warning(msg)

Determine the lower of two dtypes.

Source code in navis/nbl/nblast_funcs.py
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
def demote_types(a, b):
    """Determine the lower of two dtypes."""
    if isinstance(a, np.ndarray):
        a = a.dtype
    if isinstance(b, np.ndarray):
        b = b.dtype

    # No change is same
    if a == b:
        return a

    # First, get the "higher" type
    higher = np.promote_types(a, b)
    # Now get the one that's not higher
    if a != higher:
        return a
    return b

Evaluate limit_dist parameter.

Source code in navis/nbl/nblast_funcs.py
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
def eval_limit_dist(x):
    """Evaluate `limit_dist` parameter."""
    if x == 'auto':
        return
    if isinstance(x, type(None)):
        return
    if isinstance(x, numbers.Number):
        return

    raise ValueError(f'`limit_dist` must be None, "auto" or float, got {x}' )

Find partitions such that each batch takes about T seconds.

PARAMETER DESCRIPTION
q
    Query and targets, respectively.

T
    Time (in seconds) to aim for.

TYPE: int DEFAULT: 10

n_cores
    Number of cores that will be used. If provided, will try to
    make sure that (n_rows * n_cols) is a multiple of n_cores by
    increasing the number of rows (thereby decreasing the time
    per batch).

TYPE: int DEFAULT: None

RETURNS DESCRIPTION
(n_rows, n_cols)
Source code in navis/nbl/nblast_funcs.py
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
def find_batch_partition(q, t, T=10, n_cores=None):
    """Find partitions such that each batch takes about `T` seconds.

    Parameters
    ----------
    q,t :       NeuronList of Dotprops
                Query and targets, respectively.
    T :         int
                Time (in seconds) to aim for.
    n_cores :   int, optional
                Number of cores that will be used. If provided, will try to
                make sure that (n_rows * n_cols) is a multiple of n_cores by
                increasing the number of rows (thereby decreasing the time
                per batch).

    Returns
    -------
    n_rows, n_cols

    """
    # Test a single query
    time_per_query = test_single_query_time(q, t)

    # Number of queries per job such that each job runs in `T` second
    queries_per_batch = T / time_per_query

    # Number of neurons per batch
    neurons_per_batch  = max(1, int(np.sqrt(queries_per_batch)))

    n_rows = max(1, len(q) // neurons_per_batch)
    n_cols = max(1, len(t) // neurons_per_batch)

    if n_cores and ((n_rows * n_cols) > n_cores):
        while (n_rows * n_cols) % n_cores:
            n_rows += 1

    return n_rows, n_cols

Force data into Dotprops.

Source code in navis/nbl/nblast_funcs.py
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
def force_dotprops(x, k, resample, progress=False):
    """Force data into Dotprops."""
    if isinstance(x, (NeuronList, list)):
        dp = [force_dotprops(n, k, resample) for n in config.tqdm(x,
                                                                  desc='Dotprops',
                                                                  disable=not progress,
                                                                  leave=False)]
        return NeuronList(dp)

    # Try converting non-Dotprops
    if not isinstance(x, Dotprops):
        return make_dotprops(x, k=k, resample=resample)

    # Return Dotprops
    return x

Collect data types of dotprops points.

Source code in navis/nbl/nblast_funcs.py
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
def get_dtypes(*x):
    """Collect data types of dotprops points."""
    dtypes = set()
    for n in x:
        if isinstance(n, NeuronList):
            dtypes = dtypes | get_dtypes(n)
        elif isinstance(n, Dotprops):
            dtypes.add(n.points.dtype)
        else:
            raise TypeError(f'Unable to process "{type(n)}"')
    return dtypes

Find the lowest data type.

Source code in navis/nbl/nblast_funcs.py
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
def lowest_type(*x):
    """Find the lowest data type."""
    dtypes = get_dtypes(x)

    if len(dtypes) == 1:
        return dtypes[0]

    lowest = dtypes[0]
    for dt in dtypes[1:]:
        lowest = demote_types(lowest, dt)

    return lowest

Convert similarity scores to distances.

PARAMETER DESCRIPTION
x
Similarity score matrix to invert.

TYPE: (M, M) np.ndarray | pandas.DataFrame

RETURNS DESCRIPTION
distances
Source code in navis/nbl/nblast_funcs.py
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
def sim_to_dist(x):
    """Convert similarity scores to distances.

    Parameters
    ----------
    x :     (M, M) np.ndarray | pandas.DataFrame
            Similarity score matrix to invert.

    Returns
    -------
    distances

    """
    if not isinstance(x, (np.ndarray, pd.DataFrame)):
        raise TypeError(f'Expected numpy array or pandas DataFrame, got "{type(x)}"')

    if isinstance(x, pd.DataFrame):
        mx = x.values.max()
    else:
        mx = x.max()

    return (x - mx) * -1
Source code in navis/nbl/smat.py
1050
1051
1052
def smat_fcwb(alpha=False):
    # deepcopied so that mutations do not propagate to cache
    return deepcopy(_smat_fcwb(alpha))

Test average time of a single NBLAST query.

Source code in navis/nbl/nblast_funcs.py
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
def test_single_query_time(q, t, it=100):
    """Test average time of a single NBLAST query."""
    # Get a median-sized query and target
    q_ix = np.argsort(q.n_points)[len(q)//2]
    t_ix = np.argsort(t.n_points)[len(t)//2]

    # Run a quick single query benchmark
    timings = []
    for i in range(it):  # Run N tests
        s = time.time()
        _ = t[t_ix].dist_dots(q[q_ix])  # Dist dot (ignore scoring / normalizing)
        timings.append(time.time() - s)
    return np.mean(timings)  # seconds per medium sized query