Skip to content

transforms

Affine transformation of 3D spatial data.

PARAMETER DESCRIPTION
matrix
        Affine matrix.

TYPE: (4, 4) np.ndarray

Examples:

A simple scaling transform

>>> from navis import transforms
>>> import numpy as np
>>> M = np.diag([1e3, 1e3, 1e3, 1])
>>> tr = transforms.affine.AffineTransform(M)
>>> points = np.array([[0, 0, 0], [1, 1, 1]])
>>> tr.xform(points)
array([[   0.,    0.,    0.],
       [1000., 1000., 1000.]])
Source code in navis/transforms/affine.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class AffineTransform(BaseTransform):
    """Affine transformation of 3D spatial data.

    Parameters
    ----------
    matrix :        (4, 4) np.ndarray
                    Affine matrix.

    Examples
    --------
    A simple scaling transform

    >>> from navis import transforms
    >>> import numpy as np
    >>> M = np.diag([1e3, 1e3, 1e3, 1])
    >>> tr = transforms.affine.AffineTransform(M)
    >>> points = np.array([[0, 0, 0], [1, 1, 1]])
    >>> tr.xform(points)
    array([[   0.,    0.,    0.],
           [1000., 1000., 1000.]])

    """

    def __init__(self, matrix: np.ndarray, direction: str = 'forward'):
        """Initialize transform."""
        assert direction in ('forward', 'inverse')

        self.matrix = matrix

        if direction == 'inverse':
            self.matrix = np.linalg.inv(self.matrix)

    def __eq__(self, other: 'AffineTransform') -> bool:
        """Implements equality comparison."""
        if isinstance(other, AffineTransform):
            if np.all(self.matrix == other.matrix):
                return True
        return False

    def __neg__(self):
        """Invert direction."""
        x = self.copy()

        # Invert affine matrix
        x.matrix = np.linalg.inv(x.matrix)

        return x

    def copy(self) -> 'AffineTransform':
        """Return copy of transform."""
        # Attributes not to copy
        no_copy = []
        # Generate new empty transform
        x = self.__class__(None)
        # Override with this neuron's data
        x.__dict__.update({k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy})

        return x

    def xform(self, points: np.ndarray, invert: bool = False) -> np.ndarray:
        """Apply transform to points.

        Parameters
        ----------
        points :        np.ndarray
                        (N, 3) array of x/y/z locations.
        invert :        bool
                        If True, will invert the transform.

        Returns
        -------
        pointsxf :      np.ndarray
                        The transformed points.

        """
        points = np.asarray(points)

        if points.ndim != 2 and points.shape[1] != 3:
            raise ValueError('`points` must be of shape (N, 3)')

        # Add a fourth column to points
        points_mat = np.ones((points.shape[0], 4))
        points_mat[:, :3] = points

        # Apply transform
        if not invert:
            mat = self.matrix
        else:
            mat = np.linalg.inv(self.matrix)

        return np.dot(mat, points_mat.T).T[:, :3]

Initialize transform.

Source code in navis/transforms/affine.py
44
45
46
47
48
49
50
51
def __init__(self, matrix: np.ndarray, direction: str = 'forward'):
    """Initialize transform."""
    assert direction in ('forward', 'inverse')

    self.matrix = matrix

    if direction == 'inverse':
        self.matrix = np.linalg.inv(self.matrix)

Return copy of transform.

Source code in navis/transforms/affine.py
69
70
71
72
73
74
75
76
77
78
def copy(self) -> 'AffineTransform':
    """Return copy of transform."""
    # Attributes not to copy
    no_copy = []
    # Generate new empty transform
    x = self.__class__(None)
    # Override with this neuron's data
    x.__dict__.update({k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy})

    return x

Apply transform to points.

PARAMETER DESCRIPTION
points
        (N, 3) array of x/y/z locations.

TYPE: np.ndarray

invert
        If True, will invert the transform.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
pointsxf

The transformed points.

TYPE: np.ndarray

Source code in navis/transforms/affine.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def xform(self, points: np.ndarray, invert: bool = False) -> np.ndarray:
    """Apply transform to points.

    Parameters
    ----------
    points :        np.ndarray
                    (N, 3) array of x/y/z locations.
    invert :        bool
                    If True, will invert the transform.

    Returns
    -------
    pointsxf :      np.ndarray
                    The transformed points.

    """
    points = np.asarray(points)

    if points.ndim != 2 and points.shape[1] != 3:
        raise ValueError('`points` must be of shape (N, 3)')

    # Add a fourth column to points
    points_mat = np.ones((points.shape[0], 4))
    points_mat[:, :3] = points

    # Apply transform
    if not invert:
        mat = self.matrix
    else:
        mat = np.linalg.inv(self.matrix)

    return np.dot(mat, points_mat.T).T[:, :3]

Helper transform that simply passes points through.

Useful for defining aliases.

Source code in navis/transforms/base.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class AliasTransform(BaseTransform):
    """Helper transform that simply passes points through.

    Useful for defining aliases.
    """

    def __init__(self):
        """Initialize."""
        pass

    def __neg__(self) -> 'AliasTransform':
        """Invert transform."""
        return self.copy()

    def __eq__(self, other):
        """Check if the same."""
        if isinstance(other, AliasTransform):
            True
        return False

    def copy(self):
        """Return copy."""
        x = AliasTransform()
        x.__dict__.update(self.__dict__)
        return x

    def xform(self, points: np.ndarray) -> np.ndarray:
        """Pass through.

        Note that the returned points are NOT a copy but the originals.
        """
        return points

Initialize.

Source code in navis/transforms/base.py
79
80
81
def __init__(self):
    """Initialize."""
    pass

Return copy.

Source code in navis/transforms/base.py
93
94
95
96
97
def copy(self):
    """Return copy."""
    x = AliasTransform()
    x.__dict__.update(self.__dict__)
    return x

Pass through.

Note that the returned points are NOT a copy but the originals.

Source code in navis/transforms/base.py
 99
100
101
102
103
104
def xform(self, points: np.ndarray) -> np.ndarray:
    """Pass through.

    Note that the returned points are NOT a copy but the originals.
    """
    return points

CMTK transforms of 3D spatial data.

Requires CMTK to be installed.

PARAMETER DESCRIPTION
regs
        Path(s) to CMTK transformations(s).

TYPE: str | list of str

directions
        Direction of transformation. Must provide one direction per
        `reg`.

TYPE: "forward" | "inverse" | list thereof DEFAULT: 'forward'

threads
        Number of threads to use.

TYPE: int DEFAULT: None

Examples:

>>> from navis import transforms
>>> tr = transforms.cmtk.CMTKtransform('/path/to/CMTK_directory.list')
>>> tr.xform(points)
Source code in navis/transforms/cmtk.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
class CMTKtransform(BaseTransform):
    """CMTK transforms of 3D spatial data.

    Requires [CMTK](https://www.nitrc.org/projects/cmtk/) to be installed.

    Parameters
    ----------
    regs :          str | list of str
                    Path(s) to CMTK transformations(s).
    directions :    "forward" | "inverse" | list thereof
                    Direction of transformation. Must provide one direction per
                    `reg`.
    threads :       int, optional
                    Number of threads to use.

    Examples
    --------
    >>> from navis import transforms
    >>> tr = transforms.cmtk.CMTKtransform('/path/to/CMTK_directory.list')
    >>> tr.xform(points) # doctest: +SKIP

    """

    def __init__(self, regs: list, directions: str = 'forward', threads: int = None):
        self.directions = list(utils.make_iterable(directions))
        for d in self.directions:
            assert d in ('forward', 'inverse'), ('`direction` must be "foward"'
                                                 f'or "inverse", not "{d}"')

        self.regs = list(utils.make_iterable(regs))
        self.command = 'streamxform'
        self.threads = threads

        if len(directions) == 1 and len(regs) >= 1:
            directions = directions * len(regs)

        if len(self.regs) != len(self.directions):
            raise ValueError('Must provide one direction per regs')

    def __eq__(self, other: 'CMTKtransform') -> bool:
        """Implement equality comparison."""
        if isinstance(other, CMTKtransform):
            if len(self) == len(other):
                if all([self.regs[i] == other.regs[i] for i in range(len(self))]):
                    if all([self.directions[i] == other.directions[i] for i in range(len(self))]):
                        return True
        return False

    def __len__(self) -> int:
        return len(self.regs)

    def __neg__(self) -> 'CMTKtransform':
        """Invert direction."""
        x = self.copy()

        # Swap directions
        x.directions = [{'forward': 'inverse',
                         'inverse': 'forward'}[d] for d in x.directions]

        # Reverse order
        x.regs = x.regs[::-1]
        x.directions = x.directions[::-1]

        return x

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return f'CMTKtransform with {len(self)} transform(s)'

    @staticmethod
    def from_file(filepath: str, **kwargs) -> 'CMTKtransform':
        """Generate CMTKtransform from file.

        Parameters
        ----------
        filepath :  str
                    Path to CMTK transform.
        **kwargs
                    Keyword arguments passed to CMTKtransform.__init__

        Returns
        -------
        CMTKtransform

        """
        defaults = {'directions': 'forward'}
        defaults.update(kwargs)
        return CMTKtransform(str(filepath), **defaults)

    def make_args(self, affine_only: bool = False) -> list:
        """Generate arguments passed to subprocess."""
        # Generate the arguments
        # The actual command (i.e. streamxform)
        args = [str(_cmtkbin / self.command)]

        if affine_only:
            args.append('--affine-only')

        if self.threads:
            args.append(f'--threads {int(self.threads)}')

        # Add the regargs
        args += self.regargs

        return args

    @property
    def regargs(self) -> list:
        """Generate regargs."""
        regargs = []
        for i, (reg, dir) in enumerate(zip(self.regs, self.directions)):
            if dir == 'inverse':
                # For the first transform we need to prefix "--inverse" with
                # a solitary "--"
                if i == 0:
                    regargs.append('--')
                regargs.append('--inverse')
            # Note no double quotes!
            regargs.append(f'{reg}')
        return regargs

    def append(self, transform: 'CMTKtransform', direction: str = None):
        """Add another transform.

        Parameters
        ----------
        transform :     str | CMTKtransform
                        Either another CMTKtransform or filepath to registration.
        direction :     "forward" | "inverse"
                        Only relevant if transform is filepath.

        """
        if isinstance(transform, CMTKtransform):
            if self.command != transform.command:
                raise ValueError('Unable to merge CMTKtransforms using '
                                 'different commands.')

            self.regs += transform.regs
            self.directions += transform.directions
        elif isinstance(transform, str):
            if not direction:
                raise ValueError('Must provide direction along with new transform')
            self.regs.append(transform)
            self.directions.append(direction)
        else:
            raise NotImplementedError(f'Unable to append {type(transform)} to {type(self)}')

    def check_if_possible(self, on_error: str = 'raise'):
        """Check if this transform is possible."""
        if not _cmtkbin:
            msg = 'Folder with CMTK binaries not found. Make sure the ' \
                  'directory is in your PATH environment variable.'
            if on_error == 'raise':
                raise BaseException(msg)
            return msg
        for r in self.regs:
            if not os.path.isdir(r) and not os.path.isfile(r):
                msg = f'Registration {r} not found.'
                if on_error == 'raise':
                    raise BaseException(msg)
                return msg

    def copy(self) -> 'CMTKtransform':
        """Return copy."""
        # Attributes not to copy
        no_copy = []
        # Generate new empty transform
        x = self.__class__(None)
        # Override with this neuron's data
        x.__dict__.update({k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy})

        return x

    def parse_cmtk_output(self, output: str, fail_value=np.nan) -> np.ndarray:
        r"""Parse CMTK output.

        Briefly, CMTK output will be a byte literal like this:

            b'311 63 23 \n275 54 25 \n'

        In case of failed transforms we will get something like this where the
        original coordinates are returned with a "FAILED" flag

            b'343 72 23 \n-10 -10 -10 FAILED \n'

        Parameter
        ---------
        output :        tuple of (b'', None)
                        Stdout of CMTK call.
        fail_value
                        Value to use for points that failed to transform. By
                        default we use `np.nan`.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        The parse transformed points.

        """
        # The original stout is tuple where we care only about the second one
        if isinstance(output, tuple):
            output = output[0]

        pointsx = []
        # Split string into rows - lazily using a generator
        for row in (x.group(1) for x in re.finditer(r"(.*?) \n",
                                                    output.decode())):
            # Split into values
            values = row.split(' ')

            # If this point failed
            if len(values) != 3:
                values = [fail_value] * 3
            else:
                values = [float(v) for v in values]

            pointsx.append(values)

        return np.asarray(pointsx)

    def xform(self, points: np.ndarray,
              affine_only: bool = False,
              affine_fallback: bool = False) -> np.ndarray:
        """Xform data.

        Parameters
        ----------
        points :            (N, 3) numpy array | pandas.DataFrame
                            Points to xform. DataFrame must have x/y/z columns.
        affine_only :       bool
                            Whether to apply only the non-rigid affine
                            transform. This is useful if points are outside
                            the deformation field and would therefore not
                            transform properly.
        affine_fallback :   bool
                            If True and some points did not transform during the
                            non-rigid part of the transformation, we will apply
                            only the affine transformation to those points.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        Transformed points. Points that failed to transform will
                        be `np.nan`.

        """
        self.check_if_possible(on_error='raise')

        if isinstance(points, pd.DataFrame):
            # Make sure x/y/z columns are present
            if np.any([c not in points for c in ['x', 'y', 'z']]):
                raise ValueError('points DataFrame must have x/y/z columns.')
        elif isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3:
            points = pd.DataFrame(points, columns=['x', 'y', 'z'])
        else:
            raise TypeError('`points` must be numpy array of shape (N, 3) or '
                            'pandas DataFrame with x/y/z columns')

        # Generate the result
        args = self.make_args(affine_only=affine_only)
        proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

        # Pipe in the points
        points_str = points[['x', 'y', 'z']].to_string(index=False,
                                                       header=False)

        # Do not use proc.stdin.write to avoid output buffer becoming full
        # before we finish piping in stdin.
        #proc.stdin.write(points_str.encode())
        #output = proc.communicate()

        # Read out results
        # This is equivalent to e.g.:
        # $ streamxform -args <<< "10, 10, 10"
        output = proc.communicate(input=points_str.encode())

        # If no output, something went wrong
        if not output[0]:
            raise utils.CMTKError('CMTK produced no output. Check points?')

        # Xformed points
        xf = self.parse_cmtk_output(output, fail_value=np.nan)

        # Check if any points not xformed
        if affine_fallback and not affine_only:
            not_xf = np.any(np.isnan(xf), axis=1)
            if np.any(not_xf):
                xf[not_xf] = self.xform(points.loc[not_xf], affine_only=True)

        return xf

    def xform_image(self,
                    im,
                    target,
                    out=None,
                    interpolation="linear",                    
                    verbose=False,
                    ):
        """Transform an image using CMTK's reformatx.

        Parameters 
        ----------
        im :        3D numpy array | filepath
                    The floating image to transform.
        target :    str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) | (Nx, Ny, Nz, dx, dy, dz, Ox, Oy, Oz)
                    Defines the target image: dimensions in voxels (N), the voxel size (d) and optionally 
                    an origin (0) for the target image. Can be provided as a string (name of a template),
                    a TemplateBrain object, a tuple/list/array with the target specs.
        out :       str, optional
                    The filepath to save the transformed image. If None (default), will return the 
                    transformed image as np.ndarray.
        interpolation : "linear" | "nn" | "cubic" | "pv" | "sinc-cosine" | "sinc-hamming"
                    The interpolation method to use.
        verbose :   bool
                    Whether to print CMTK output.

        Returns
        -------
        np.ndarray | None
                    If out is None, returns the transformed image as np.ndarray. Otherwise, None.

        """
        assert interpolation in ("linear", "nn", "cubic", "pv", "sinc-cosine", "sinc-hamming")

        # `reformatx` expects this format:
        # ./reformatx --floating {INPUT_FILE} -o {OUTPUT_FILE} {REFERENCE_SPECS} {TRANSFORMS} 
        # where:
        # - {INPUT_FILE} is the image to transform
        # - {OUTPUT_FILE} is where the output will be saved
        # - {REFERENCE_SPECS} defines the target space; this needs to be eitheran NRRD
        #   file from which CMTK can extract the target grid or the actual specs: 
        #   "--target-grid Nx,Ny,Nz:dX,dY,dZ:[Ox,Oy,Oz]" where N is the number of
        #   voxels in each dimension and d is the voxel size. The optional O is the
        #   origin of the image. If not provided, it is assumed to be (0, 0, 0).
        # - {TRANSFORMS} are the CMTK transform(s) to apply; prefix with "--inverse" to invert
        # Below command works to convert JFRC2 to FCWB:
        # /opt/local/lib/cmtk/bin/reformatx --verbose --floating JFRC2.nrrd -o JFRC2_xf.nrrd FCWB.nrrd --inverse /Users/philipps/flybrain-data/BridgingRegistrations/JFRC2_FCWB.list
        # This took XX minutes - should check if that is actually faster than the look-up approach we 
        # use in `images.py`

        target_specs = parse_target_specs(target)

        to_remove = []
        if isinstance(im, (str, pathlib.Path)):
            floating = pathlib.Path(im)
            if not im.is_file():
                raise ValueError(f"Image file not found: {im}")
        elif isinstance(im, np.ndarray):
            assert im.ndim == 3
            # Save to temporary file
            with tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False, delete_on_close=False) as tf:
                nrrd.write(tf.name, im)
                floating = tf.name
                to_remove.append(tf.name)
        else:
            raise ValueError(f"Invalid image type: {type(im)}")

        if out is None:
            outfile = tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False, delete_on_close=False).name
            to_remove.append(outfile)
        elif isinstance(out, (str, pathlib.Path)):
            outfile = pathlib.Path(out).resolve()
        else:
            raise ValueError(f"Invalid output type: {type(out)}")

        # Compile the command 
        args = [str(_cmtkbin / 'reformatx')]
        args += [f'-o {outfile}']
        args += [f'--floating {floating}']
        args += [f'--{interpolation}']
        args += [target_specs]

        # Add the regargs
        args += self.regargs      

        try:
            # run the binary
            # avoid resourcewarnings with null
            with open(os.devnull, "w") as devnull:
                startupinfo = None
                if platform.system() == "Windows":
                    startupinfo = subprocess.STARTUPINFO()
                    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                if verbose:
                    # in debug mode print the output
                    stdout = None
                else:
                    stdout = devnull

                if verbose:
                    config.logger.info("executing: {}".format(" ".join(args)))
                check_call(
                    args,
                    stdout=stdout,
                    stderr=subprocess.STDOUT,
                    startupinfo=startupinfo,
                )  

            if out is None:
                # Return transformed image
                return nrrd.read(outfile)
            elif verbose:
                config.logger.info(f"Transformed image saved to {outfile}")
        except BaseException:
            raise 
        finally:
            # Clean up temporary files
            for f in to_remove:
                os.remove(f) 

Generate regargs.

Add another transform.

PARAMETER DESCRIPTION
transform
        Either another CMTKtransform or filepath to registration.

TYPE: str | CMTKtransform

direction
        Only relevant if transform is filepath.

TYPE: "forward" | "inverse" DEFAULT: None

Source code in navis/transforms/cmtk.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def append(self, transform: 'CMTKtransform', direction: str = None):
    """Add another transform.

    Parameters
    ----------
    transform :     str | CMTKtransform
                    Either another CMTKtransform or filepath to registration.
    direction :     "forward" | "inverse"
                    Only relevant if transform is filepath.

    """
    if isinstance(transform, CMTKtransform):
        if self.command != transform.command:
            raise ValueError('Unable to merge CMTKtransforms using '
                             'different commands.')

        self.regs += transform.regs
        self.directions += transform.directions
    elif isinstance(transform, str):
        if not direction:
            raise ValueError('Must provide direction along with new transform')
        self.regs.append(transform)
        self.directions.append(direction)
    else:
        raise NotImplementedError(f'Unable to append {type(transform)} to {type(self)}')

Check if this transform is possible.

Source code in navis/transforms/cmtk.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def check_if_possible(self, on_error: str = 'raise'):
    """Check if this transform is possible."""
    if not _cmtkbin:
        msg = 'Folder with CMTK binaries not found. Make sure the ' \
              'directory is in your PATH environment variable.'
        if on_error == 'raise':
            raise BaseException(msg)
        return msg
    for r in self.regs:
        if not os.path.isdir(r) and not os.path.isfile(r):
            msg = f'Registration {r} not found.'
            if on_error == 'raise':
                raise BaseException(msg)
            return msg

Return copy.

Source code in navis/transforms/cmtk.py
315
316
317
318
319
320
321
322
323
324
def copy(self) -> 'CMTKtransform':
    """Return copy."""
    # Attributes not to copy
    no_copy = []
    # Generate new empty transform
    x = self.__class__(None)
    # Override with this neuron's data
    x.__dict__.update({k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy})

    return x

Generate CMTKtransform from file.

PARAMETER DESCRIPTION
filepath
    Path to CMTK transform.

TYPE: str

**kwargs
    Keyword arguments passed to CMTKtransform.__init__

DEFAULT: {}

RETURNS DESCRIPTION
CMTKtransform
Source code in navis/transforms/cmtk.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
@staticmethod
def from_file(filepath: str, **kwargs) -> 'CMTKtransform':
    """Generate CMTKtransform from file.

    Parameters
    ----------
    filepath :  str
                Path to CMTK transform.
    **kwargs
                Keyword arguments passed to CMTKtransform.__init__

    Returns
    -------
    CMTKtransform

    """
    defaults = {'directions': 'forward'}
    defaults.update(kwargs)
    return CMTKtransform(str(filepath), **defaults)

Generate arguments passed to subprocess.

Source code in navis/transforms/cmtk.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def make_args(self, affine_only: bool = False) -> list:
    """Generate arguments passed to subprocess."""
    # Generate the arguments
    # The actual command (i.e. streamxform)
    args = [str(_cmtkbin / self.command)]

    if affine_only:
        args.append('--affine-only')

    if self.threads:
        args.append(f'--threads {int(self.threads)}')

    # Add the regargs
    args += self.regargs

    return args

Parse CMTK output.

Briefly, CMTK output will be a byte literal like this:

b'311 63 23 \n275 54 25 \n'

In case of failed transforms we will get something like this where the original coordinates are returned with a "FAILED" flag

b'343 72 23 \n-10 -10 -10 FAILED \n'
Parameter

output : tuple of (b'', None) Stdout of CMTK call. fail_value Value to use for points that failed to transform. By default we use np.nan.

RETURNS DESCRIPTION
pointsxf

The parse transformed points.

TYPE: (N, 3) numpy array

Source code in navis/transforms/cmtk.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def parse_cmtk_output(self, output: str, fail_value=np.nan) -> np.ndarray:
    r"""Parse CMTK output.

    Briefly, CMTK output will be a byte literal like this:

        b'311 63 23 \n275 54 25 \n'

    In case of failed transforms we will get something like this where the
    original coordinates are returned with a "FAILED" flag

        b'343 72 23 \n-10 -10 -10 FAILED \n'

    Parameter
    ---------
    output :        tuple of (b'', None)
                    Stdout of CMTK call.
    fail_value
                    Value to use for points that failed to transform. By
                    default we use `np.nan`.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    The parse transformed points.

    """
    # The original stout is tuple where we care only about the second one
    if isinstance(output, tuple):
        output = output[0]

    pointsx = []
    # Split string into rows - lazily using a generator
    for row in (x.group(1) for x in re.finditer(r"(.*?) \n",
                                                output.decode())):
        # Split into values
        values = row.split(' ')

        # If this point failed
        if len(values) != 3:
            values = [fail_value] * 3
        else:
            values = [float(v) for v in values]

        pointsx.append(values)

    return np.asarray(pointsx)

Xform data.

PARAMETER DESCRIPTION
points
            Points to xform. DataFrame must have x/y/z columns.

TYPE: (N, 3) numpy array | pandas.DataFrame

affine_only
            Whether to apply only the non-rigid affine
            transform. This is useful if points are outside
            the deformation field and would therefore not
            transform properly.

TYPE: bool DEFAULT: False

affine_fallback
            If True and some points did not transform during the
            non-rigid part of the transformation, we will apply
            only the affine transformation to those points.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
pointsxf

Transformed points. Points that failed to transform will be np.nan.

TYPE: (N, 3) numpy array

Source code in navis/transforms/cmtk.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def xform(self, points: np.ndarray,
          affine_only: bool = False,
          affine_fallback: bool = False) -> np.ndarray:
    """Xform data.

    Parameters
    ----------
    points :            (N, 3) numpy array | pandas.DataFrame
                        Points to xform. DataFrame must have x/y/z columns.
    affine_only :       bool
                        Whether to apply only the non-rigid affine
                        transform. This is useful if points are outside
                        the deformation field and would therefore not
                        transform properly.
    affine_fallback :   bool
                        If True and some points did not transform during the
                        non-rigid part of the transformation, we will apply
                        only the affine transformation to those points.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    Transformed points. Points that failed to transform will
                    be `np.nan`.

    """
    self.check_if_possible(on_error='raise')

    if isinstance(points, pd.DataFrame):
        # Make sure x/y/z columns are present
        if np.any([c not in points for c in ['x', 'y', 'z']]):
            raise ValueError('points DataFrame must have x/y/z columns.')
    elif isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3:
        points = pd.DataFrame(points, columns=['x', 'y', 'z'])
    else:
        raise TypeError('`points` must be numpy array of shape (N, 3) or '
                        'pandas DataFrame with x/y/z columns')

    # Generate the result
    args = self.make_args(affine_only=affine_only)
    proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    # Pipe in the points
    points_str = points[['x', 'y', 'z']].to_string(index=False,
                                                   header=False)

    # Do not use proc.stdin.write to avoid output buffer becoming full
    # before we finish piping in stdin.
    #proc.stdin.write(points_str.encode())
    #output = proc.communicate()

    # Read out results
    # This is equivalent to e.g.:
    # $ streamxform -args <<< "10, 10, 10"
    output = proc.communicate(input=points_str.encode())

    # If no output, something went wrong
    if not output[0]:
        raise utils.CMTKError('CMTK produced no output. Check points?')

    # Xformed points
    xf = self.parse_cmtk_output(output, fail_value=np.nan)

    # Check if any points not xformed
    if affine_fallback and not affine_only:
        not_xf = np.any(np.isnan(xf), axis=1)
        if np.any(not_xf):
            xf[not_xf] = self.xform(points.loc[not_xf], affine_only=True)

    return xf

Transform an image using CMTK's reformatx.

Parameters

im : 3D numpy array | filepath The floating image to transform. target : str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) | (Nx, Ny, Nz, dx, dy, dz, Ox, Oy, Oz) Defines the target image: dimensions in voxels (N), the voxel size (d) and optionally an origin (0) for the target image. Can be provided as a string (name of a template), a TemplateBrain object, a tuple/list/array with the target specs. out : str, optional The filepath to save the transformed image. If None (default), will return the transformed image as np.ndarray. interpolation : "linear" | "nn" | "cubic" | "pv" | "sinc-cosine" | "sinc-hamming" The interpolation method to use. verbose : bool Whether to print CMTK output.

RETURNS DESCRIPTION
np.ndarray | None

If out is None, returns the transformed image as np.ndarray. Otherwise, None.

Source code in navis/transforms/cmtk.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def xform_image(self,
                im,
                target,
                out=None,
                interpolation="linear",                    
                verbose=False,
                ):
    """Transform an image using CMTK's reformatx.

    Parameters 
    ----------
    im :        3D numpy array | filepath
                The floating image to transform.
    target :    str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) | (Nx, Ny, Nz, dx, dy, dz, Ox, Oy, Oz)
                Defines the target image: dimensions in voxels (N), the voxel size (d) and optionally 
                an origin (0) for the target image. Can be provided as a string (name of a template),
                a TemplateBrain object, a tuple/list/array with the target specs.
    out :       str, optional
                The filepath to save the transformed image. If None (default), will return the 
                transformed image as np.ndarray.
    interpolation : "linear" | "nn" | "cubic" | "pv" | "sinc-cosine" | "sinc-hamming"
                The interpolation method to use.
    verbose :   bool
                Whether to print CMTK output.

    Returns
    -------
    np.ndarray | None
                If out is None, returns the transformed image as np.ndarray. Otherwise, None.

    """
    assert interpolation in ("linear", "nn", "cubic", "pv", "sinc-cosine", "sinc-hamming")

    # `reformatx` expects this format:
    # ./reformatx --floating {INPUT_FILE} -o {OUTPUT_FILE} {REFERENCE_SPECS} {TRANSFORMS} 
    # where:
    # - {INPUT_FILE} is the image to transform
    # - {OUTPUT_FILE} is where the output will be saved
    # - {REFERENCE_SPECS} defines the target space; this needs to be eitheran NRRD
    #   file from which CMTK can extract the target grid or the actual specs: 
    #   "--target-grid Nx,Ny,Nz:dX,dY,dZ:[Ox,Oy,Oz]" where N is the number of
    #   voxels in each dimension and d is the voxel size. The optional O is the
    #   origin of the image. If not provided, it is assumed to be (0, 0, 0).
    # - {TRANSFORMS} are the CMTK transform(s) to apply; prefix with "--inverse" to invert
    # Below command works to convert JFRC2 to FCWB:
    # /opt/local/lib/cmtk/bin/reformatx --verbose --floating JFRC2.nrrd -o JFRC2_xf.nrrd FCWB.nrrd --inverse /Users/philipps/flybrain-data/BridgingRegistrations/JFRC2_FCWB.list
    # This took XX minutes - should check if that is actually faster than the look-up approach we 
    # use in `images.py`

    target_specs = parse_target_specs(target)

    to_remove = []
    if isinstance(im, (str, pathlib.Path)):
        floating = pathlib.Path(im)
        if not im.is_file():
            raise ValueError(f"Image file not found: {im}")
    elif isinstance(im, np.ndarray):
        assert im.ndim == 3
        # Save to temporary file
        with tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False, delete_on_close=False) as tf:
            nrrd.write(tf.name, im)
            floating = tf.name
            to_remove.append(tf.name)
    else:
        raise ValueError(f"Invalid image type: {type(im)}")

    if out is None:
        outfile = tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False, delete_on_close=False).name
        to_remove.append(outfile)
    elif isinstance(out, (str, pathlib.Path)):
        outfile = pathlib.Path(out).resolve()
    else:
        raise ValueError(f"Invalid output type: {type(out)}")

    # Compile the command 
    args = [str(_cmtkbin / 'reformatx')]
    args += [f'-o {outfile}']
    args += [f'--floating {floating}']
    args += [f'--{interpolation}']
    args += [target_specs]

    # Add the regargs
    args += self.regargs      

    try:
        # run the binary
        # avoid resourcewarnings with null
        with open(os.devnull, "w") as devnull:
            startupinfo = None
            if platform.system() == "Windows":
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            if verbose:
                # in debug mode print the output
                stdout = None
            else:
                stdout = devnull

            if verbose:
                config.logger.info("executing: {}".format(" ".join(args)))
            check_call(
                args,
                stdout=stdout,
                stderr=subprocess.STDOUT,
                startupinfo=startupinfo,
            )  

        if out is None:
            # Return transformed image
            return nrrd.read(outfile)
        elif verbose:
            config.logger.info(f"Transformed image saved to {outfile}")
    except BaseException:
        raise 
    finally:
        # Clean up temporary files
        for f in to_remove:
            os.remove(f) 

Elastix transforms of 3D spatial data.

Requires Elastix. Based on code by Jasper Phelps (https://github.com/jasper-tms/pytransformix).

Note that elastix transforms can not be inverted!

PARAMETER DESCRIPTION
file
            Filepath to elastix transformation file.

TYPE: str

copy_files
            Any files that need to be copied into the temporary
            directory where we perform the transform. These are
            typically files supplemental to the main transform
            file (e.g. defining an additional affine transform).

TYPE: filepath | list DEFAULT: []

Examples:

>>> from navis import transforms
>>> tr = transforms.ElastixTransform('/path/to/transform/transform')
>>> tr.xform(points)
Source code in navis/transforms/elastix.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
class ElastixTransform(BaseTransform):
    """Elastix transforms of 3D spatial data.

    Requires [Elastix](https://github.com/SuperElastix/elastix/). Based on
    code by Jasper Phelps (<https://github.com/jasper-tms/pytransformix>).

    Note that elastix transforms can not be inverted!

    Parameters
    ----------
    file :              str
                        Filepath to elastix transformation file.
    copy_files :        filepath | list, optional
                        Any files that need to be copied into the temporary
                        directory where we perform the transform. These are
                        typically files supplemental to the main transform
                        file (e.g. defining an additional affine transform).

    Examples
    --------
    >>> from navis import transforms
    >>> tr = transforms.ElastixTransform('/path/to/transform/transform')
    >>> tr.xform(points) # doctest: +SKIP

    """

    def __init__(self, file: str, copy_files=[]):
        self.file = pathlib.Path(file)
        self.copy_files = copy_files

    def __eq__(self, other: "ElastixTransform") -> bool:
        """Implement equality comparison."""
        if isinstance(other, ElastixTransform):
            if self.file == other.file:
                return True
        return False

    def check_if_possible(self, on_error: str = "raise"):
        """Check if this transform is possible."""
        if not _elastixbin:
            msg = (
                "Folder with elastix binaries not found. Make sure the "
                "directory is in your PATH environment variable."
            )
            if on_error == "raise":
                raise BaseException(msg)
            return msg
        if not self.file.is_file():
            msg = f"Transformation file {self.file} not found."
            if on_error == "raise":
                raise BaseException(msg)
                return msg

    def copy(self) -> "ElastixTransform":
        """Return copy."""
        # Attributes not to copy
        no_copy = []
        # Generate new empty transform
        x = self.__class__(self.file)
        # Override with this neuron's data
        x.__dict__.update(
            {k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy}
        )

        return x

    def write_input_file(self, points, filepath):
        """Write a numpy array in format required by transformix."""
        with open(filepath, "w") as f:
            f.write("point\n{}\n".format(len(points)))
            for x, y, z in points:
                f.write(f"{x:f} {y:f} {z:f}\n")

    def read_output_file(self, filepath) -> np.ndarray:
        """Load output file.

        Parameter
        ---------
        filepath :      str
                        Filepath to output file.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        The parse transformed points.

        """
        points = []
        with open(filepath, "r") as f:
            for line in f.readlines():
                output = line.split("OutputPoint = [ ")[1].split(" ]")[0]
                points.append([float(i) for i in output.split(" ")])
        return np.array(points)

    def xform(self, points: np.ndarray, return_logs=False) -> np.ndarray:
        """Xform data.

        Parameters
        ----------
        points :        (N, 3) numpy array | pandas.DataFrame
                        Points to xform. DataFrame must have x/y/z columns.
        return_logs :   bool
                        If True, will return logs instead of transformed points.
                        Really only useful for debugging.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        Transformed points.

        """
        self.check_if_possible(on_error="raise")

        if isinstance(points, pd.DataFrame):
            # Make sure x/y/z columns are present
            if np.any([c not in points for c in ["x", "y", "z"]]):
                raise ValueError("points DataFrame must have x/y/z columns.")
            points = points[["x", "y", "z"]].values
        elif not (
            isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3
        ):
            raise TypeError(
                "`points` must be numpy array of shape (N, 3) or "
                "pandas DataFrame with x/y/z columns"
            )

        # Everything happens in a temporary directory
        with tempfile.TemporaryDirectory() as tempdir:
            p = pathlib.Path(tempdir)

            # If required, copy additional files into the temporary directory
            if self.copy_files:
                for f in make_iterable(self.copy_files):
                    _ = pathlib.Path(shutil.copy(f, p))

            # Write points to file
            in_file = p / "inputpoints.txt"
            self.write_input_file(points, in_file)

            out_file = p / "outputpoints.txt"

            # Prepare the command
            command = [
                _elastixbin / "transformix",
                "-out",
                str(p),
                "-tp",
                str(self.file),
                "-def",
                str(in_file),
            ]

            # Keep track of current working directory
            cwd = os.getcwd()
            try:
                # Change working directory to the temporary directory
                # This is apparently required because elastix stupidly expects
                # any secondary transform files to be in the current directory
                # (as opposed to where the main transform is)
                os.chdir(p)
                # Run the actual transform
                proc = subprocess.run(command, stdout=subprocess.PIPE)
            except BaseException:
                raise
            finally:
                # This makes sure we land on our feet even in case of an error
                os.chdir(cwd)

            if return_logs:
                logfile = p / "transformix.log"
                if not logfile.is_file():
                    raise FileNotFoundError("No log file found.")
                with open(logfile) as f:
                    logs = f.read()
                return logs

            if not out_file.is_file():
                raise FileNotFoundError(
                    "Elastix transform did not produce any "
                    f"output:\n {proc.stdout.decode()}"
                )

            points_xf = self.read_output_file(out_file)

        return points_xf

Check if this transform is possible.

Source code in navis/transforms/elastix.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def check_if_possible(self, on_error: str = "raise"):
    """Check if this transform is possible."""
    if not _elastixbin:
        msg = (
            "Folder with elastix binaries not found. Make sure the "
            "directory is in your PATH environment variable."
        )
        if on_error == "raise":
            raise BaseException(msg)
        return msg
    if not self.file.is_file():
        msg = f"Transformation file {self.file} not found."
        if on_error == "raise":
            raise BaseException(msg)
            return msg

Return copy.

Source code in navis/transforms/elastix.py
181
182
183
184
185
186
187
188
189
190
191
192
def copy(self) -> "ElastixTransform":
    """Return copy."""
    # Attributes not to copy
    no_copy = []
    # Generate new empty transform
    x = self.__class__(self.file)
    # Override with this neuron's data
    x.__dict__.update(
        {k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy}
    )

    return x

Load output file.

Parameter

filepath : str Filepath to output file.

RETURNS DESCRIPTION
pointsxf

The parse transformed points.

TYPE: (N, 3) numpy array

Source code in navis/transforms/elastix.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def read_output_file(self, filepath) -> np.ndarray:
    """Load output file.

    Parameter
    ---------
    filepath :      str
                    Filepath to output file.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    The parse transformed points.

    """
    points = []
    with open(filepath, "r") as f:
        for line in f.readlines():
            output = line.split("OutputPoint = [ ")[1].split(" ]")[0]
            points.append([float(i) for i in output.split(" ")])
    return np.array(points)

Write a numpy array in format required by transformix.

Source code in navis/transforms/elastix.py
194
195
196
197
198
199
def write_input_file(self, points, filepath):
    """Write a numpy array in format required by transformix."""
    with open(filepath, "w") as f:
        f.write("point\n{}\n".format(len(points)))
        for x, y, z in points:
            f.write(f"{x:f} {y:f} {z:f}\n")

Xform data.

PARAMETER DESCRIPTION
points
        Points to xform. DataFrame must have x/y/z columns.

TYPE: (N, 3) numpy array | pandas.DataFrame

return_logs
        If True, will return logs instead of transformed points.
        Really only useful for debugging.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
pointsxf

Transformed points.

TYPE: (N, 3) numpy array

Source code in navis/transforms/elastix.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def xform(self, points: np.ndarray, return_logs=False) -> np.ndarray:
    """Xform data.

    Parameters
    ----------
    points :        (N, 3) numpy array | pandas.DataFrame
                    Points to xform. DataFrame must have x/y/z columns.
    return_logs :   bool
                    If True, will return logs instead of transformed points.
                    Really only useful for debugging.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    Transformed points.

    """
    self.check_if_possible(on_error="raise")

    if isinstance(points, pd.DataFrame):
        # Make sure x/y/z columns are present
        if np.any([c not in points for c in ["x", "y", "z"]]):
            raise ValueError("points DataFrame must have x/y/z columns.")
        points = points[["x", "y", "z"]].values
    elif not (
        isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3
    ):
        raise TypeError(
            "`points` must be numpy array of shape (N, 3) or "
            "pandas DataFrame with x/y/z columns"
        )

    # Everything happens in a temporary directory
    with tempfile.TemporaryDirectory() as tempdir:
        p = pathlib.Path(tempdir)

        # If required, copy additional files into the temporary directory
        if self.copy_files:
            for f in make_iterable(self.copy_files):
                _ = pathlib.Path(shutil.copy(f, p))

        # Write points to file
        in_file = p / "inputpoints.txt"
        self.write_input_file(points, in_file)

        out_file = p / "outputpoints.txt"

        # Prepare the command
        command = [
            _elastixbin / "transformix",
            "-out",
            str(p),
            "-tp",
            str(self.file),
            "-def",
            str(in_file),
        ]

        # Keep track of current working directory
        cwd = os.getcwd()
        try:
            # Change working directory to the temporary directory
            # This is apparently required because elastix stupidly expects
            # any secondary transform files to be in the current directory
            # (as opposed to where the main transform is)
            os.chdir(p)
            # Run the actual transform
            proc = subprocess.run(command, stdout=subprocess.PIPE)
        except BaseException:
            raise
        finally:
            # This makes sure we land on our feet even in case of an error
            os.chdir(cwd)

        if return_logs:
            logfile = p / "transformix.log"
            if not logfile.is_file():
                raise FileNotFoundError("No log file found.")
            with open(logfile) as f:
                logs = f.read()
            return logs

        if not out_file.is_file():
            raise FileNotFoundError(
                "Elastix transform did not produce any "
                f"output:\n {proc.stdout.decode()}"
            )

        points_xf = self.read_output_file(out_file)

    return points_xf

Apply custom function as transform.

PARAMETER DESCRIPTION
func
    Function that accepts and returns an (N, 3) array.

TYPE: callable

Source code in navis/transforms/base.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class FunctionTransform(BaseTransform):
    """Apply custom function as transform.

    Parameters
    ----------
    func :      callable
                Function that accepts and returns an (N, 3) array.

    """

    def __init__(self, func):
        """Initialize."""
        if not callable(func):
            raise TypeError('`func` must be callable')
        self.func = func

    def __eq__(self, other):
        """Check if the same."""
        if not isinstance(other, FunctionTransform):
            return False
        if self.func != other.func:
            return False
        return True

    def copy(self):
        """Return copy."""
        x = self.__class__(self.func)
        x.__dict__.update(self.__dict__)
        return x

    def xform(self, points: np.ndarray) -> np.ndarray:
        """Xform data.

        Parameters
        ----------
        points :        (N, 3) numpy array | pandas.DataFrame
                        Points to xform. DataFrame must have x/y/z columns.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        Transformed points.

        """
        if isinstance(points, pd.DataFrame):
            # Make sure x/y/z columns are present
            if np.any([c not in points for c in ['x', 'y', 'z']]):
                raise ValueError('points DataFrame must have x/y/z columns.')
            points = points[['x', 'y', 'z']].values
        elif not (isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3):
            raise TypeError('`points` must be numpy array of shape (N, 3) or '
                            'pandas DataFrame with x/y/z columns')
        return self.func(points.copy())

Initialize.

Source code in navis/transforms/base.py
117
118
119
120
121
def __init__(self, func):
    """Initialize."""
    if not callable(func):
        raise TypeError('`func` must be callable')
    self.func = func

Return copy.

Source code in navis/transforms/base.py
131
132
133
134
135
def copy(self):
    """Return copy."""
    x = self.__class__(self.func)
    x.__dict__.update(self.__dict__)
    return x

Xform data.

PARAMETER DESCRIPTION
points
        Points to xform. DataFrame must have x/y/z columns.

TYPE: (N, 3) numpy array | pandas.DataFrame

RETURNS DESCRIPTION
pointsxf

Transformed points.

TYPE: (N, 3) numpy array

Source code in navis/transforms/base.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def xform(self, points: np.ndarray) -> np.ndarray:
    """Xform data.

    Parameters
    ----------
    points :        (N, 3) numpy array | pandas.DataFrame
                    Points to xform. DataFrame must have x/y/z columns.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    Transformed points.

    """
    if isinstance(points, pd.DataFrame):
        # Make sure x/y/z columns are present
        if np.any([c not in points for c in ['x', 'y', 'z']]):
            raise ValueError('points DataFrame must have x/y/z columns.')
        points = points[['x', 'y', 'z']].values
    elif not (isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3):
        raise TypeError('`points` must be numpy array of shape (N, 3) or '
                        'pandas DataFrame with x/y/z columns')
    return self.func(points.copy())

Hdf5 transform of 3D spatial data.

See here for specifications of the format.

PARAMETER DESCRIPTION
f
        Path to Hdf5 transformation.

TYPE: str

direction
        Direction of transformation.

TYPE: "forward" | "inverse" DEFAULT: 'forward'

level
        For Hdf5 files with deformation fields at multiple
        resolutions: what level of detail to use. Negative values
        go backwards from the highest available resolution
        (-1 = highest, -2 = second highest, etc). Ignored if only a
        single deformation field present.

TYPE: int DEFAULT: -1

cache
        If True, we will cache the deformation field for subsequent
        future transforms. This will speed up future calculations
        in the future but comes at a memory cost.

TYPE: bool DEFAULT: False

full_ingest
        If True, will read and cache the full deformation field at
        initialization. This additional upfront cost can pay off if
        you are about to make many transforms across the volume.

TYPE: bool DEFAULT: False

Source code in navis/transforms/h5reg.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
class H5transform(BaseTransform):
    """Hdf5 transform of 3D spatial data.

    See [here](https://github.com/saalfeldlab/template-building/wiki/Hdf5-Deformation-fields)
    for specifications of the format.

    Parameters
    ----------
    f :             str
                    Path to Hdf5 transformation.
    direction :     "forward" | "inverse"
                    Direction of transformation.
    level :         int, optional
                    For Hdf5 files with deformation fields at multiple
                    resolutions: what level of detail to use. Negative values
                    go backwards from the highest available resolution
                    (-1 = highest, -2 = second highest, etc). Ignored if only a
                    single deformation field present.
    cache :         bool
                    If True, we will cache the deformation field for subsequent
                    future transforms. This will speed up future calculations
                    in the future but comes at a memory cost.
    full_ingest :   bool
                    If True, will read and cache the full deformation field at
                    initialization. This additional upfront cost can pay off if
                    you are about to make many transforms across the volume.

    """

    def __init__(self, f: str,
                 direction: str = 'forward',
                 level: Optional[int] = -1,
                 cache: bool = False,
                 full_ingest: bool = False):
        """Init class."""
        assert direction in ('forward', 'inverse'), ('`direction` must be "forward"'
                                                     f'or "inverse", not "{direction}"')

        self.file = f
        self.direction = direction
        self.field = {'forward': 'dfield', 'inverse': 'invdfield'}[direction]

        # Trying to avoid the file repeatedly so we are making these initial
        # adjustments all in one go even though it would be more Pythonic to
        # delegate to property getter/setter methods
        with h5py.File(self.file, 'r') as h5:
            # Get the available levels
            available_levels = []
            for k in h5.keys():
                try:
                    available_levels.append(int(k))
                except ValueError:
                    continue
            available_levels = sorted(available_levels)

            # Check if there are indeed deformation fields at various resolutions
            if available_levels:
                if isinstance(level, type(None)):
                    level = available_levels[0]
                elif level < 0:
                    ix = level * -1 - 1
                    level = available_levels[ix]

                # Set level
                self._level = str(level)

                # Shape of deformation field
                self.shape = h5[self.level][self.field].shape

                # Data type of deformation field
                self.dtype = h5[self.level][self.field].dtype
            elif self.field in h5.keys():
                # Set level
                self._level = None

                # Shape of deformation field
                self.shape = h5[self.field].shape

                # Data type of deformation field
                self.dtype = h5[self.field].dtype
            else:
                raise ValueError('Unable to parse deformation fields from '
                                 f' {self.file}.')

        # Prepare cache if applicable
        if full_ingest:
            # Ingest the whole deformation field
            self.full_ingest()
        elif cache:
            self.use_cache = True

    def __eq__(self, other) -> bool:
        """Compare with other Transform."""
        if isinstance(other, H5transform):
            if self.file == other.file:
                if self.direction == other.direction:
                    if self.level == other.level:
                        return True
        return False

    def __neg__(self) -> 'H5transform':
        """Invert direction."""
        # Swap direction
        new_direction = {'forward': 'inverse',
                         'inverse': 'forward'}[self.direction]
        # We will re-iniatialize
        x = H5transform(self.file,
                        direction=new_direction,
                        level=int(self.level) if self.level else None,
                        cache=self.use_cache,
                        full_ingest=False)

        return x

    @property
    def level(self):
        return self._level

    @level.setter
    def level(self, value):
        raise ValueError('`level` cannot be changed after initialization.')

    @property
    def use_cache(self):
        """Whether to cache the deformation field."""
        if not hasattr(self, '_use_cache'):
            self._use_cache = False
        return self._use_cache

    @use_cache.setter
    def use_cache(self, value):
        """Set whether to cache the deformation field."""
        assert isinstance(value, bool)

        # If was False and now set to True, build the cache
        if not getattr(self, '_use_cache', False) and value:
            # This is the actual cache
            self.cache = np.zeros(self.shape,
                                  dtype=self.dtype)
            # This is a mask that tells us which values have already been cached
            self.cached = np.zeros(self.shape[:-1],
                                   dtype=bool)
            self._use_cache = True
        # If was True and now is set to False, deconstruct cache
        elif getattr(self, '_use_cache', False) and not value:
            del self.cache
            del self.cached
            self._use_cache = False

            if hasattr(self, '_fully_ingested'):
                del self._fully_ingested

        # Note: should explore whether we can use sparse N-dimensional
        # arrays for caching to save memory
        # See https://github.com/pydata/sparse/

    def copy(self):
        """Return copy."""
        return H5transform(self.file,
                           direction=self.direction,
                           level=int(self.level) if self.level else None,
                           cache=self.use_cache,
                           full_ingest=False)

    def full_ingest(self):
        """Fully ingest the deformation field."""
        # Skip if already ingested
        if getattr(self, '_fully_ingested', False):
            return

        with h5py.File(self.file, 'r') as h5:
            # Read in the entire field
            if self.level:
                self.cache = h5[self.level][self.field][:, :, :]
            else:
                self.cache = h5[self.field][:, :, :]
            # Keep a flag of this
            self._fully_ingested = True
            # We set `cached` to True instead of using a mask
            self.cached = True
            # Keep track of the caching
            self._use_cache = True

    def precache(self, bbox: Union[list, np.ndarray], padding=True):
        """Cache deformation field for given bounding box.

        Parameters
        ----------
        bbox :      list | array
                    Must be `[[x1, x2], [y1, y2], [z1, z2]]`.
        padding :   bool
                    If True, will add the (required!) padding to the bounding
                    box.

        """
        bbox = np.asarray(bbox)

        if bbox.ndim != 2 or bbox.shape != (3, 2):
            raise ValueError(f'Expected (3, 2) bounding box, got {bbox.shape}')

        # Set use_cache=True -> this also prepares the cache array(s)
        self.use_cache = True

        with h5py.File(self.file, 'r') as h5:
            if self.level:
                spacing = h5[self.level][self.field].attrs['spacing']
            else:
                spacing = h5[self.field].attrs['spacing']

            # Note that we invert because spacing is given in (z, y, x)
            bbox_vxl = (bbox.T / spacing[::-1]).T
            # Digitize into voxels
            bbox_vxl = bbox_vxl.round().astype(int)

            if padding:
                bbox_vxl[:, 0] -= 2
                bbox_vxl[:, 1] += 2

            # Make sure we are within bounds
            bbox_vxl = np.clip(bbox_vxl.T, 0, self.shape[:-1][::-1]).T

            # Extract values
            x1, x2, y1, y2, z1, z2 = bbox_vxl.flatten()

            # Cache values in this bounding box
            if self.level:
                self.cache[z1:z2, y1:y2, x1:x2] = h5[self.level][self.field][z1:z2, y1:y2, x1:x2]
            else:
                self.cache[z1:z2, y1:y2, x1:x2] = h5[self.field][z1:z2, y1:y2, x1:x2]
            self.cached[z1:z2, y1:y2, x1:x2] = True

    @staticmethod
    def from_file(filepath: str, **kwargs) -> 'H5transform':
        """Generate H5transform from file.

        Parameters
        ----------
        filepath :  str
                    Path to H5 transform.
        **kwargs
                    Keyword arguments passed to H5transform.__init__

        Returns
        -------
        H5transform

        """
        return H5transform(str(filepath), **kwargs)

    def xform(self,
              points: np.ndarray,
              affine_fallback: bool = True,
              force_deform: bool = True) -> np.ndarray:
        """Xform data.

        Parameters
        ----------
        points :            (N, 3) numpy array | pandas.DataFrame
                            Points to xform. DataFrame must have x/y/z columns.
        affine_fallback :   bool
                            If False, points outside the deformation field will
                            be returned as `np.nan`. If True, these points will
                            only receive the affine part of the transformation.
        force_deform :      bools
                            If True, points outside the deformation field be
                            deformed using the closest point inside the
                            deformation field. Ignored if `affine_fallback` is
                            `False`.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        Transformed points. Points outside the deformation field
                        will have only the affine part of the transform applied.

        """
        if isinstance(points, pd.DataFrame):
            # Make sure x/y/z columns are present
            if np.any([c not in points for c in ['x', 'y', 'z']]):
                raise ValueError('points DataFrame must have x/y/z columns.')
            points = points[['x', 'y', 'z']].values

        if not isinstance(points, np.ndarray) or points.ndim != 2 or points.shape[1] != 3:
            raise TypeError('`points` must be numpy array of shape (N, 3) or '
                            'pandas DataFrame with x/y/z columns')

        # Read the file
        with h5py.File(self.file, 'r') as h5:
            if self.level:
                field = h5[self.level][self.field]
            else:
                field = h5[self.field]

            # We need the field to be (z, y, x, offsets) with `offsets` being
            # three values - for example: (293, 470, 1010, 3)
            # If that's not the case, something is fishy!
            if field.shape[-1] != 3:
                logger.warning('Expected the deformation field to be of shape '
                               f'(z, y, x, 3), got {field.shape}.')

            if 'affine' in field.attrs:
                # The affine part of the transform is a 4 x 4 matrix where the upper
                # 3 x 4 part (row x columns) is an attribute of the h5 dataset
                M = np.ones((4, 4))
                M[:3, :4] = field.attrs['affine'].reshape(3, 4)
                affine = AffineTransform(M)
            else:
                affine = False

            # Get quantization multiplier for later use
            quantization_multiplier = field.attrs.get('quantization_multiplier', 1)

            # For forward direction, the affine part is applied first
            if self.direction == 'inverse' and affine:
                xf = affine.xform(points)
            else:
                xf = points

            # Translate points into voxel space
            spacing = field.attrs['spacing']
            # Note that we invert because spacing is given in (z, y, x)
            xf_voxel = xf / spacing[::-1]
            # Digitize points into voxels
            xf_indices = xf_voxel.round().astype(int)
            # Determine the bounding box of the deformation vectors we need
            # Note that we are grabbing a bit more than required - this is
            # necessary for interpolation later down the line
            mn = xf_indices.min(axis=0) - 2
            mx = xf_indices.max(axis=0) + 2

            # Make sure we are within bounds
            # Note that we clip `mn` at 0 and `mx` at 2 at the lower end?
            # This is to make sure we have enough of the deformation field
            # to interpolate later on `offsets`
            mn = np.clip(mn, 2, np.array(self.shape[:-1][::-1])) - 2
            mx = np.clip(mx, 0, np.array(self.shape[:-1][::-1]) - 2) + 2

            # Check if we can use cached values
            if self.use_cache and (hasattr(self, '_fully_ingested')
                                   or np.all(self.cached[mn[2]: mx[2],
                                                         mn[1]: mx[1],
                                                         mn[0]: mx[0]])):
                offsets = self.cache[mn[2]: mx[2], mn[1]: mx[1], mn[0]: mx[0]]
            else:
                # Load the deformation values for this bounding box
                # This is faster than grabbing individual voxels and
                offsets = field[mn[2]: mx[2], mn[1]: mx[1], mn[0]: mx[0]]

                if self.use_cache:
                    # Write these offsets to cache
                    self.cache[mn[2]: mx[2], mn[1]: mx[1], mn[0]: mx[0]] = offsets
                    self.cached[mn[2]: mx[2], mn[1]: mx[1], mn[0]: mx[0]] = True

        # For interpolation, we need to split the offsets into their x, y
        # and z component
        xgrid = offsets[:, :, :, 0]
        ygrid = offsets[:, :, :, 1]
        zgrid = offsets[:, :, :, 2]

        xx = np.arange(mn[0], mx[0])
        yy = np.arange(mn[1], mx[1])
        zz = np.arange(mn[2], mx[2])

        # The RegularGridInterpolator is the fastest one but the results are
        # are ever so slightly (4th decimal) different from the Java impl
        xinterp = RegularGridInterpolator((zz, yy, xx), xgrid,
                                          bounds_error=False, fill_value=0)
        yinterp = RegularGridInterpolator((zz, yy, xx), ygrid,
                                          bounds_error=False, fill_value=0)
        zinterp = RegularGridInterpolator((zz, yy, xx), zgrid,
                                          bounds_error=False, fill_value=0)

        # Before we interpolate check how many points are outside the
        # deformation field -> these will only receive the affine part of the
        # transform
        is_out = (xf_voxel.min(axis=1) < 0) | np.any(xf_voxel >= self.shape[:-1][::-1], axis=1)

        # If more than 20% (arbitrary number) of voxels are out, there is
        # something suspicious going on
        frac_out = is_out.sum() / xf_voxel.shape[0]
        if frac_out > 0.2:
            logger.warning(f'A suspiciously large fraction ({frac_out:.1%}) '
                           f'of {xf_voxel.shape[0]} points appear to be outside '
                           'the H5 deformation field. Please make doubly sure '
                           'that the input coordinates are in the correct '
                           'space/units')

        # If all points are outside the volume, the interpolation complains
        if frac_out < 1 or (force_deform and affine_fallback):
            if force_deform:
                # For the purpose of finding offsets, we will snap points
                # outside the deformation field to the closest inside voxel
                q_voxel = np.clip(xf_voxel,
                                  a_min=0,
                                  a_max=np.array(self.shape[:-1][::-1]) - 1)
            else:
                q_voxel = xf_voxel

            # Interpolate coordinates and re-combine to an x/y/z array
            offset_vxl = np.vstack((xinterp(q_voxel[:, ::-1], method='linear'),
                                    yinterp(q_voxel[:, ::-1], method='linear'),
                                    zinterp(q_voxel[:, ::-1], method='linear'))).T

            # Turn offsets into real-world coordinates
            offset_real = offset_vxl * quantization_multiplier

            # Apply offsets
            # Please note that we must not use += here
            # That's to avoid running into data type errors where numpy
            # will refuse to add e.g. float64 to int64.
            # By using "+" instead of "+=" we are creating a new array that
            # is potentially upcast from e.g. int64 to float64
            xf = xf + offset_real

        # For inverse direction, the affine part is applied second
        if self.direction == 'forward' and affine:
            xf = affine.xform(xf)

        # If no affine_fallback, set outside points to np.nan
        if not affine_fallback:
            xf[is_out, :] = np.nan

        return xf

Whether to cache the deformation field.

Init class.

Source code in navis/transforms/h5reg.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __init__(self, f: str,
             direction: str = 'forward',
             level: Optional[int] = -1,
             cache: bool = False,
             full_ingest: bool = False):
    """Init class."""
    assert direction in ('forward', 'inverse'), ('`direction` must be "forward"'
                                                 f'or "inverse", not "{direction}"')

    self.file = f
    self.direction = direction
    self.field = {'forward': 'dfield', 'inverse': 'invdfield'}[direction]

    # Trying to avoid the file repeatedly so we are making these initial
    # adjustments all in one go even though it would be more Pythonic to
    # delegate to property getter/setter methods
    with h5py.File(self.file, 'r') as h5:
        # Get the available levels
        available_levels = []
        for k in h5.keys():
            try:
                available_levels.append(int(k))
            except ValueError:
                continue
        available_levels = sorted(available_levels)

        # Check if there are indeed deformation fields at various resolutions
        if available_levels:
            if isinstance(level, type(None)):
                level = available_levels[0]
            elif level < 0:
                ix = level * -1 - 1
                level = available_levels[ix]

            # Set level
            self._level = str(level)

            # Shape of deformation field
            self.shape = h5[self.level][self.field].shape

            # Data type of deformation field
            self.dtype = h5[self.level][self.field].dtype
        elif self.field in h5.keys():
            # Set level
            self._level = None

            # Shape of deformation field
            self.shape = h5[self.field].shape

            # Data type of deformation field
            self.dtype = h5[self.field].dtype
        else:
            raise ValueError('Unable to parse deformation fields from '
                             f' {self.file}.')

    # Prepare cache if applicable
    if full_ingest:
        # Ingest the whole deformation field
        self.full_ingest()
    elif cache:
        self.use_cache = True

Return copy.

Source code in navis/transforms/h5reg.py
189
190
191
192
193
194
195
def copy(self):
    """Return copy."""
    return H5transform(self.file,
                       direction=self.direction,
                       level=int(self.level) if self.level else None,
                       cache=self.use_cache,
                       full_ingest=False)

Generate H5transform from file.

PARAMETER DESCRIPTION
filepath
    Path to H5 transform.

TYPE: str

**kwargs
    Keyword arguments passed to H5transform.__init__

DEFAULT: {}

RETURNS DESCRIPTION
H5transform
Source code in navis/transforms/h5reg.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
@staticmethod
def from_file(filepath: str, **kwargs) -> 'H5transform':
    """Generate H5transform from file.

    Parameters
    ----------
    filepath :  str
                Path to H5 transform.
    **kwargs
                Keyword arguments passed to H5transform.__init__

    Returns
    -------
    H5transform

    """
    return H5transform(str(filepath), **kwargs)

Fully ingest the deformation field.

Source code in navis/transforms/h5reg.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def full_ingest(self):
    """Fully ingest the deformation field."""
    # Skip if already ingested
    if getattr(self, '_fully_ingested', False):
        return

    with h5py.File(self.file, 'r') as h5:
        # Read in the entire field
        if self.level:
            self.cache = h5[self.level][self.field][:, :, :]
        else:
            self.cache = h5[self.field][:, :, :]
        # Keep a flag of this
        self._fully_ingested = True
        # We set `cached` to True instead of using a mask
        self.cached = True
        # Keep track of the caching
        self._use_cache = True

Cache deformation field for given bounding box.

PARAMETER DESCRIPTION
bbox
    Must be `[[x1, x2], [y1, y2], [z1, z2]]`.

TYPE: list | array

padding
    If True, will add the (required!) padding to the bounding
    box.

TYPE: bool DEFAULT: True

Source code in navis/transforms/h5reg.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def precache(self, bbox: Union[list, np.ndarray], padding=True):
    """Cache deformation field for given bounding box.

    Parameters
    ----------
    bbox :      list | array
                Must be `[[x1, x2], [y1, y2], [z1, z2]]`.
    padding :   bool
                If True, will add the (required!) padding to the bounding
                box.

    """
    bbox = np.asarray(bbox)

    if bbox.ndim != 2 or bbox.shape != (3, 2):
        raise ValueError(f'Expected (3, 2) bounding box, got {bbox.shape}')

    # Set use_cache=True -> this also prepares the cache array(s)
    self.use_cache = True

    with h5py.File(self.file, 'r') as h5:
        if self.level:
            spacing = h5[self.level][self.field].attrs['spacing']
        else:
            spacing = h5[self.field].attrs['spacing']

        # Note that we invert because spacing is given in (z, y, x)
        bbox_vxl = (bbox.T / spacing[::-1]).T
        # Digitize into voxels
        bbox_vxl = bbox_vxl.round().astype(int)

        if padding:
            bbox_vxl[:, 0] -= 2
            bbox_vxl[:, 1] += 2

        # Make sure we are within bounds
        bbox_vxl = np.clip(bbox_vxl.T, 0, self.shape[:-1][::-1]).T

        # Extract values
        x1, x2, y1, y2, z1, z2 = bbox_vxl.flatten()

        # Cache values in this bounding box
        if self.level:
            self.cache[z1:z2, y1:y2, x1:x2] = h5[self.level][self.field][z1:z2, y1:y2, x1:x2]
        else:
            self.cache[z1:z2, y1:y2, x1:x2] = h5[self.field][z1:z2, y1:y2, x1:x2]
        self.cached[z1:z2, y1:y2, x1:x2] = True

Xform data.

PARAMETER DESCRIPTION
points
            Points to xform. DataFrame must have x/y/z columns.

TYPE: (N, 3) numpy array | pandas.DataFrame

affine_fallback
            If False, points outside the deformation field will
            be returned as `np.nan`. If True, these points will
            only receive the affine part of the transformation.

TYPE: bool DEFAULT: True

force_deform
            If True, points outside the deformation field be
            deformed using the closest point inside the
            deformation field. Ignored if `affine_fallback` is
            `False`.

TYPE: bools DEFAULT: True

RETURNS DESCRIPTION
pointsxf

Transformed points. Points outside the deformation field will have only the affine part of the transform applied.

TYPE: (N, 3) numpy array

Source code in navis/transforms/h5reg.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
def xform(self,
          points: np.ndarray,
          affine_fallback: bool = True,
          force_deform: bool = True) -> np.ndarray:
    """Xform data.

    Parameters
    ----------
    points :            (N, 3) numpy array | pandas.DataFrame
                        Points to xform. DataFrame must have x/y/z columns.
    affine_fallback :   bool
                        If False, points outside the deformation field will
                        be returned as `np.nan`. If True, these points will
                        only receive the affine part of the transformation.
    force_deform :      bools
                        If True, points outside the deformation field be
                        deformed using the closest point inside the
                        deformation field. Ignored if `affine_fallback` is
                        `False`.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    Transformed points. Points outside the deformation field
                    will have only the affine part of the transform applied.

    """
    if isinstance(points, pd.DataFrame):
        # Make sure x/y/z columns are present
        if np.any([c not in points for c in ['x', 'y', 'z']]):
            raise ValueError('points DataFrame must have x/y/z columns.')
        points = points[['x', 'y', 'z']].values

    if not isinstance(points, np.ndarray) or points.ndim != 2 or points.shape[1] != 3:
        raise TypeError('`points` must be numpy array of shape (N, 3) or '
                        'pandas DataFrame with x/y/z columns')

    # Read the file
    with h5py.File(self.file, 'r') as h5:
        if self.level:
            field = h5[self.level][self.field]
        else:
            field = h5[self.field]

        # We need the field to be (z, y, x, offsets) with `offsets` being
        # three values - for example: (293, 470, 1010, 3)
        # If that's not the case, something is fishy!
        if field.shape[-1] != 3:
            logger.warning('Expected the deformation field to be of shape '
                           f'(z, y, x, 3), got {field.shape}.')

        if 'affine' in field.attrs:
            # The affine part of the transform is a 4 x 4 matrix where the upper
            # 3 x 4 part (row x columns) is an attribute of the h5 dataset
            M = np.ones((4, 4))
            M[:3, :4] = field.attrs['affine'].reshape(3, 4)
            affine = AffineTransform(M)
        else:
            affine = False

        # Get quantization multiplier for later use
        quantization_multiplier = field.attrs.get('quantization_multiplier', 1)

        # For forward direction, the affine part is applied first
        if self.direction == 'inverse' and affine:
            xf = affine.xform(points)
        else:
            xf = points

        # Translate points into voxel space
        spacing = field.attrs['spacing']
        # Note that we invert because spacing is given in (z, y, x)
        xf_voxel = xf / spacing[::-1]
        # Digitize points into voxels
        xf_indices = xf_voxel.round().astype(int)
        # Determine the bounding box of the deformation vectors we need
        # Note that we are grabbing a bit more than required - this is
        # necessary for interpolation later down the line
        mn = xf_indices.min(axis=0) - 2
        mx = xf_indices.max(axis=0) + 2

        # Make sure we are within bounds
        # Note that we clip `mn` at 0 and `mx` at 2 at the lower end?
        # This is to make sure we have enough of the deformation field
        # to interpolate later on `offsets`
        mn = np.clip(mn, 2, np.array(self.shape[:-1][::-1])) - 2
        mx = np.clip(mx, 0, np.array(self.shape[:-1][::-1]) - 2) + 2

        # Check if we can use cached values
        if self.use_cache and (hasattr(self, '_fully_ingested')
                               or np.all(self.cached[mn[2]: mx[2],
                                                     mn[1]: mx[1],
                                                     mn[0]: mx[0]])):
            offsets = self.cache[mn[2]: mx[2], mn[1]: mx[1], mn[0]: mx[0]]
        else:
            # Load the deformation values for this bounding box
            # This is faster than grabbing individual voxels and
            offsets = field[mn[2]: mx[2], mn[1]: mx[1], mn[0]: mx[0]]

            if self.use_cache:
                # Write these offsets to cache
                self.cache[mn[2]: mx[2], mn[1]: mx[1], mn[0]: mx[0]] = offsets
                self.cached[mn[2]: mx[2], mn[1]: mx[1], mn[0]: mx[0]] = True

    # For interpolation, we need to split the offsets into their x, y
    # and z component
    xgrid = offsets[:, :, :, 0]
    ygrid = offsets[:, :, :, 1]
    zgrid = offsets[:, :, :, 2]

    xx = np.arange(mn[0], mx[0])
    yy = np.arange(mn[1], mx[1])
    zz = np.arange(mn[2], mx[2])

    # The RegularGridInterpolator is the fastest one but the results are
    # are ever so slightly (4th decimal) different from the Java impl
    xinterp = RegularGridInterpolator((zz, yy, xx), xgrid,
                                      bounds_error=False, fill_value=0)
    yinterp = RegularGridInterpolator((zz, yy, xx), ygrid,
                                      bounds_error=False, fill_value=0)
    zinterp = RegularGridInterpolator((zz, yy, xx), zgrid,
                                      bounds_error=False, fill_value=0)

    # Before we interpolate check how many points are outside the
    # deformation field -> these will only receive the affine part of the
    # transform
    is_out = (xf_voxel.min(axis=1) < 0) | np.any(xf_voxel >= self.shape[:-1][::-1], axis=1)

    # If more than 20% (arbitrary number) of voxels are out, there is
    # something suspicious going on
    frac_out = is_out.sum() / xf_voxel.shape[0]
    if frac_out > 0.2:
        logger.warning(f'A suspiciously large fraction ({frac_out:.1%}) '
                       f'of {xf_voxel.shape[0]} points appear to be outside '
                       'the H5 deformation field. Please make doubly sure '
                       'that the input coordinates are in the correct '
                       'space/units')

    # If all points are outside the volume, the interpolation complains
    if frac_out < 1 or (force_deform and affine_fallback):
        if force_deform:
            # For the purpose of finding offsets, we will snap points
            # outside the deformation field to the closest inside voxel
            q_voxel = np.clip(xf_voxel,
                              a_min=0,
                              a_max=np.array(self.shape[:-1][::-1]) - 1)
        else:
            q_voxel = xf_voxel

        # Interpolate coordinates and re-combine to an x/y/z array
        offset_vxl = np.vstack((xinterp(q_voxel[:, ::-1], method='linear'),
                                yinterp(q_voxel[:, ::-1], method='linear'),
                                zinterp(q_voxel[:, ::-1], method='linear'))).T

        # Turn offsets into real-world coordinates
        offset_real = offset_vxl * quantization_multiplier

        # Apply offsets
        # Please note that we must not use += here
        # That's to avoid running into data type errors where numpy
        # will refuse to add e.g. float64 to int64.
        # By using "+" instead of "+=" we are creating a new array that
        # is potentially upcast from e.g. int64 to float64
        xf = xf + offset_real

    # For inverse direction, the affine part is applied second
    if self.direction == 'forward' and affine:
        xf = affine.xform(xf)

    # If no affine_fallback, set outside points to np.nan
    if not affine_fallback:
        xf[is_out, :] = np.nan

    return xf

Thin Plate Spline transforms of 3D spatial data.

PARAMETER DESCRIPTION
landmarks_source
            Source landmarks as x/y/z coordinates.

TYPE: (M, 3) numpy array

landmarks_target
            Target landmarks as x/y/z coordinates.

TYPE: (M, 3) numpy array

Examples:

>>> from navis import transforms
>>> import numpy as np
>>> # Generate some mock landmarks
>>> src = np.array([[0, 0, 0], [10, 10, 10], [100, 100, 100], [80, 10, 30]])
>>> trg = np.array([[1, 15, 5], [9, 18, 21], [80, 99, 120], [5, 10, 80]])
>>> tr = transforms.thinplate.TPStransform(src, trg)
>>> points = np.array([[0, 0, 0], [50, 50, 50]])
>>> tr.xform(points)
array([[ 1.        , 15.        ,  5.        ],
       [40.55555556, 54.        , 65.        ]])
Source code in navis/transforms/thinplate.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class TPStransform(BaseTransform):
    """Thin Plate Spline transforms of 3D spatial data.

    Parameters
    ----------
    landmarks_source :  (M, 3) numpy array
                        Source landmarks as x/y/z coordinates.
    landmarks_target :  (M, 3) numpy array
                        Target landmarks as x/y/z coordinates.

    Examples
    --------
    >>> from navis import transforms
    >>> import numpy as np
    >>> # Generate some mock landmarks
    >>> src = np.array([[0, 0, 0], [10, 10, 10], [100, 100, 100], [80, 10, 30]])
    >>> trg = np.array([[1, 15, 5], [9, 18, 21], [80, 99, 120], [5, 10, 80]])
    >>> tr = transforms.thinplate.TPStransform(src, trg)
    >>> points = np.array([[0, 0, 0], [50, 50, 50]])
    >>> tr.xform(points)
    array([[ 1.        , 15.        ,  5.        ],
           [40.55555556, 54.        , 65.        ]])

    """

    def __init__(self, landmarks_source: np.ndarray,
                 landmarks_target: np.ndarray):
        """Initialize class."""
        # Some checks
        self.source = np.asarray(landmarks_source)
        self.target = np.asarray(landmarks_target)

        if self.source.shape[1] != 3:
            raise ValueError(f'Expected (N, 3) array, got {self.source.shape}')
        if self.target.shape[1] != 3:
            raise ValueError(f'Expected (N, 3) array, got {self.target.shape}')

        if self.source.shape[0] != self.target.shape[0]:
            raise ValueError('Number of source landmarks must match number of '
                             'target landmarks.')

        self._W, self._A = None, None

    def __eq__(self, other) -> bool:
        """Implement equality comparison."""
        if isinstance(other, TPStransform):
            if self.source.shape[0] == other.source.shape[0]:
                if np.all(self.source == other.source):
                    if np.all(self.target == other.target):
                        return True
        return False

    def __neg__(self) -> 'TPStransform':
        """Invert direction."""
        # Switch source and target
        return TPStransform(self.target, self.source)

    def _calc_tps_coefs(self):
        # Calculate thinplate coefficients
        self._W, self._A = mops.tps_coefs(self.source, self.target)

    @property
    def W(self):
        if isinstance(self._W, type(None)):
            # Calculate coefficients
            self._calc_tps_coefs()
        return self._W

    @property
    def A(self):
        if isinstance(self._A, type(None)):
            # Calculate coefficients
            self._calc_tps_coefs()
        return self._A

    def copy(self):
        """Make copy."""
        x = TPStransform(self.source, self.target)

        x.__dict__.update(self.__dict__)

        return x

    def xform(self, points: np.ndarray) -> np.ndarray:
        """Transform points.

        Parameters
        ----------
        points :    (N, 3) array
                    Points to transform.

        Returns
        -------
        pointsxf :  (N, 3) array
                    Transformed points.

        """
        if isinstance(points, pd.DataFrame):
            if any([c not in points for c in ['x', 'y', 'z']]):
                raise ValueError('DataFrame must have x/y/z columns.')
            points = points[['x', 'y', 'z']].values

        U = mops.K_matrix(points, self.source)
        P = mops.P_matrix(points)
        # The warped pts are the affine part + the non-uniform part
        return np.matmul(P, self.A) + np.matmul(U, self.W)

Initialize class.

Source code in navis/transforms/thinplate.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def __init__(self, landmarks_source: np.ndarray,
             landmarks_target: np.ndarray):
    """Initialize class."""
    # Some checks
    self.source = np.asarray(landmarks_source)
    self.target = np.asarray(landmarks_target)

    if self.source.shape[1] != 3:
        raise ValueError(f'Expected (N, 3) array, got {self.source.shape}')
    if self.target.shape[1] != 3:
        raise ValueError(f'Expected (N, 3) array, got {self.target.shape}')

    if self.source.shape[0] != self.target.shape[0]:
        raise ValueError('Number of source landmarks must match number of '
                         'target landmarks.')

    self._W, self._A = None, None

Make copy.

Source code in navis/transforms/thinplate.py
113
114
115
116
117
118
119
def copy(self):
    """Make copy."""
    x = TPStransform(self.source, self.target)

    x.__dict__.update(self.__dict__)

    return x

Transform points.

PARAMETER DESCRIPTION
points
    Points to transform.

TYPE: (N, 3) array

RETURNS DESCRIPTION
pointsxf

Transformed points.

TYPE: (N, 3) array

Source code in navis/transforms/thinplate.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def xform(self, points: np.ndarray) -> np.ndarray:
    """Transform points.

    Parameters
    ----------
    points :    (N, 3) array
                Points to transform.

    Returns
    -------
    pointsxf :  (N, 3) array
                Transformed points.

    """
    if isinstance(points, pd.DataFrame):
        if any([c not in points for c in ['x', 'y', 'z']]):
            raise ValueError('DataFrame must have x/y/z columns.')
        points = points[['x', 'y', 'z']].values

    U = mops.K_matrix(points, self.source)
    P = mops.P_matrix(points)
    # The warped pts are the affine part + the non-uniform part
    return np.matmul(P, self.A) + np.matmul(U, self.W)