58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227 | class SynBlaster(Blaster):
"""Implements a synapsed-based NBLAST algorithm.
Please note that some properties are computed on initialization and
changing parameters at a later stage will mess things up!
TODOs
-----
- implement `use_alpha` as average synapse density (i.e. emphasize areas
where a neuron has lots of synapses)
Parameters
----------
normalized : bool
If True, will normalize scores by the best possible score
(i.e. self-self) of the query neuron.
by_type : bool
If True will only compare synapses with the same value in
the "type" column.
smat : navis.nbl.smat.Lookup2d | pd.DataFrame | str
How to convert the point match pairs into an NBLAST score,
usually by a lookup table.
If 'auto' (default), will use scoring matrices
from FCWB. Same behaviour as in R's nat.nblast
implementation.
Dataframes will be used to build a `Lookup2d`.
If `smat=None` the scores will be
generated as the product of the distances and the dotproduct
of the vectors of nearest-neighbor pairs.
progress : bool
If True, will show a progress bar.
"""
def __init__(self, normalized=True, by_type=True,
smat='auto', progress=True):
"""Initialize class."""
super().__init__(progress=progress)
self.normalized = normalized
self.by_type = by_type
if smat is None:
self.score_fn = operator.mul
elif smat == 'auto':
self.score_fn = smat_fcwb()
elif isinstance(smat, pd.DataFrame):
self.score_fn = Lookup2d.from_dataframe(smat)
else:
self.score_fn = smat
self.ids = []
def append(self, neuron, id=None, self_hit=None) -> NestedIndices:
"""Append neurons/connector tables, returning numerical indices of added objects.
Note that `self_hit` is ignored and hence calculated from scratch
when `neuron` is a (nested) list of neurons.
"""
if isinstance(neuron, pd.DataFrame):
return self._append_connectors(neuron, id, self_hit=self_hit)
if isinstance(neuron, BaseNeuron):
if not neuron.has_connectors:
raise ValueError('Neuron must have synapses')
return self._append_connectors(neuron.connectors, neuron.id, self_hit=self_hit)
try:
return [self.append(n) for n in neuron]
except TypeError:
raise ValueError(
"Expected a dataframe, or a Neuron or sequence thereof; got "
f"{type(neuron)}"
)
def _append_connectors(self, connectors: pd.DataFrame, id, self_hit=None) -> int:
if id is None:
raise ValueError("Explicit non-None id required for appending connectors")
next_idx = len(self)
self.ids.append(id)
self.neurons.append({})
if not self.by_type:
data = connectors[['x', 'y', 'z']].values
# Generate the KDTree
self.neurons[-1]['all'] = data # KDTree(data)
else:
if 'type' not in connectors.columns:
raise ValueError('Connector tables must have a "type" column '
'if `by_type=True`')
for ty in connectors['type'].unique():
data = connectors.loc[connectors['type'] == ty, ['x', 'y', 'z']].values
# Generate the KDTree
self.neurons[-1][ty] = data # KDTree(data)
# Calculate score for self hit if required
if not self_hit:
self_hit = self.calc_self_hit(connectors)
self.self_hits.append(self_hit)
return next_idx
def _build_trees(self):
"""Build KDTree for all neurons."""
for i, n in enumerate(self.neurons):
for ty, data in n.items():
n[ty] = KDTree(data)
def calc_self_hit(self, cn):
"""Non-normalized value for self hit."""
return cn.shape[0] * self.score_fn(0, 1)
def single_query_target(self, q_idx: int, t_idx: int, scores='forward'):
"""Query single target against single target."""
# Take a short-cut if this is a self-self comparison
if q_idx == t_idx:
if self.normalized:
return 1
return self.self_hits[q_idx]
# Run nearest-neighbor search for query against target
t_trees = self.neurons[t_idx]
q_trees = self.neurons[q_idx]
dists = []
# Go over all connector types (e.g. "pre" and "post") present in the
# query neuron. If `by_type=False`, there will be a single tree simply
# called "all"
for ty, qt in q_trees.items():
# If target does not have this type of connectors
if ty not in t_trees:
# Note that this infinite distance will simply get the worst
# score possible in the scoring function
dists = np.append(dists, [self.score_fn.max_dist] * qt.data.shape[0])
else:
# Note: we're building the trees lazily here once we actually need them.
# The main reason is that pykdtree is not picklable and hence
# we can't pass it to the worker processes.
if not isinstance(t_trees[ty], KDTree):
t_trees[ty] = KDTree(t_trees[ty])
tt = t_trees[ty]
if isinstance(qt, KDTree):
data = qt.data
else:
data = qt
# pykdtree tracks data as flat array
if data.ndim == 1:
data = data.reshape((qt.n, qt.ndim))
dists = np.append(dists, tt.query(data)[0])
# We use the same scoring function as for normal NBLAST but ignore the
# vector dotproduct component
scr = self.score_fn(dists, 1).sum()
# Normalize against best possible hit (self hit)
if self.normalized:
scr /= self.self_hits[q_idx]
# For the mean score we also have to produce the reverse score
if scores in ('mean', 'min', 'max'):
reverse = self.single_query_target(t_idx, q_idx, scores='forward')
if scores == 'mean':
scr = (scr + reverse) / 2
elif scores == 'min':
scr = min(scr, reverse)
elif scores == 'max':
scr = max(scr, reverse)
return scr
|