Skip to content

transforms

Affine transformation of 3D spatial data.

PARAMETER DESCRIPTION
matrix
        Affine matrix.

TYPE: (4, 4) np.ndarray

Examples:

A simple scaling transform

>>> from navis import transforms
>>> import numpy as np
>>> M = np.diag([1e3, 1e3, 1e3, 1])
>>> tr = transforms.affine.AffineTransform(M)
>>> points = np.array([[0, 0, 0], [1, 1, 1]])
>>> tr.xform(points)
array([[   0.,    0.,    0.],
       [1000., 1000., 1000.]])
Source code in navis/transforms/affine.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class AffineTransform(BaseTransform):
    """Affine transformation of 3D spatial data.

    Parameters
    ----------
    matrix :        (4, 4) np.ndarray
                    Affine matrix.

    Examples
    --------
    A simple scaling transform

    >>> from navis import transforms
    >>> import numpy as np
    >>> M = np.diag([1e3, 1e3, 1e3, 1])
    >>> tr = transforms.affine.AffineTransform(M)
    >>> points = np.array([[0, 0, 0], [1, 1, 1]])
    >>> tr.xform(points)
    array([[   0.,    0.,    0.],
           [1000., 1000., 1000.]])

    """

    def __init__(self, matrix: np.ndarray, direction: str = 'forward'):
        """Initialize transform."""
        assert direction in ('forward', 'inverse')

        self.matrix = matrix

        if direction == 'inverse':
            self.matrix = np.linalg.inv(self.matrix)

    def __eq__(self, other: 'AffineTransform') -> bool:
        """Implements equality comparison."""
        if isinstance(other, AffineTransform):
            if np.all(self.matrix == other.matrix):
                return True
        return False

    def __neg__(self):
        """Invert direction."""
        x = self.copy()

        # Invert affine matrix
        x.matrix = np.linalg.inv(x.matrix)

        return x

    def __add__(self, other: 'AffineTransform') -> 'AffineTransform':
        """Add two affine transforms."""
        if isinstance(other, AffineTransform):
            x = self.copy()
            x.matrix = np.dot(self.matrix, other.matrix)
            return x
        raise ValueError('Can only add `AffineTransform` objects.')

    def copy(self) -> 'AffineTransform':
        """Return copy of transform."""
        # Attributes not to copy
        no_copy = []
        # Generate new empty transform
        x = self.__class__(None)
        # Override with this neuron's data
        x.__dict__.update({k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy})

        return x

    def xform(self, points: np.ndarray, invert: bool = False) -> np.ndarray:
        """Apply transform to points.

        Parameters
        ----------
        points :        np.ndarray
                        (N, 3) array of x/y/z locations.
        invert :        bool
                        If True, will invert the transform.

        Returns
        -------
        pointsxf :      np.ndarray
                        The transformed points.

        """
        points = np.asarray(points)

        if points.ndim != 2 and points.shape[1] != 3:
            raise ValueError('`points` must be of shape (N, 3)')

        # Add a fourth column to points
        points_mat = np.ones((points.shape[0], 4))
        points_mat[:, :3] = points

        # Apply transform
        if not invert:
            mat = self.matrix
        else:
            mat = np.linalg.inv(self.matrix)

        return np.dot(mat, points_mat.T).T[:, :3]

Initialize transform.

Source code in navis/transforms/affine.py
44
45
46
47
48
49
50
51
def __init__(self, matrix: np.ndarray, direction: str = 'forward'):
    """Initialize transform."""
    assert direction in ('forward', 'inverse')

    self.matrix = matrix

    if direction == 'inverse':
        self.matrix = np.linalg.inv(self.matrix)

Return copy of transform.

Source code in navis/transforms/affine.py
77
78
79
80
81
82
83
84
85
86
def copy(self) -> 'AffineTransform':
    """Return copy of transform."""
    # Attributes not to copy
    no_copy = []
    # Generate new empty transform
    x = self.__class__(None)
    # Override with this neuron's data
    x.__dict__.update({k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy})

    return x

Apply transform to points.

PARAMETER DESCRIPTION
points
        (N, 3) array of x/y/z locations.

TYPE: np.ndarray

invert
        If True, will invert the transform.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
pointsxf

The transformed points.

TYPE: np.ndarray

Source code in navis/transforms/affine.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def xform(self, points: np.ndarray, invert: bool = False) -> np.ndarray:
    """Apply transform to points.

    Parameters
    ----------
    points :        np.ndarray
                    (N, 3) array of x/y/z locations.
    invert :        bool
                    If True, will invert the transform.

    Returns
    -------
    pointsxf :      np.ndarray
                    The transformed points.

    """
    points = np.asarray(points)

    if points.ndim != 2 and points.shape[1] != 3:
        raise ValueError('`points` must be of shape (N, 3)')

    # Add a fourth column to points
    points_mat = np.ones((points.shape[0], 4))
    points_mat[:, :3] = points

    # Apply transform
    if not invert:
        mat = self.matrix
    else:
        mat = np.linalg.inv(self.matrix)

    return np.dot(mat, points_mat.T).T[:, :3]

Helper transform that simply passes points through.

Useful for defining aliases.

Source code in navis/transforms/base.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class AliasTransform(BaseTransform):
    """Helper transform that simply passes points through.

    Useful for defining aliases.
    """

    def __init__(self):
        """Initialize."""
        pass

    def __neg__(self) -> 'AliasTransform':
        """Invert transform."""
        return self.copy()

    def __eq__(self, other):
        """Check if the same."""
        if isinstance(other, AliasTransform):
            True
        return False

    def copy(self):
        """Return copy."""
        x = AliasTransform()
        x.__dict__.update(self.__dict__)
        return x

    def xform(self, points: np.ndarray) -> np.ndarray:
        """Pass through.

        Note that the returned points are NOT a copy but the originals.
        """
        return points

Initialize.

Source code in navis/transforms/base.py
79
80
81
def __init__(self):
    """Initialize."""
    pass

Return copy.

Source code in navis/transforms/base.py
93
94
95
96
97
def copy(self):
    """Return copy."""
    x = AliasTransform()
    x.__dict__.update(self.__dict__)
    return x

Pass through.

Note that the returned points are NOT a copy but the originals.

Source code in navis/transforms/base.py
 99
100
101
102
103
104
def xform(self, points: np.ndarray) -> np.ndarray:
    """Pass through.

    Note that the returned points are NOT a copy but the originals.
    """
    return points

CMTK transforms of 3D spatial data.

Requires CMTK to be installed.

PARAMETER DESCRIPTION
regs
        Path(s) to CMTK transformations(s).

TYPE: str | list of str

directions
        Direction of transformation. Must provide one direction per
        `reg`.

TYPE: "forward" | "inverse" | list thereof DEFAULT: 'forward'

threads
        Number of threads to use.

TYPE: int DEFAULT: None

Examples:

>>> from navis import transforms
>>> tr = transforms.cmtk.CMTKtransform('/path/to/CMTK_directory.list')
>>> tr.xform(points)
Source code in navis/transforms/cmtk.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
class CMTKtransform(BaseTransform):
    """CMTK transforms of 3D spatial data.

    Requires [CMTK](https://www.nitrc.org/projects/cmtk/) to be installed.

    Parameters
    ----------
    regs :          str | list of str
                    Path(s) to CMTK transformations(s).
    directions :    "forward" | "inverse" | list thereof
                    Direction of transformation. Must provide one direction per
                    `reg`.
    threads :       int, optional
                    Number of threads to use.

    Examples
    --------
    >>> from navis import transforms
    >>> tr = transforms.cmtk.CMTKtransform('/path/to/CMTK_directory.list')
    >>> tr.xform(points) # doctest: +SKIP

    """

    def __init__(self, regs: list, directions: str = "forward", threads: int = None):
        self.directions = list(utils.make_iterable(directions))
        for d in self.directions:
            assert d in ("forward", "inverse"), (
                '`direction` must be "foward"' f'or "inverse", not "{d}"'
            )

        self.regs = list(utils.make_iterable(regs))
        self.command = "streamxform"
        self.threads = threads

        if len(directions) == 1 and len(regs) >= 1:
            directions = directions * len(regs)

        if len(self.regs) != len(self.directions):
            raise ValueError("Must provide one direction per regs")

    def __add__(self, other: "CMTKtransform") -> "CMTKtransform":
        """Implement addition operator to concatenate transforms."""
        if not isinstance(other, CMTKtransform):
            raise TypeError(f"Cannot add {type(other)} to CMTKtransform")

        if self.command != other.command:
            raise ValueError("Unable to merge CMTKtransforms using different commands.")

        x = self.copy()
        x.regs += other.regs
        x.directions += other.directions

        return x

    def __eq__(self, other: "CMTKtransform") -> bool:
        """Implement equality comparison."""
        if isinstance(other, CMTKtransform):
            if len(self) == len(other):
                if all([self.regs[i] == other.regs[i] for i in range(len(self))]):
                    if all(
                        [
                            self.directions[i] == other.directions[i]
                            for i in range(len(self))
                        ]
                    ):
                        return True
        return False

    def __len__(self) -> int:
        return len(self.regs)

    def __neg__(self) -> "CMTKtransform":
        """Invert direction."""
        x = self.copy()

        # Swap directions
        x.directions = [
            {"forward": "inverse", "inverse": "forward"}[d] for d in x.directions
        ]

        # Reverse order
        x.regs = x.regs[::-1]
        x.directions = x.directions[::-1]

        return x

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return f"CMTKtransform with {len(self)} transform(s)"

    @staticmethod
    def from_file(filepath: str, **kwargs) -> "CMTKtransform":
        """Generate CMTKtransform from file.

        Parameters
        ----------
        filepath :  str
                    Path to CMTK transform.
        **kwargs
                    Keyword arguments passed to CMTKtransform.__init__

        Returns
        -------
        CMTKtransform

        """
        defaults = {"directions": "forward"}
        defaults.update(kwargs)
        return CMTKtransform(str(filepath), **defaults)

    def make_args(self, affine_only: bool = False) -> list:
        """Generate arguments passed to subprocess."""
        # Generate the arguments
        # The actual command (i.e. streamxform)
        args = [str(_cmtkbin / self.command)]

        if affine_only:
            args.append("--affine-only")

        if self.threads:
            args.append(f"--threads {int(self.threads)}")

        # Add the regargs
        args += self.regargs

        return args

    @property
    def regargs(self) -> list:
        """Generate regargs."""
        regargs = []
        for i, (reg, dir) in enumerate(zip(self.regs, self.directions)):
            if dir == "inverse":
                # For the first transform we need to prefix "--inverse" with
                # a solitary "--"
                if i == 0:
                    regargs.append("--")
                regargs.append("--inverse")
            # Note no double quotes!
            regargs.append(f"{reg}")
        return regargs

    def append(self, transform: "CMTKtransform", direction: str = None):
        """Add another transform.

        Parameters
        ----------
        transform :     str | CMTKtransform
                        Either another CMTKtransform or filepath to registration.
        direction :     "forward" | "inverse"
                        Only relevant if transform is filepath.

        """
        if isinstance(transform, CMTKtransform):
            if self.command != transform.command:
                raise ValueError(
                    "Unable to merge CMTKtransforms using " "different commands."
                )

            self.regs += transform.regs
            self.directions += transform.directions
        elif isinstance(transform, str):
            if not direction:
                raise ValueError("Must provide direction along with new transform")
            self.regs.append(transform)
            self.directions.append(direction)
        else:
            raise NotImplementedError(
                f"Unable to append {type(transform)} to {type(self)}"
            )

    def check_if_possible(self, on_error: str = "raise"):
        """Check if this transform is possible."""
        if not _cmtkbin:
            msg = (
                "Folder with CMTK binaries not found. Make sure the "
                "directory is in your PATH environment variable."
            )
            if on_error == "raise":
                raise BaseException(msg)
            return msg
        for r in self.regs:
            if not os.path.isdir(r) and not os.path.isfile(r):
                msg = f"Registration {r} not found."
                if on_error == "raise":
                    raise BaseException(msg)
                return msg

    def copy(self) -> "CMTKtransform":
        """Return copy."""
        # Attributes not to copy
        no_copy = []
        # Generate new empty transform
        x = self.__class__(None)
        # Override with this neuron's data
        x.__dict__.update(
            {k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy}
        )

        return x

    def parse_cmtk_output(self, output: str, fail_value=np.nan) -> np.ndarray:
        r"""Parse CMTK output.

        Briefly, CMTK output will be a byte literal like this:

            b'311 63 23 \n275 54 25 \n'

        In case of failed transforms we will get something like this where the
        original coordinates are returned with a "FAILED" flag

            b'343 72 23 \n-10 -10 -10 FAILED \n'

        Parameter
        ---------
        output :        tuple of (b'', None)
                        Stdout of CMTK call.
        fail_value
                        Value to use for points that failed to transform. By
                        default we use `np.nan`.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        The parse transformed points.

        """
        # The original stout is tuple where we care only about the second one
        if isinstance(output, tuple):
            output = output[0]

        pointsx = []
        # Split string into rows - lazily using a generator
        for row in (x.group(1) for x in re.finditer(r"(.*?) \n", output.decode())):
            # Split into values
            values = row.split(" ")

            # If this point failed
            if len(values) != 3:
                values = [fail_value] * 3
            else:
                values = [float(v) for v in values]

            pointsx.append(values)

        return np.asarray(pointsx)

    def xform(
        self,
        points: np.ndarray,
        affine_only: bool = False,
        affine_fallback: bool = False,
    ) -> np.ndarray:
        """Xform data.

        Parameters
        ----------
        points :            (N, 3) numpy array | pandas.DataFrame
                            Points to xform. DataFrame must have x/y/z columns.
        affine_only :       bool
                            Whether to apply only the non-rigid affine
                            transform. This is useful if points are outside
                            the deformation field and would therefore not
                            transform properly.
        affine_fallback :   bool
                            If True and some points did not transform during the
                            non-rigid part of the transformation, we will apply
                            only the affine transformation to those points.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        Transformed points. Points that failed to transform will
                        be `np.nan`.

        """
        self.check_if_possible(on_error="raise")

        if isinstance(points, pd.DataFrame):
            # Make sure x/y/z columns are present
            if np.any([c not in points for c in ["x", "y", "z"]]):
                raise ValueError("points DataFrame must have x/y/z columns.")
        elif (
            isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3
        ):
            points = pd.DataFrame(points, columns=["x", "y", "z"])
        else:
            raise TypeError(
                "`points` must be numpy array of shape (N, 3) or "
                "pandas DataFrame with x/y/z columns"
            )

        # Generate the result
        args = self.make_args(affine_only=affine_only)
        proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

        # Pipe in the points
        points_str = points[["x", "y", "z"]].to_string(index=False, header=False)

        # Do not use proc.stdin.write to avoid output buffer becoming full
        # before we finish piping in stdin.
        # proc.stdin.write(points_str.encode())
        # output = proc.communicate()

        # Read out results
        # This is equivalent to e.g.:
        # $ streamxform -args <<< "10, 10, 10"
        output = proc.communicate(input=points_str.encode())

        # If no output, something went wrong
        if not output[0]:
            raise utils.CMTKError("CMTK produced no output. Check points?")

        # Xformed points
        xf = self.parse_cmtk_output(output, fail_value=np.nan)

        # Check if any points not xformed
        if affine_fallback and not affine_only:
            not_xf = np.any(np.isnan(xf), axis=1)
            if np.any(not_xf):
                xf[not_xf] = self.xform(points.loc[not_xf], affine_only=True)

        return xf

    def to_grid_transform(self, template, absolute: bool = True, verbose: bool = False):
        """Convert to GridTransform via dense deformation field.

        Parameters
        ----------
        template :  str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) tuple
                    This defines the bounds and voxel size of the deformation
                    field. Typically, this would correspond to the source space
                    (i.e. the moving image). We can work with:
                      - str: a filepath to a NRRD file
                      - TemplateBrain: a navis TemplateBrain object
                      - a tuple/list/array with (Nx, Ny, Nz, dx, dy, dz)
                        where N is the number of voxels in each dimension
                        and d is the voxel size.
        absolute :  bool
                    Whether to return absolute coordinates or offsets.
        verbose :   bool
                    Whether to print CMTK output.

        Returns
        -------
        transform:  GridTransform
                    A few notes on using the resulting GridTransform:
                      1. The transform will expect input coordinates in voxel space.
                         See the returned `voxel_size` array!
                      2. The transformed coordinates, on the other hand, will already
                         be in physical space (e.g. microns).
        voxel_size :   (3,) numpy array
                    The voxel size of the input space. This is important because
                    the GridTransform will expect input coordinates in voxel
                    space.
        """
        from .grid import GridTransform

        dfield, header = self.to_dfield(
            template, out=None, absolute=absolute, verbose=verbose
        )

        # Generate the transform
        transform = GridTransform(
            np.transpose(dfield, (1, 2, 3, 0)),
            type="coordinates" if absolute else "offsets",
        )
        # Get voxel size from header - note that the first row is for the 4th dimension
        # (i.e. the vector dimension of the offsets/coordinates) which we don't care about
        space_dir = np.diagonal(header["space directions"][1:])

        return transform, space_dir

    def to_dfield(
        self, template, out=None, absolute: bool = True, verbose: bool = False
    ) -> np.ndarray:
        """Convert transform to dense deformation field.

        Parameters
        ----------
        template :  str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) tuple
                    This defines the bounds and voxel size of the deformation
                    field. Typically, this would correspond to the source space
                    (i.e. the moving image). We can work with:
                      - str: a filepath to a NRRD file
                      - TemplateBrain: a navis TemplateBrain object
                      - a tuple/list/array with (Nx, Ny, Nz, dx, dy, dz)
                        where N is the number of voxels in each dimension
                        and d is the voxel size.
        out :       str, optional
                    NRRD filepath to save the deformation field. If None (default),
                    the deformation field will be returned as numpy array.
        absolute :  bool
                    Whether to return absolute coordinates or offsets.
        verbose :   bool
                    Whether to print CMTK output.

        Returns
        -------
        dfield :    np.ndarray
                    The dense deformation field as (3, Nx, Ny, Nz) numpy array
                    with either absolute coordinates or offsets where the
                    first dimension contains the x/y/z coordinates. Note that
                    the coordinates stored in the field are in physical space
                    (e.g. microns), not voxel space.
        header :    dict
                    The NRRD header associated with the deformation field.


        See Also
        --------
        CMTKtransform.to_grid_transform
                    Method to directly convert to GridTransform. Please see
                    that method's notes for details on how to handle the
                    deformation field.

        """
        from .templates import TemplateBrain  # avoid circular import

        # Translate template info into shape and voxel size
        if isinstance(template, TemplateBrain):
            if not hasattr(template, "dims") or not hasattr(template, "voxdims"):
                raise ValueError(
                    "TemplateBrain must have `dims` and `voxdims` attributes"
                )
            if len(template.dims) != 3 or len(template.voxdims) != 3:
                raise ValueError(
                    "TemplateBrain `dims` and `voxdims` must be of length 3"
                )
            template = list(template.dims) + list(template.voxdims)

        to_remove = []  # track temporary files to clean up later

        try:
            if isinstance(template, str):
                # Assume NRRD filepath
                if not os.path.isfile(template):
                    raise ValueError(f"Template file not found: {template}")
                ref_image = template
            elif isinstance(template, (tuple, list, np.ndarray)):
                if len(template) != 6:
                    raise ValueError(
                        "`template` tuple/list/array must be of length 6: "
                        "(Nx, Ny, Nz, dx, dy, dz)"
                    )

                # Create a temporary NRRD file to use as reference
                # (we're using zeros because that's highly compressible)
                im = np.zeros(np.array(template[:3]).astype(int), dtype=np.uint8)
                header = {
                    "space directions": np.diag(template[3:6]),
                    "kinds": ["domain", "domain", "domain"],
                    "space units": ["microns", "microns", "microns"],
                    "space origin": [0.0, 0.0, 0.0],
                    "labels": ["x", "y", "z"],
                    "space dimension": 3,
                }
                with tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False) as tf:
                    nrrd.write(tf.name, im, header=header)
                    to_remove.append(tf.name)
                    ref_image = tf.name
            else:
                raise TypeError(
                    f"`template` must be str, TemplateBrain or tuple, not {type(template)}"
                )

            if out is None:
                outfile = tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False).name
                to_remove.append(outfile)
            elif isinstance(out, (str, pathlib.Path)):
                outfile = pathlib.Path(out).resolve()
            else:
                raise ValueError(f"Invalid output type: {type(out)}")

            # Compile the command
            args = [str(_cmtkbin / "xform2dfield")]

            if absolute:
                args += ["--output-absolute"]

            args += [f"{outfile}"]
            args += [f"{ref_image}"]
            args += self.regargs

            # run the binary
            # avoid resourcewarnings with null
            with open(os.devnull, "w") as devnull:
                startupinfo = None
                if platform.system() == "Windows":
                    startupinfo = subprocess.STARTUPINFO()
                    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                if verbose:
                    # in debug mode print the output
                    stdout = None
                else:
                    stdout = devnull

                if verbose:
                    config.logger.info("executing: {}".format(" ".join(args)))
                check_call(
                    args,
                    stdout=stdout,
                    stderr=subprocess.STDOUT,
                    startupinfo=startupinfo,
                )

            if out is None:
                # Return transformed image
                return nrrd.read(outfile)
            elif verbose:
                config.logger.info(f"Transformed image saved to {outfile}")

        except BaseException:
            raise
        finally:
            # Clean up temporary files
            for f in to_remove:
                os.remove(f)

    def xform_image(
        self,
        im,
        target,
        out=None,
        interpolation="linear",
        verbose=False,
    ):
        """Transform an image using CMTK's reformatx.

        Parameters
        ----------
        im :        3D numpy array | filepath
                    The floating image to transform.
        target :    str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) | (Nx, Ny, Nz, dx, dy, dz, Ox, Oy, Oz)
                    Defines the target image: dimensions in voxels (N), the voxel size (d) and optionally
                    an origin (0) for the target image. Can be provided as a string (name of a template),
                    a TemplateBrain object, a tuple/list/array with the target specs.
        out :       str, optional
                    The filepath to save the transformed image. If None (default), will return the
                    transformed image as np.ndarray.
        interpolation : "linear" | "nn" | "cubic" | "pv" | "sinc-cosine" | "sinc-hamming"
                    The interpolation method to use.
        verbose :   bool
                    Whether to print CMTK output.

        Returns
        -------
        np.ndarray | None
                    If out is None, returns the transformed image as np.ndarray. Otherwise, None.

        """
        assert interpolation in (
            "linear",
            "nn",
            "cubic",
            "pv",
            "sinc-cosine",
            "sinc-hamming",
        )

        # `reformatx` expects this format:
        # ./reformatx --floating {INPUT_FILE} -o {OUTPUT_FILE} {REFERENCE_SPECS} {TRANSFORMS}
        # where:
        # - {INPUT_FILE} is the image to transform
        # - {OUTPUT_FILE} is where the output will be saved
        # - {REFERENCE_SPECS} defines the target space; this needs to be either a NRRD
        #   file from which CMTK can extract the target grid or the actual specs:
        #   "--target-grid Nx,Ny,Nz:dX,dY,dZ:[Ox,Oy,Oz]" where N is the number of
        #   voxels in each dimension and d is the voxel size. The optional O is the
        #   origin of the image. If not provided, it is assumed to be (0, 0, 0).
        # - {TRANSFORMS} are the CMTK transform(s) to apply; prefix with "--inverse" to invert
        # Below command works to convert JFRC2 to FCWB:
        # /opt/local/lib/cmtk/bin/reformatx --verbose --floating JFRC2.nrrd -o JFRC2_xf.nrrd FCWB.nrrd ~/flybrain-data/BridgingRegistrations/FCWB_JFRC2.list
        # This took 2min 28s - should check if that is actually faster than the look-up approach we use in `images.py`
        # Note that inversion of transforms always comes with an overhead! When using the inverse transform instead, the above command took 2h 40min!

        if verbose and any((d == "inverse " for d in self.directions)):
            config.logger.warning(
                "Using inverse CMTK transforms with reformatx can be very slow! If possible, consider using only forward transforms."
            )

        target_specs = parse_target_specs(target)

        to_remove = []
        if isinstance(im, (str, pathlib.Path)):
            floating = pathlib.Path(im)
            if not im.is_file():
                raise ValueError(f"Image file not found: {im}")
        elif isinstance(im, np.ndarray):
            assert im.ndim == 3
            # Save to temporary file
            with tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False) as tf:
                nrrd.write(tf.name, im)
                floating = tf.name
                to_remove.append(tf.name)
        else:
            raise ValueError(f"Invalid image type: {type(im)}")

        if out is None:
            outfile = tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False).name
            to_remove.append(outfile)
        elif isinstance(out, (str, pathlib.Path)):
            outfile = pathlib.Path(out).resolve()
        else:
            raise ValueError(f"Invalid output type: {type(out)}")

        # Compile the command
        args = [str(_cmtkbin / "reformatx")]
        args += [f"-o {outfile}"]
        args += [f"--floating {floating}"]
        args += [f"--{interpolation}"]
        args += [target_specs]

        # Add the regargs
        args += self.regargs

        try:
            # run the binary
            # avoid resourcewarnings with null
            with open(os.devnull, "w") as devnull:
                startupinfo = None
                if platform.system() == "Windows":
                    startupinfo = subprocess.STARTUPINFO()
                    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                if verbose:
                    # in debug mode print the output
                    stdout = None
                else:
                    stdout = devnull

                if verbose:
                    config.logger.info("executing: {}".format(" ".join(args)))
                check_call(
                    args,
                    stdout=stdout,
                    stderr=subprocess.STDOUT,
                    startupinfo=startupinfo,
                )

            if out is None:
                # Return transformed image
                return nrrd.read(outfile)
            elif verbose:
                config.logger.info(f"Transformed image saved to {outfile}")
        except BaseException:
            raise
        finally:
            # Clean up temporary files
            for f in to_remove:
                os.remove(f)

Generate regargs.

Add another transform.

PARAMETER DESCRIPTION
transform
        Either another CMTKtransform or filepath to registration.

TYPE: str | CMTKtransform

direction
        Only relevant if transform is filepath.

TYPE: "forward" | "inverse" DEFAULT: None

Source code in navis/transforms/cmtk.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def append(self, transform: "CMTKtransform", direction: str = None):
    """Add another transform.

    Parameters
    ----------
    transform :     str | CMTKtransform
                    Either another CMTKtransform or filepath to registration.
    direction :     "forward" | "inverse"
                    Only relevant if transform is filepath.

    """
    if isinstance(transform, CMTKtransform):
        if self.command != transform.command:
            raise ValueError(
                "Unable to merge CMTKtransforms using " "different commands."
            )

        self.regs += transform.regs
        self.directions += transform.directions
    elif isinstance(transform, str):
        if not direction:
            raise ValueError("Must provide direction along with new transform")
        self.regs.append(transform)
        self.directions.append(direction)
    else:
        raise NotImplementedError(
            f"Unable to append {type(transform)} to {type(self)}"
        )

Check if this transform is possible.

Source code in navis/transforms/cmtk.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def check_if_possible(self, on_error: str = "raise"):
    """Check if this transform is possible."""
    if not _cmtkbin:
        msg = (
            "Folder with CMTK binaries not found. Make sure the "
            "directory is in your PATH environment variable."
        )
        if on_error == "raise":
            raise BaseException(msg)
        return msg
    for r in self.regs:
        if not os.path.isdir(r) and not os.path.isfile(r):
            msg = f"Registration {r} not found."
            if on_error == "raise":
                raise BaseException(msg)
            return msg

Return copy.

Source code in navis/transforms/cmtk.py
353
354
355
356
357
358
359
360
361
362
363
364
def copy(self) -> "CMTKtransform":
    """Return copy."""
    # Attributes not to copy
    no_copy = []
    # Generate new empty transform
    x = self.__class__(None)
    # Override with this neuron's data
    x.__dict__.update(
        {k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy}
    )

    return x

Generate CMTKtransform from file.

PARAMETER DESCRIPTION
filepath
    Path to CMTK transform.

TYPE: str

**kwargs
    Keyword arguments passed to CMTKtransform.__init__

DEFAULT: {}

RETURNS DESCRIPTION
CMTKtransform
Source code in navis/transforms/cmtk.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
@staticmethod
def from_file(filepath: str, **kwargs) -> "CMTKtransform":
    """Generate CMTKtransform from file.

    Parameters
    ----------
    filepath :  str
                Path to CMTK transform.
    **kwargs
                Keyword arguments passed to CMTKtransform.__init__

    Returns
    -------
    CMTKtransform

    """
    defaults = {"directions": "forward"}
    defaults.update(kwargs)
    return CMTKtransform(str(filepath), **defaults)

Generate arguments passed to subprocess.

Source code in navis/transforms/cmtk.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def make_args(self, affine_only: bool = False) -> list:
    """Generate arguments passed to subprocess."""
    # Generate the arguments
    # The actual command (i.e. streamxform)
    args = [str(_cmtkbin / self.command)]

    if affine_only:
        args.append("--affine-only")

    if self.threads:
        args.append(f"--threads {int(self.threads)}")

    # Add the regargs
    args += self.regargs

    return args

Parse CMTK output.

Briefly, CMTK output will be a byte literal like this:

b'311 63 23 \n275 54 25 \n'

In case of failed transforms we will get something like this where the original coordinates are returned with a "FAILED" flag

b'343 72 23 \n-10 -10 -10 FAILED \n'
Parameter

output : tuple of (b'', None) Stdout of CMTK call. fail_value Value to use for points that failed to transform. By default we use np.nan.

RETURNS DESCRIPTION
pointsxf

The parse transformed points.

TYPE: (N, 3) numpy array

Source code in navis/transforms/cmtk.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def parse_cmtk_output(self, output: str, fail_value=np.nan) -> np.ndarray:
    r"""Parse CMTK output.

    Briefly, CMTK output will be a byte literal like this:

        b'311 63 23 \n275 54 25 \n'

    In case of failed transforms we will get something like this where the
    original coordinates are returned with a "FAILED" flag

        b'343 72 23 \n-10 -10 -10 FAILED \n'

    Parameter
    ---------
    output :        tuple of (b'', None)
                    Stdout of CMTK call.
    fail_value
                    Value to use for points that failed to transform. By
                    default we use `np.nan`.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    The parse transformed points.

    """
    # The original stout is tuple where we care only about the second one
    if isinstance(output, tuple):
        output = output[0]

    pointsx = []
    # Split string into rows - lazily using a generator
    for row in (x.group(1) for x in re.finditer(r"(.*?) \n", output.decode())):
        # Split into values
        values = row.split(" ")

        # If this point failed
        if len(values) != 3:
            values = [fail_value] * 3
        else:
            values = [float(v) for v in values]

        pointsx.append(values)

    return np.asarray(pointsx)

Convert transform to dense deformation field.

PARAMETER DESCRIPTION
template
    This defines the bounds and voxel size of the deformation
    field. Typically, this would correspond to the source space
    (i.e. the moving image). We can work with:
      - str: a filepath to a NRRD file
      - TemplateBrain: a navis TemplateBrain object
      - a tuple/list/array with (Nx, Ny, Nz, dx, dy, dz)
        where N is the number of voxels in each dimension
        and d is the voxel size.

TYPE: str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) tuple

out
    NRRD filepath to save the deformation field. If None (default),
    the deformation field will be returned as numpy array.

TYPE: str DEFAULT: None

absolute
    Whether to return absolute coordinates or offsets.

TYPE: bool DEFAULT: True

verbose
    Whether to print CMTK output.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dfield

The dense deformation field as (3, Nx, Ny, Nz) numpy array with either absolute coordinates or offsets where the first dimension contains the x/y/z coordinates. Note that the coordinates stored in the field are in physical space (e.g. microns), not voxel space.

TYPE: np.ndarray

header

The NRRD header associated with the deformation field.

TYPE: dict

See Also

CMTKtransform.to_grid_transform Method to directly convert to GridTransform. Please see that method's notes for details on how to handle the deformation field.

Source code in navis/transforms/cmtk.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
def to_dfield(
    self, template, out=None, absolute: bool = True, verbose: bool = False
) -> np.ndarray:
    """Convert transform to dense deformation field.

    Parameters
    ----------
    template :  str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) tuple
                This defines the bounds and voxel size of the deformation
                field. Typically, this would correspond to the source space
                (i.e. the moving image). We can work with:
                  - str: a filepath to a NRRD file
                  - TemplateBrain: a navis TemplateBrain object
                  - a tuple/list/array with (Nx, Ny, Nz, dx, dy, dz)
                    where N is the number of voxels in each dimension
                    and d is the voxel size.
    out :       str, optional
                NRRD filepath to save the deformation field. If None (default),
                the deformation field will be returned as numpy array.
    absolute :  bool
                Whether to return absolute coordinates or offsets.
    verbose :   bool
                Whether to print CMTK output.

    Returns
    -------
    dfield :    np.ndarray
                The dense deformation field as (3, Nx, Ny, Nz) numpy array
                with either absolute coordinates or offsets where the
                first dimension contains the x/y/z coordinates. Note that
                the coordinates stored in the field are in physical space
                (e.g. microns), not voxel space.
    header :    dict
                The NRRD header associated with the deformation field.


    See Also
    --------
    CMTKtransform.to_grid_transform
                Method to directly convert to GridTransform. Please see
                that method's notes for details on how to handle the
                deformation field.

    """
    from .templates import TemplateBrain  # avoid circular import

    # Translate template info into shape and voxel size
    if isinstance(template, TemplateBrain):
        if not hasattr(template, "dims") or not hasattr(template, "voxdims"):
            raise ValueError(
                "TemplateBrain must have `dims` and `voxdims` attributes"
            )
        if len(template.dims) != 3 or len(template.voxdims) != 3:
            raise ValueError(
                "TemplateBrain `dims` and `voxdims` must be of length 3"
            )
        template = list(template.dims) + list(template.voxdims)

    to_remove = []  # track temporary files to clean up later

    try:
        if isinstance(template, str):
            # Assume NRRD filepath
            if not os.path.isfile(template):
                raise ValueError(f"Template file not found: {template}")
            ref_image = template
        elif isinstance(template, (tuple, list, np.ndarray)):
            if len(template) != 6:
                raise ValueError(
                    "`template` tuple/list/array must be of length 6: "
                    "(Nx, Ny, Nz, dx, dy, dz)"
                )

            # Create a temporary NRRD file to use as reference
            # (we're using zeros because that's highly compressible)
            im = np.zeros(np.array(template[:3]).astype(int), dtype=np.uint8)
            header = {
                "space directions": np.diag(template[3:6]),
                "kinds": ["domain", "domain", "domain"],
                "space units": ["microns", "microns", "microns"],
                "space origin": [0.0, 0.0, 0.0],
                "labels": ["x", "y", "z"],
                "space dimension": 3,
            }
            with tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False) as tf:
                nrrd.write(tf.name, im, header=header)
                to_remove.append(tf.name)
                ref_image = tf.name
        else:
            raise TypeError(
                f"`template` must be str, TemplateBrain or tuple, not {type(template)}"
            )

        if out is None:
            outfile = tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False).name
            to_remove.append(outfile)
        elif isinstance(out, (str, pathlib.Path)):
            outfile = pathlib.Path(out).resolve()
        else:
            raise ValueError(f"Invalid output type: {type(out)}")

        # Compile the command
        args = [str(_cmtkbin / "xform2dfield")]

        if absolute:
            args += ["--output-absolute"]

        args += [f"{outfile}"]
        args += [f"{ref_image}"]
        args += self.regargs

        # run the binary
        # avoid resourcewarnings with null
        with open(os.devnull, "w") as devnull:
            startupinfo = None
            if platform.system() == "Windows":
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            if verbose:
                # in debug mode print the output
                stdout = None
            else:
                stdout = devnull

            if verbose:
                config.logger.info("executing: {}".format(" ".join(args)))
            check_call(
                args,
                stdout=stdout,
                stderr=subprocess.STDOUT,
                startupinfo=startupinfo,
            )

        if out is None:
            # Return transformed image
            return nrrd.read(outfile)
        elif verbose:
            config.logger.info(f"Transformed image saved to {outfile}")

    except BaseException:
        raise
    finally:
        # Clean up temporary files
        for f in to_remove:
            os.remove(f)

Convert to GridTransform via dense deformation field.

PARAMETER DESCRIPTION
template
    This defines the bounds and voxel size of the deformation
    field. Typically, this would correspond to the source space
    (i.e. the moving image). We can work with:
      - str: a filepath to a NRRD file
      - TemplateBrain: a navis TemplateBrain object
      - a tuple/list/array with (Nx, Ny, Nz, dx, dy, dz)
        where N is the number of voxels in each dimension
        and d is the voxel size.

TYPE: str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) tuple

absolute
    Whether to return absolute coordinates or offsets.

TYPE: bool DEFAULT: True

verbose
    Whether to print CMTK output.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
transform

A few notes on using the resulting GridTransform: 1. The transform will expect input coordinates in voxel space. See the returned voxel_size array! 2. The transformed coordinates, on the other hand, will already be in physical space (e.g. microns).

TYPE: GridTransform

voxel_size

The voxel size of the input space. This is important because the GridTransform will expect input coordinates in voxel space.

TYPE: (3,) numpy array

Source code in navis/transforms/cmtk.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
def to_grid_transform(self, template, absolute: bool = True, verbose: bool = False):
    """Convert to GridTransform via dense deformation field.

    Parameters
    ----------
    template :  str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) tuple
                This defines the bounds and voxel size of the deformation
                field. Typically, this would correspond to the source space
                (i.e. the moving image). We can work with:
                  - str: a filepath to a NRRD file
                  - TemplateBrain: a navis TemplateBrain object
                  - a tuple/list/array with (Nx, Ny, Nz, dx, dy, dz)
                    where N is the number of voxels in each dimension
                    and d is the voxel size.
    absolute :  bool
                Whether to return absolute coordinates or offsets.
    verbose :   bool
                Whether to print CMTK output.

    Returns
    -------
    transform:  GridTransform
                A few notes on using the resulting GridTransform:
                  1. The transform will expect input coordinates in voxel space.
                     See the returned `voxel_size` array!
                  2. The transformed coordinates, on the other hand, will already
                     be in physical space (e.g. microns).
    voxel_size :   (3,) numpy array
                The voxel size of the input space. This is important because
                the GridTransform will expect input coordinates in voxel
                space.
    """
    from .grid import GridTransform

    dfield, header = self.to_dfield(
        template, out=None, absolute=absolute, verbose=verbose
    )

    # Generate the transform
    transform = GridTransform(
        np.transpose(dfield, (1, 2, 3, 0)),
        type="coordinates" if absolute else "offsets",
    )
    # Get voxel size from header - note that the first row is for the 4th dimension
    # (i.e. the vector dimension of the offsets/coordinates) which we don't care about
    space_dir = np.diagonal(header["space directions"][1:])

    return transform, space_dir

Xform data.

PARAMETER DESCRIPTION
points
            Points to xform. DataFrame must have x/y/z columns.

TYPE: (N, 3) numpy array | pandas.DataFrame

affine_only
            Whether to apply only the non-rigid affine
            transform. This is useful if points are outside
            the deformation field and would therefore not
            transform properly.

TYPE: bool DEFAULT: False

affine_fallback
            If True and some points did not transform during the
            non-rigid part of the transformation, we will apply
            only the affine transformation to those points.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
pointsxf

Transformed points. Points that failed to transform will be np.nan.

TYPE: (N, 3) numpy array

Source code in navis/transforms/cmtk.py
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def xform(
    self,
    points: np.ndarray,
    affine_only: bool = False,
    affine_fallback: bool = False,
) -> np.ndarray:
    """Xform data.

    Parameters
    ----------
    points :            (N, 3) numpy array | pandas.DataFrame
                        Points to xform. DataFrame must have x/y/z columns.
    affine_only :       bool
                        Whether to apply only the non-rigid affine
                        transform. This is useful if points are outside
                        the deformation field and would therefore not
                        transform properly.
    affine_fallback :   bool
                        If True and some points did not transform during the
                        non-rigid part of the transformation, we will apply
                        only the affine transformation to those points.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    Transformed points. Points that failed to transform will
                    be `np.nan`.

    """
    self.check_if_possible(on_error="raise")

    if isinstance(points, pd.DataFrame):
        # Make sure x/y/z columns are present
        if np.any([c not in points for c in ["x", "y", "z"]]):
            raise ValueError("points DataFrame must have x/y/z columns.")
    elif (
        isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3
    ):
        points = pd.DataFrame(points, columns=["x", "y", "z"])
    else:
        raise TypeError(
            "`points` must be numpy array of shape (N, 3) or "
            "pandas DataFrame with x/y/z columns"
        )

    # Generate the result
    args = self.make_args(affine_only=affine_only)
    proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    # Pipe in the points
    points_str = points[["x", "y", "z"]].to_string(index=False, header=False)

    # Do not use proc.stdin.write to avoid output buffer becoming full
    # before we finish piping in stdin.
    # proc.stdin.write(points_str.encode())
    # output = proc.communicate()

    # Read out results
    # This is equivalent to e.g.:
    # $ streamxform -args <<< "10, 10, 10"
    output = proc.communicate(input=points_str.encode())

    # If no output, something went wrong
    if not output[0]:
        raise utils.CMTKError("CMTK produced no output. Check points?")

    # Xformed points
    xf = self.parse_cmtk_output(output, fail_value=np.nan)

    # Check if any points not xformed
    if affine_fallback and not affine_only:
        not_xf = np.any(np.isnan(xf), axis=1)
        if np.any(not_xf):
            xf[not_xf] = self.xform(points.loc[not_xf], affine_only=True)

    return xf

Transform an image using CMTK's reformatx.

PARAMETER DESCRIPTION
im
    The floating image to transform.

TYPE: 3D numpy array | filepath

target
    Defines the target image: dimensions in voxels (N), the voxel size (d) and optionally
    an origin (0) for the target image. Can be provided as a string (name of a template),
    a TemplateBrain object, a tuple/list/array with the target specs.

TYPE: str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) | (Nx, Ny, Nz, dx, dy, dz, Ox, Oy, Oz)

out
    The filepath to save the transformed image. If None (default), will return the
    transformed image as np.ndarray.

TYPE: str DEFAULT: None

interpolation
    The interpolation method to use.

TYPE: linear | nn | cubic | pv | sinc - cosine | sinc - hamming DEFAULT: 'linear'

verbose
    Whether to print CMTK output.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
np.ndarray | None

If out is None, returns the transformed image as np.ndarray. Otherwise, None.

Source code in navis/transforms/cmtk.py
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
def xform_image(
    self,
    im,
    target,
    out=None,
    interpolation="linear",
    verbose=False,
):
    """Transform an image using CMTK's reformatx.

    Parameters
    ----------
    im :        3D numpy array | filepath
                The floating image to transform.
    target :    str | TemplateBrain | (Nx, Ny, Nz, dx, dy, dz) | (Nx, Ny, Nz, dx, dy, dz, Ox, Oy, Oz)
                Defines the target image: dimensions in voxels (N), the voxel size (d) and optionally
                an origin (0) for the target image. Can be provided as a string (name of a template),
                a TemplateBrain object, a tuple/list/array with the target specs.
    out :       str, optional
                The filepath to save the transformed image. If None (default), will return the
                transformed image as np.ndarray.
    interpolation : "linear" | "nn" | "cubic" | "pv" | "sinc-cosine" | "sinc-hamming"
                The interpolation method to use.
    verbose :   bool
                Whether to print CMTK output.

    Returns
    -------
    np.ndarray | None
                If out is None, returns the transformed image as np.ndarray. Otherwise, None.

    """
    assert interpolation in (
        "linear",
        "nn",
        "cubic",
        "pv",
        "sinc-cosine",
        "sinc-hamming",
    )

    # `reformatx` expects this format:
    # ./reformatx --floating {INPUT_FILE} -o {OUTPUT_FILE} {REFERENCE_SPECS} {TRANSFORMS}
    # where:
    # - {INPUT_FILE} is the image to transform
    # - {OUTPUT_FILE} is where the output will be saved
    # - {REFERENCE_SPECS} defines the target space; this needs to be either a NRRD
    #   file from which CMTK can extract the target grid or the actual specs:
    #   "--target-grid Nx,Ny,Nz:dX,dY,dZ:[Ox,Oy,Oz]" where N is the number of
    #   voxels in each dimension and d is the voxel size. The optional O is the
    #   origin of the image. If not provided, it is assumed to be (0, 0, 0).
    # - {TRANSFORMS} are the CMTK transform(s) to apply; prefix with "--inverse" to invert
    # Below command works to convert JFRC2 to FCWB:
    # /opt/local/lib/cmtk/bin/reformatx --verbose --floating JFRC2.nrrd -o JFRC2_xf.nrrd FCWB.nrrd ~/flybrain-data/BridgingRegistrations/FCWB_JFRC2.list
    # This took 2min 28s - should check if that is actually faster than the look-up approach we use in `images.py`
    # Note that inversion of transforms always comes with an overhead! When using the inverse transform instead, the above command took 2h 40min!

    if verbose and any((d == "inverse " for d in self.directions)):
        config.logger.warning(
            "Using inverse CMTK transforms with reformatx can be very slow! If possible, consider using only forward transforms."
        )

    target_specs = parse_target_specs(target)

    to_remove = []
    if isinstance(im, (str, pathlib.Path)):
        floating = pathlib.Path(im)
        if not im.is_file():
            raise ValueError(f"Image file not found: {im}")
    elif isinstance(im, np.ndarray):
        assert im.ndim == 3
        # Save to temporary file
        with tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False) as tf:
            nrrd.write(tf.name, im)
            floating = tf.name
            to_remove.append(tf.name)
    else:
        raise ValueError(f"Invalid image type: {type(im)}")

    if out is None:
        outfile = tempfile.NamedTemporaryFile(suffix=".nrrd", delete=False).name
        to_remove.append(outfile)
    elif isinstance(out, (str, pathlib.Path)):
        outfile = pathlib.Path(out).resolve()
    else:
        raise ValueError(f"Invalid output type: {type(out)}")

    # Compile the command
    args = [str(_cmtkbin / "reformatx")]
    args += [f"-o {outfile}"]
    args += [f"--floating {floating}"]
    args += [f"--{interpolation}"]
    args += [target_specs]

    # Add the regargs
    args += self.regargs

    try:
        # run the binary
        # avoid resourcewarnings with null
        with open(os.devnull, "w") as devnull:
            startupinfo = None
            if platform.system() == "Windows":
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            if verbose:
                # in debug mode print the output
                stdout = None
            else:
                stdout = devnull

            if verbose:
                config.logger.info("executing: {}".format(" ".join(args)))
            check_call(
                args,
                stdout=stdout,
                stderr=subprocess.STDOUT,
                startupinfo=startupinfo,
            )

        if out is None:
            # Return transformed image
            return nrrd.read(outfile)
        elif verbose:
            config.logger.info(f"Transformed image saved to {outfile}")
    except BaseException:
        raise
    finally:
        # Clean up temporary files
        for f in to_remove:
            os.remove(f)

Elastix transforms of 3D spatial data.

Requires Elastix. Based on code by Jasper Phelps (https://github.com/jasper-tms/pytransformix).

Note that elastix transforms can not be inverted!

PARAMETER DESCRIPTION
file
            Filepath to elastix transformation file.

TYPE: str

copy_files
            Any files that need to be copied into the temporary
            directory where we perform the transform. These are
            typically files supplemental to the main transform
            file (e.g. defining an additional affine transform).

TYPE: filepath | list DEFAULT: []

Examples:

>>> from navis import transforms
>>> tr = transforms.ElastixTransform('/path/to/transform/transform')
>>> tr.xform(points)
Source code in navis/transforms/elastix.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
class ElastixTransform(BaseTransform):
    """Elastix transforms of 3D spatial data.

    Requires [Elastix](https://github.com/SuperElastix/elastix/). Based on
    code by Jasper Phelps (<https://github.com/jasper-tms/pytransformix>).

    Note that elastix transforms can not be inverted!

    Parameters
    ----------
    file :              str
                        Filepath to elastix transformation file.
    copy_files :        filepath | list, optional
                        Any files that need to be copied into the temporary
                        directory where we perform the transform. These are
                        typically files supplemental to the main transform
                        file (e.g. defining an additional affine transform).

    Examples
    --------
    >>> from navis import transforms
    >>> tr = transforms.ElastixTransform('/path/to/transform/transform')
    >>> tr.xform(points) # doctest: +SKIP

    """

    def __init__(self, file: str, copy_files=[]):
        self.file = pathlib.Path(file)
        self.copy_files = copy_files

    def __eq__(self, other: "ElastixTransform") -> bool:
        """Implement equality comparison."""
        if isinstance(other, ElastixTransform):
            if self.file == other.file:
                return True
        return False

    def check_if_possible(self, on_error: str = "raise"):
        """Check if this transform is possible."""
        if not _elastixbin:
            msg = (
                "Folder with elastix binaries not found. Make sure the "
                "directory is in your PATH environment variable."
            )
            if on_error == "raise":
                raise BaseException(msg)
            return msg
        if not self.file.is_file():
            msg = f"Transformation file {self.file} not found."
            if on_error == "raise":
                raise BaseException(msg)
                return msg

    def copy(self) -> "ElastixTransform":
        """Return copy."""
        # Attributes not to copy
        no_copy = []
        # Generate new empty transform
        x = self.__class__(self.file)
        # Override with this neuron's data
        x.__dict__.update(
            {k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy}
        )

        return x

    def write_input_file(self, points, filepath):
        """Write a numpy array in format required by transformix."""
        with open(filepath, "w") as f:
            f.write("point\n{}\n".format(len(points)))
            for x, y, z in points:
                f.write(f"{x:f} {y:f} {z:f}\n")

    def read_output_file(self, filepath) -> np.ndarray:
        """Load output file.

        Parameter
        ---------
        filepath :      str
                        Filepath to output file.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        The parse transformed points.

        """
        points = []
        with open(filepath, "r") as f:
            for line in f.readlines():
                output = line.split("OutputPoint = [ ")[1].split(" ]")[0]
                points.append([float(i) for i in output.split(" ")])
        return np.array(points)

    def xform(self, points: np.ndarray, return_logs=False) -> np.ndarray:
        """Xform data.

        Parameters
        ----------
        points :        (N, 3) numpy array | pandas.DataFrame
                        Points to xform. DataFrame must have x/y/z columns.
        return_logs :   bool
                        If True, will return logs instead of transformed points.
                        Really only useful for debugging.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        Transformed points.

        """
        self.check_if_possible(on_error="raise")

        if isinstance(points, pd.DataFrame):
            # Make sure x/y/z columns are present
            if np.any([c not in points for c in ["x", "y", "z"]]):
                raise ValueError("points DataFrame must have x/y/z columns.")
            points = points[["x", "y", "z"]].values
        elif not (
            isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3
        ):
            raise TypeError(
                "`points` must be numpy array of shape (N, 3) or "
                "pandas DataFrame with x/y/z columns"
            )

        # Everything happens in a temporary directory
        with tempfile.TemporaryDirectory() as tempdir:
            p = pathlib.Path(tempdir)

            # If required, copy additional files into the temporary directory
            if self.copy_files:
                for f in make_iterable(self.copy_files):
                    _ = pathlib.Path(shutil.copy(f, p))

            # Write points to file
            in_file = p / "inputpoints.txt"
            self.write_input_file(points, in_file)

            out_file = p / "outputpoints.txt"

            # Prepare the command
            command = [
                _elastixbin / "transformix",
                "-out",
                str(p),
                "-tp",
                str(self.file),
                "-def",
                str(in_file),
            ]

            # Keep track of current working directory
            cwd = os.getcwd()
            try:
                # Change working directory to the temporary directory
                # This is apparently required because elastix stupidly expects
                # any secondary transform files to be in the current directory
                # (as opposed to where the main transform is)
                os.chdir(p)
                # Run the actual transform
                proc = subprocess.run(command, stdout=subprocess.PIPE)
            except BaseException:
                raise
            finally:
                # This makes sure we land on our feet even in case of an error
                os.chdir(cwd)

            if return_logs:
                logfile = p / "transformix.log"
                if not logfile.is_file():
                    raise FileNotFoundError("No log file found.")
                with open(logfile) as f:
                    logs = f.read()
                return logs

            if not out_file.is_file():
                raise FileNotFoundError(
                    "Elastix transform did not produce any "
                    f"output:\n {proc.stdout.decode()}"
                )

            points_xf = self.read_output_file(out_file)

        return points_xf

Check if this transform is possible.

Source code in navis/transforms/elastix.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def check_if_possible(self, on_error: str = "raise"):
    """Check if this transform is possible."""
    if not _elastixbin:
        msg = (
            "Folder with elastix binaries not found. Make sure the "
            "directory is in your PATH environment variable."
        )
        if on_error == "raise":
            raise BaseException(msg)
        return msg
    if not self.file.is_file():
        msg = f"Transformation file {self.file} not found."
        if on_error == "raise":
            raise BaseException(msg)
            return msg

Return copy.

Source code in navis/transforms/elastix.py
181
182
183
184
185
186
187
188
189
190
191
192
def copy(self) -> "ElastixTransform":
    """Return copy."""
    # Attributes not to copy
    no_copy = []
    # Generate new empty transform
    x = self.__class__(self.file)
    # Override with this neuron's data
    x.__dict__.update(
        {k: copy.copy(v) for k, v in self.__dict__.items() if k not in no_copy}
    )

    return x

Load output file.

Parameter

filepath : str Filepath to output file.

RETURNS DESCRIPTION
pointsxf

The parse transformed points.

TYPE: (N, 3) numpy array

Source code in navis/transforms/elastix.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def read_output_file(self, filepath) -> np.ndarray:
    """Load output file.

    Parameter
    ---------
    filepath :      str
                    Filepath to output file.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    The parse transformed points.

    """
    points = []
    with open(filepath, "r") as f:
        for line in f.readlines():
            output = line.split("OutputPoint = [ ")[1].split(" ]")[0]
            points.append([float(i) for i in output.split(" ")])
    return np.array(points)

Write a numpy array in format required by transformix.

Source code in navis/transforms/elastix.py
194
195
196
197
198
199
def write_input_file(self, points, filepath):
    """Write a numpy array in format required by transformix."""
    with open(filepath, "w") as f:
        f.write("point\n{}\n".format(len(points)))
        for x, y, z in points:
            f.write(f"{x:f} {y:f} {z:f}\n")

Xform data.

PARAMETER DESCRIPTION
points
        Points to xform. DataFrame must have x/y/z columns.

TYPE: (N, 3) numpy array | pandas.DataFrame

return_logs
        If True, will return logs instead of transformed points.
        Really only useful for debugging.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
pointsxf

Transformed points.

TYPE: (N, 3) numpy array

Source code in navis/transforms/elastix.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def xform(self, points: np.ndarray, return_logs=False) -> np.ndarray:
    """Xform data.

    Parameters
    ----------
    points :        (N, 3) numpy array | pandas.DataFrame
                    Points to xform. DataFrame must have x/y/z columns.
    return_logs :   bool
                    If True, will return logs instead of transformed points.
                    Really only useful for debugging.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    Transformed points.

    """
    self.check_if_possible(on_error="raise")

    if isinstance(points, pd.DataFrame):
        # Make sure x/y/z columns are present
        if np.any([c not in points for c in ["x", "y", "z"]]):
            raise ValueError("points DataFrame must have x/y/z columns.")
        points = points[["x", "y", "z"]].values
    elif not (
        isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3
    ):
        raise TypeError(
            "`points` must be numpy array of shape (N, 3) or "
            "pandas DataFrame with x/y/z columns"
        )

    # Everything happens in a temporary directory
    with tempfile.TemporaryDirectory() as tempdir:
        p = pathlib.Path(tempdir)

        # If required, copy additional files into the temporary directory
        if self.copy_files:
            for f in make_iterable(self.copy_files):
                _ = pathlib.Path(shutil.copy(f, p))

        # Write points to file
        in_file = p / "inputpoints.txt"
        self.write_input_file(points, in_file)

        out_file = p / "outputpoints.txt"

        # Prepare the command
        command = [
            _elastixbin / "transformix",
            "-out",
            str(p),
            "-tp",
            str(self.file),
            "-def",
            str(in_file),
        ]

        # Keep track of current working directory
        cwd = os.getcwd()
        try:
            # Change working directory to the temporary directory
            # This is apparently required because elastix stupidly expects
            # any secondary transform files to be in the current directory
            # (as opposed to where the main transform is)
            os.chdir(p)
            # Run the actual transform
            proc = subprocess.run(command, stdout=subprocess.PIPE)
        except BaseException:
            raise
        finally:
            # This makes sure we land on our feet even in case of an error
            os.chdir(cwd)

        if return_logs:
            logfile = p / "transformix.log"
            if not logfile.is_file():
                raise FileNotFoundError("No log file found.")
            with open(logfile) as f:
                logs = f.read()
            return logs

        if not out_file.is_file():
            raise FileNotFoundError(
                "Elastix transform did not produce any "
                f"output:\n {proc.stdout.decode()}"
            )

        points_xf = self.read_output_file(out_file)

    return points_xf

Apply custom function as transform.

PARAMETER DESCRIPTION
func
    Function that accepts and returns an (N, 3) array.

TYPE: callable

Source code in navis/transforms/base.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class FunctionTransform(BaseTransform):
    """Apply custom function as transform.

    Parameters
    ----------
    func :      callable
                Function that accepts and returns an (N, 3) array.

    """

    def __init__(self, func):
        """Initialize."""
        if not callable(func):
            raise TypeError('`func` must be callable')
        self.func = func

    def __eq__(self, other):
        """Check if the same."""
        if not isinstance(other, FunctionTransform):
            return False
        if self.func != other.func:
            return False
        return True

    def copy(self):
        """Return copy."""
        x = self.__class__(self.func)
        x.__dict__.update(self.__dict__)
        return x

    def xform(self, points: np.ndarray) -> np.ndarray:
        """Xform data.

        Parameters
        ----------
        points :        (N, 3) numpy array | pandas.DataFrame
                        Points to xform. DataFrame must have x/y/z columns.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        Transformed points.

        """
        if isinstance(points, pd.DataFrame):
            # Make sure x/y/z columns are present
            if np.any([c not in points for c in ['x', 'y', 'z']]):
                raise ValueError('points DataFrame must have x/y/z columns.')
            points = points[['x', 'y', 'z']].values
        elif not (isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3):
            raise TypeError('`points` must be numpy array of shape (N, 3) or '
                            'pandas DataFrame with x/y/z columns')
        return self.func(points.copy())

Initialize.

Source code in navis/transforms/base.py
117
118
119
120
121
def __init__(self, func):
    """Initialize."""
    if not callable(func):
        raise TypeError('`func` must be callable')
    self.func = func

Return copy.

Source code in navis/transforms/base.py
131
132
133
134
135
def copy(self):
    """Return copy."""
    x = self.__class__(self.func)
    x.__dict__.update(self.__dict__)
    return x

Xform data.

PARAMETER DESCRIPTION
points
        Points to xform. DataFrame must have x/y/z columns.

TYPE: (N, 3) numpy array | pandas.DataFrame

RETURNS DESCRIPTION
pointsxf

Transformed points.

TYPE: (N, 3) numpy array

Source code in navis/transforms/base.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def xform(self, points: np.ndarray) -> np.ndarray:
    """Xform data.

    Parameters
    ----------
    points :        (N, 3) numpy array | pandas.DataFrame
                    Points to xform. DataFrame must have x/y/z columns.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    Transformed points.

    """
    if isinstance(points, pd.DataFrame):
        # Make sure x/y/z columns are present
        if np.any([c not in points for c in ['x', 'y', 'z']]):
            raise ValueError('points DataFrame must have x/y/z columns.')
        points = points[['x', 'y', 'z']].values
    elif not (isinstance(points, np.ndarray) and points.ndim == 2 and points.shape[1] == 3):
        raise TypeError('`points` must be numpy array of shape (N, 3) or '
                        'pandas DataFrame with x/y/z columns')
    return self.func(points.copy())

Deformation or coordinate field transform of 3D spatial data.

This is effectively a simpler version of the H5transform class and only supports deformation fields stored as numpy arrays in memory.

PARAMETER DESCRIPTION
field
        Deformation/coordinate field. The last dimension must
        contain the x/y/z coordinates.

TYPE: (Nx, Ny, Nz, 3) numpy array

type
        Whether the field contains absolute coordinates or offsets/displacements.
        Offsets are added to the input coordinates, while
        coordinates returned as is.

TYPE: "offsets" | "coordinates" DEFAULT: 'offsets'

spacing
        Spacing of the deformation field in x/y/z. If not provided,
        spacing of 1 is assumed.

TYPE: tuple | list | numpy array DEFAULT: None

offset
        Offset of the deformation field in x/y/z. If not provided,
        offset of 0 is assumed. N.B. offsets are applied _after_ spacing,
        i.e. need to be provided in voxel space of the deformation field.

TYPE: tuple | list | numpy array DEFAULT: None

Source code in navis/transforms/grid.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
class GridTransform(BaseTransform):
    """Deformation or coordinate field transform of 3D spatial data.

    This is effectively a simpler version of the H5transform class and
    only supports deformation fields stored as numpy arrays in memory.

    Parameters
    ----------
    field :         (Nx, Ny, Nz, 3) numpy array
                    Deformation/coordinate field. The last dimension must
                    contain the x/y/z coordinates.
    type :          "offsets" | "coordinates"
                    Whether the field contains absolute coordinates or offsets/displacements.
                    Offsets are added to the input coordinates, while
                    coordinates returned as is.
    spacing :       tuple | list | numpy array, optional
                    Spacing of the deformation field in x/y/z. If not provided,
                    spacing of 1 is assumed.
    offset :        tuple | list | numpy array, optional
                    Offset of the deformation field in x/y/z. If not provided,
                    offset of 0 is assumed. N.B. offsets are applied _after_ spacing,
                    i.e. need to be provided in voxel space of the deformation field.

    """

    def __init__(
        self, field: np.ndarray, type: str = "offsets", spacing=None, offset=None
    ):
        """Init class."""
        assert (
            field.ndim == 4 and field.shape[-1] == 3
        ), "Field must be a 4D numpy array with the last dimension of size 3."
        assert type in (
            "coordinates",
            "offsets",
        ), "type must be 'coordinates' or 'offsets'."
        self.field = field
        self.type = type
        self.dtype = field.dtype
        self.spacing = spacing
        self.offset = offset

    def __eq__(self, other) -> bool:
        """Compare with other Transform."""
        if isinstance(other, GridTransform):
            if np.array_equal(self.field, other.field):
                return True
        return False

    def __neg__(self) -> "GridTransform":
        """Invert direction."""
        # Note to future self: we could implement this by computing the inverse
        # deformation field using fixed-point iteration, but that's
        # non-trivial and not needed right now.
        raise NotImplementedError("Inversion of GridTransform is not implemented.")

    @property
    def spacing(self):
        return self._spacing

    @spacing.setter
    def spacing(self, value):
        if value is None:
            self._spacing = None
        else:
            value = np.asarray(value)
            assert (
                value.ndim == 1 and value.size == 3
            ), "spacing must be a tuple/list/array of size 3."
            self._spacing = np.array(value, dtype=self.dtype)

    @property
    def offset(self):
        return self._offset

    @offset.setter
    def offset(self, value):
        if value is None:
            self._offset = None
        else:
            value = np.asarray(value)
            assert (
                value.ndim == 1 and value.size == 3
            ), "offset must be a tuple/list/array of size 3."
            self._offset = np.array(value, dtype=self.dtype)

    @property
    def affine(self) -> AffineTransform:
        """Return affine part of the transform."""
        # We're delaying the calculation of the affine part until it's needed
        if not hasattr(self, "_affine"):
            self.calculate_affine()
        return self._affine

    @property
    def shape(self) -> tuple:
        """Return shape of the deformation field."""
        return self.field.shape

    def calculate_affine(self) -> None:
        """Calculate affine part of the transform."""
        # The strategy here is this:
        # 1. Take the 8 corners of the deformation field
        # 2. Transform them using the deformation field
        # 3. Treat them as landmarks and compute the affine transform using morphops

        mx = np.array(self.shape) - 1  # max indices in each dimension
        points = np.array(
            [
                [0, 0, 0],
                [0, 0, mx[2]],
                [0, mx[1], 0],
                [0, mx[1], mx[2]],
                [mx[0], 0, 0],
                [mx[0], 0, mx[2]],
                [mx[0], mx[1], 0],
                [mx[0], mx[1], mx[2]],
            ]
        )

        if self.offset is not None:
            points = points - self.offset[np.newaxis, :]

        if self.spacing is not None:
            points = points * self.spacing[np.newaxis, :]

        points_xf = self.xform(points, affine_fallback=False)

        m = TPStransform(points, points_xf).matrix_rigid

        # Calculate the affine part as the mean displacement across the field
        self._affine = AffineTransform(m)

    def copy(self, copy_data: bool = False) -> "GridTransform":
        """Return copy."""
        if copy_data:
            return GridTransform(self.field.copy(), type=self.type)
        else:
            return GridTransform(self.field, type=self.type)

    @classmethod
    def from_file(cls, filepath: str) -> "GridTransform":
        """Create GridTransform a file.

        Parameters
        ----------
        file :          str
                        Path to file. Currently supported formats:
                          - NRRD files with deformation fields
                          - Numpy .npy files with deformation fields
                          - Nifti files (.nii, .nii.gz) with deformation fields

        Returns
        -------
        GridTransform instance.

        """
        filepath = Path(filepath)
        if not filepath.exists():
            raise FileNotFoundError(f"File {filepath} does not exist.")

        if filepath.suffix in [".npy"]:
            field = np.load(filepath)
        elif filepath.suffix in [".nii", ".nii.gz"]:
            try:
                import nibabel as nib
            except ModuleNotFoundError:
                raise ImportError(
                    "`nibabel` package is required to read Nifti files (.nii, .nii.gz)"
                )

            img = nib.load(str(filepath))
            field = img.get_fdata()
        elif filepath.suffix in [".nrrd"]:
            import nrrd

            field, _ = nrrd.read(str(filepath))
        else:
            raise ValueError(
                f"Unsupported file format: {filepath.suffix}. "
                "Supported formats are: .npy, .nii, .nii.gz, .nrrd"
            )

        return cls(field)

    @classmethod
    def from_warpfield(cls, warpfield):
        """Create GridTransform from a Warpfield deformation field.

        Parameters
        ----------
        warpfield :     warpfield.WarpMap | str
                        Warpfield WarpMap instance or path to a WarpMap h5 file.

        """
        if isinstance(warpfield, str):
            import h5py

            with h5py.File(warpfield, "r") as h5:
                wm = h5["warp_map"]
                field = wm["warp_field"][:]
                block_size = wm["block_size"][:]
                block_stride = wm["block_stride"][:]
                # mov_shape = wm["moving_shape"][:]
                # ref_shape = wm["ref_shape"][:]
        else:
            field = warpfield.warp_field
            block_size = warpfield.block_size
            block_stride = warpfield.block_stride
            # mov_shape = warpfield.mov_shape
            # ref_shape = warpfield.ref_shape

        # Reshape the field from (3, X, Y, Z) to (X, Y, Z, 3)
        field = np.moveaxis(field, 0, -1)

        spacing = block_stride
        offset = -block_size / block_stride / 2
        # Note to self regarding the offset: this code is taken straight from warpfield.

        return cls(field, type="offsets", spacing=spacing, offset=offset)

    def xform(
        self,
        points: np.ndarray,
        affine_fallback: bool = False,
    ) -> np.ndarray:
        """Xform data.

        Parameters
        ----------
        points :            (N, 3) numpy array | pandas.DataFrame
                            Points to xform. DataFrame must have x/y/z columns.
        affine_fallback :   bool
                            If True, points that are outside the deformation field
                            will be transformed using an affine transform defined by
                            the affine part of the deformation field.
                            If False, points outside the field will be set to np.nan.

        Returns
        -------
        pointsxf :          (N, 3) numpy.ndarray
                            Transformed points. Will contain `np.nan` for points
                            that did not transform.

        """
        if isinstance(points, pd.DataFrame):
            # Make sure x/y/z columns are present
            if np.any([c not in points for c in ["x", "y", "z"]]):
                raise ValueError("points DataFrame must have x/y/z columns.")
            points = points[["x", "y", "z"]].values

        if (
            not isinstance(points, np.ndarray)
            or points.ndim != 2
            or points.shape[1] != 3
        ):
            raise TypeError(
                "`points` must be numpy array of shape (N, 3) or "
                "pandas DataFrame with x/y/z columns"
            )

        points_vxl = points

        if self.spacing is not None:
            points_vxl = points_vxl / self.spacing[np.newaxis, :]

        if self.offset is not None:
            points_vxl = points_vxl + self.offset[np.newaxis, :]

        # For interpolation, we need to split the offsets into their x, y
        # and z component
        xgrid = self.field[:, :, :, 0]
        ygrid = self.field[:, :, :, 1]
        zgrid = self.field[:, :, :, 2]

        # Prepare points for interpolation
        xx = np.arange(0, self.shape[0])
        yy = np.arange(0, self.shape[1])
        zz = np.arange(0, self.shape[2])

        # The RegularGridInterpolator is the fastest but the results are
        # are ever so slightly (4th decimal) different from the Java implementation
        xinterp = RegularGridInterpolator(
            (xx, yy, zz), xgrid, bounds_error=False, fill_value=0
        )
        yinterp = RegularGridInterpolator(
            (xx, yy, zz), ygrid, bounds_error=False, fill_value=0
        )
        zinterp = RegularGridInterpolator(
            (xx, yy, zz), zgrid, bounds_error=False, fill_value=0
        )

        # Before we interpolate check how many points are outside the deformation field
        is_out = (points_vxl.min(axis=1) < 0) | np.any(points_vxl >= self.shape[:-1], axis=1)

        # Prepare output array
        if self.type == "coordinates":
            points_xf = np.zeros(points.shape, dtype=self.dtype)
        else:  # offsets
            points_xf = points.astype(self.dtype, copy=True)

        if is_out.any():
            if not affine_fallback:
                points_xf[is_out, :] = np.nan
            else:
                # Apply affine part to out-of-bounds points
                points_xf[is_out, :] = self.affine.xform(
                    points[is_out, :],
                )

        # Interpolate coordinates, re-combine to an x/y/z array and
        # add to the input points (if coordinates, the points are zeroed out above)
        if not is_out.all():
            points_xf[~is_out, :] += np.vstack(
                (
                    xinterp(points_vxl[~is_out, :], method="linear"),
                    yinterp(points_vxl[~is_out, :], method="linear"),
                    zinterp(points_vxl[~is_out, :], method="linear"),
                )
            ).T.astype(self.dtype)

        return points_xf

Return affine part of the transform.

Return shape of the deformation field.

Init class.

Source code in navis/transforms/grid.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(
    self, field: np.ndarray, type: str = "offsets", spacing=None, offset=None
):
    """Init class."""
    assert (
        field.ndim == 4 and field.shape[-1] == 3
    ), "Field must be a 4D numpy array with the last dimension of size 3."
    assert type in (
        "coordinates",
        "offsets",
    ), "type must be 'coordinates' or 'offsets'."
    self.field = field
    self.type = type
    self.dtype = field.dtype
    self.spacing = spacing
    self.offset = offset

Calculate affine part of the transform.

Source code in navis/transforms/grid.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def calculate_affine(self) -> None:
    """Calculate affine part of the transform."""
    # The strategy here is this:
    # 1. Take the 8 corners of the deformation field
    # 2. Transform them using the deformation field
    # 3. Treat them as landmarks and compute the affine transform using morphops

    mx = np.array(self.shape) - 1  # max indices in each dimension
    points = np.array(
        [
            [0, 0, 0],
            [0, 0, mx[2]],
            [0, mx[1], 0],
            [0, mx[1], mx[2]],
            [mx[0], 0, 0],
            [mx[0], 0, mx[2]],
            [mx[0], mx[1], 0],
            [mx[0], mx[1], mx[2]],
        ]
    )

    if self.offset is not None:
        points = points - self.offset[np.newaxis, :]

    if self.spacing is not None:
        points = points * self.spacing[np.newaxis, :]

    points_xf = self.xform(points, affine_fallback=False)

    m = TPStransform(points, points_xf).matrix_rigid

    # Calculate the affine part as the mean displacement across the field
    self._affine = AffineTransform(m)

Return copy.

Source code in navis/transforms/grid.py
166
167
168
169
170
171
def copy(self, copy_data: bool = False) -> "GridTransform":
    """Return copy."""
    if copy_data:
        return GridTransform(self.field.copy(), type=self.type)
    else:
        return GridTransform(self.field, type=self.type)

Create GridTransform a file.

PARAMETER DESCRIPTION
file
        Path to file. Currently supported formats:
          - NRRD files with deformation fields
          - Numpy .npy files with deformation fields
          - Nifti files (.nii, .nii.gz) with deformation fields

TYPE: str

RETURNS DESCRIPTION
GridTransform instance.
Source code in navis/transforms/grid.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
@classmethod
def from_file(cls, filepath: str) -> "GridTransform":
    """Create GridTransform a file.

    Parameters
    ----------
    file :          str
                    Path to file. Currently supported formats:
                      - NRRD files with deformation fields
                      - Numpy .npy files with deformation fields
                      - Nifti files (.nii, .nii.gz) with deformation fields

    Returns
    -------
    GridTransform instance.

    """
    filepath = Path(filepath)
    if not filepath.exists():
        raise FileNotFoundError(f"File {filepath} does not exist.")

    if filepath.suffix in [".npy"]:
        field = np.load(filepath)
    elif filepath.suffix in [".nii", ".nii.gz"]:
        try:
            import nibabel as nib
        except ModuleNotFoundError:
            raise ImportError(
                "`nibabel` package is required to read Nifti files (.nii, .nii.gz)"
            )

        img = nib.load(str(filepath))
        field = img.get_fdata()
    elif filepath.suffix in [".nrrd"]:
        import nrrd

        field, _ = nrrd.read(str(filepath))
    else:
        raise ValueError(
            f"Unsupported file format: {filepath.suffix}. "
            "Supported formats are: .npy, .nii, .nii.gz, .nrrd"
        )

    return cls(field)

Create GridTransform from a Warpfield deformation field.

PARAMETER DESCRIPTION
warpfield
        Warpfield WarpMap instance or path to a WarpMap h5 file.

TYPE: warpfield.WarpMap | str

Source code in navis/transforms/grid.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
@classmethod
def from_warpfield(cls, warpfield):
    """Create GridTransform from a Warpfield deformation field.

    Parameters
    ----------
    warpfield :     warpfield.WarpMap | str
                    Warpfield WarpMap instance or path to a WarpMap h5 file.

    """
    if isinstance(warpfield, str):
        import h5py

        with h5py.File(warpfield, "r") as h5:
            wm = h5["warp_map"]
            field = wm["warp_field"][:]
            block_size = wm["block_size"][:]
            block_stride = wm["block_stride"][:]
            # mov_shape = wm["moving_shape"][:]
            # ref_shape = wm["ref_shape"][:]
    else:
        field = warpfield.warp_field
        block_size = warpfield.block_size
        block_stride = warpfield.block_stride
        # mov_shape = warpfield.mov_shape
        # ref_shape = warpfield.ref_shape

    # Reshape the field from (3, X, Y, Z) to (X, Y, Z, 3)
    field = np.moveaxis(field, 0, -1)

    spacing = block_stride
    offset = -block_size / block_stride / 2
    # Note to self regarding the offset: this code is taken straight from warpfield.

    return cls(field, type="offsets", spacing=spacing, offset=offset)

Xform data.

PARAMETER DESCRIPTION
points
            Points to xform. DataFrame must have x/y/z columns.

TYPE: (N, 3) numpy array | pandas.DataFrame

affine_fallback
            If True, points that are outside the deformation field
            will be transformed using an affine transform defined by
            the affine part of the deformation field.
            If False, points outside the field will be set to np.nan.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
pointsxf

Transformed points. Will contain np.nan for points that did not transform.

TYPE: (N, 3) numpy.ndarray

Source code in navis/transforms/grid.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def xform(
    self,
    points: np.ndarray,
    affine_fallback: bool = False,
) -> np.ndarray:
    """Xform data.

    Parameters
    ----------
    points :            (N, 3) numpy array | pandas.DataFrame
                        Points to xform. DataFrame must have x/y/z columns.
    affine_fallback :   bool
                        If True, points that are outside the deformation field
                        will be transformed using an affine transform defined by
                        the affine part of the deformation field.
                        If False, points outside the field will be set to np.nan.

    Returns
    -------
    pointsxf :          (N, 3) numpy.ndarray
                        Transformed points. Will contain `np.nan` for points
                        that did not transform.

    """
    if isinstance(points, pd.DataFrame):
        # Make sure x/y/z columns are present
        if np.any([c not in points for c in ["x", "y", "z"]]):
            raise ValueError("points DataFrame must have x/y/z columns.")
        points = points[["x", "y", "z"]].values

    if (
        not isinstance(points, np.ndarray)
        or points.ndim != 2
        or points.shape[1] != 3
    ):
        raise TypeError(
            "`points` must be numpy array of shape (N, 3) or "
            "pandas DataFrame with x/y/z columns"
        )

    points_vxl = points

    if self.spacing is not None:
        points_vxl = points_vxl / self.spacing[np.newaxis, :]

    if self.offset is not None:
        points_vxl = points_vxl + self.offset[np.newaxis, :]

    # For interpolation, we need to split the offsets into their x, y
    # and z component
    xgrid = self.field[:, :, :, 0]
    ygrid = self.field[:, :, :, 1]
    zgrid = self.field[:, :, :, 2]

    # Prepare points for interpolation
    xx = np.arange(0, self.shape[0])
    yy = np.arange(0, self.shape[1])
    zz = np.arange(0, self.shape[2])

    # The RegularGridInterpolator is the fastest but the results are
    # are ever so slightly (4th decimal) different from the Java implementation
    xinterp = RegularGridInterpolator(
        (xx, yy, zz), xgrid, bounds_error=False, fill_value=0
    )
    yinterp = RegularGridInterpolator(
        (xx, yy, zz), ygrid, bounds_error=False, fill_value=0
    )
    zinterp = RegularGridInterpolator(
        (xx, yy, zz), zgrid, bounds_error=False, fill_value=0
    )

    # Before we interpolate check how many points are outside the deformation field
    is_out = (points_vxl.min(axis=1) < 0) | np.any(points_vxl >= self.shape[:-1], axis=1)

    # Prepare output array
    if self.type == "coordinates":
        points_xf = np.zeros(points.shape, dtype=self.dtype)
    else:  # offsets
        points_xf = points.astype(self.dtype, copy=True)

    if is_out.any():
        if not affine_fallback:
            points_xf[is_out, :] = np.nan
        else:
            # Apply affine part to out-of-bounds points
            points_xf[is_out, :] = self.affine.xform(
                points[is_out, :],
            )

    # Interpolate coordinates, re-combine to an x/y/z array and
    # add to the input points (if coordinates, the points are zeroed out above)
    if not is_out.all():
        points_xf[~is_out, :] += np.vstack(
            (
                xinterp(points_vxl[~is_out, :], method="linear"),
                yinterp(points_vxl[~is_out, :], method="linear"),
                zinterp(points_vxl[~is_out, :], method="linear"),
            )
        ).T.astype(self.dtype)

    return points_xf

Hdf5 transform of 3D spatial data.

See here for specifications of the format.

PARAMETER DESCRIPTION
f
        Path to Hdf5 transformation.

TYPE: str

direction
        Direction of transformation.

TYPE: "forward" | "inverse" DEFAULT: 'forward'

level
        For Hdf5 files with deformation fields at multiple
        resolutions: what level of detail to use. Negative values
        go backwards from the highest available resolution
        (-1 = highest, -2 = second highest, etc). Ignored if only a
        single deformation field present.

TYPE: int DEFAULT: -1

cache
        If True, we will cache the deformation field for subsequent
        future transforms. This will speed up future calculations
        in the future but comes at a memory cost.

TYPE: bool DEFAULT: False

full_ingest
        If True, will read and cache the full deformation field at
        initialization. This additional upfront cost can pay off if
        you are about to make many transforms across the volume.

TYPE: bool DEFAULT: False

Source code in navis/transforms/h5reg.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
class H5transform(BaseTransform):
    """Hdf5 transform of 3D spatial data.

    See [here](https://github.com/saalfeldlab/template-building/wiki/Hdf5-Deformation-fields)
    for specifications of the format.

    Parameters
    ----------
    f :             str
                    Path to Hdf5 transformation.
    direction :     "forward" | "inverse"
                    Direction of transformation.
    level :         int, optional
                    For Hdf5 files with deformation fields at multiple
                    resolutions: what level of detail to use. Negative values
                    go backwards from the highest available resolution
                    (-1 = highest, -2 = second highest, etc). Ignored if only a
                    single deformation field present.
    cache :         bool
                    If True, we will cache the deformation field for subsequent
                    future transforms. This will speed up future calculations
                    in the future but comes at a memory cost.
    full_ingest :   bool
                    If True, will read and cache the full deformation field at
                    initialization. This additional upfront cost can pay off if
                    you are about to make many transforms across the volume.

    """

    def __init__(
        self,
        f: str,
        direction: str = "forward",
        level: Optional[int] = -1,
        cache: bool = False,
        full_ingest: bool = False,
    ):
        """Init class."""
        assert direction in ("forward", "inverse"), (
            '`direction` must be "forward"' f'or "inverse", not "{direction}"'
        )

        self.file = f
        self.direction = direction
        self.field = {"forward": "dfield", "inverse": "invdfield"}[direction]

        # Trying to avoid the file repeatedly so we are making these initial
        # adjustments all in one go even though it would be more Pythonic to
        # delegate to property getter/setter methods
        with h5py.File(self.file, "r") as h5:
            # Get the available levels
            available_levels = []
            for k in h5.keys():
                try:
                    available_levels.append(int(k))
                except ValueError:
                    continue
            available_levels = sorted(available_levels)

            # Check if there are indeed deformation fields at various resolutions
            if available_levels:
                if isinstance(level, type(None)):
                    level = available_levels[0]
                elif level < 0:
                    ix = level * -1 - 1
                    level = available_levels[ix]

                # Set level
                self._level = str(level)

                # Shape of deformation field
                self._shape = h5[self.level][self.field].shape

                # Data type of deformation field
                self.dtype = h5[self.level][self.field].dtype
            elif self.field in h5.keys():
                # Set level
                self._level = None

                # Shape of deformation field
                self._shape = h5[self.field].shape

                # Data type of deformation field
                self.dtype = h5[self.field].dtype
            else:
                raise ValueError(
                    "Unable to parse deformation fields from " f" {self.file}."
                )

        # Prepare cache if applicable
        if full_ingest:
            # Ingest the whole deformation field
            self.full_ingest()
        elif cache:
            self.use_cache = True

    def __eq__(self, other) -> bool:
        """Compare with other Transform."""
        if isinstance(other, H5transform):
            if self.file == other.file:
                if self.direction == other.direction:
                    if self.level == other.level:
                        return True
        return False

    def __neg__(self) -> "H5transform":
        """Invert direction."""
        # Swap direction
        new_direction = {"forward": "inverse", "inverse": "forward"}[self.direction]
        # We will re-iniatialize
        x = H5transform(
            self.file,
            direction=new_direction,
            level=int(self.level) if self.level else None,
            cache=self.use_cache,
            full_ingest=False,
        )

        return x

    @property
    def level(self):
        return self._level

    @level.setter
    def level(self, value):
        raise ValueError("`level` cannot be changed after initialization.")

    @property
    def shape(self):
        """Shape of the deformation field.

        Note that the deformation field is likely to be (z, y, x, 3) where
        the last dimension contains the x/y/z offsets.
        """
        return self._shape

    @shape.setter
    def shape(self, value):
        raise ValueError("`shape` cannot be changed after initialization.")

    @property
    def spacing(self):
        """Voxel spacing of the deformation field (z, y, x)."""
        if not hasattr(self, "_spacing"):
            with h5py.File(self.file, "r") as h5:
                if self.level:
                    self._spacing = h5[self.level][self.field].attrs["spacing"]
                else:
                    self._spacing = h5[self.field].attrs["spacing"]
        return self._spacing

    @property
    def quantization_multiplier(self):
        """Quantization multiplier of the deformation field."""
        if not hasattr(self, "_quantization_multiplier"):
            with h5py.File(self.file, "r") as h5:
                if self.level:
                    self._quantization_multiplier = h5[self.level][
                        self.field
                    ].attrs.get("quantization_multiplier", 1)
                else:
                    self._quantization_multiplier = h5[self.field].attrs.get(
                        "quantization_multiplier", 1
                    )
        return self._quantization_multiplier

    @property
    def use_cache(self):
        """Whether to cache the deformation field."""
        if not hasattr(self, "_use_cache"):
            self._use_cache = False
        return self._use_cache

    @use_cache.setter
    def use_cache(self, value):
        """Set whether to cache the deformation field."""
        assert isinstance(value, bool)

        # If was False and now set to True, build the cache
        if not getattr(self, "_use_cache", False) and value:
            # This is the actual cache
            self.cache = np.zeros(self.shape, dtype=self.dtype)
            # This is a mask that tells us which values have already been cached
            self.cached = np.zeros(self.shape[:-1], dtype=bool)
            self._use_cache = True
        # If was True and now is set to False, deconstruct cache
        elif getattr(self, "_use_cache", False) and not value:
            del self.cache
            del self.cached
            self._use_cache = False

            if hasattr(self, "_fully_ingested"):
                del self._fully_ingested

        # Note: should explore whether we can use sparse N-dimensional
        # arrays for caching to save memory
        # See https://github.com/pydata/sparse/

    def copy(self):
        """Return copy."""
        return H5transform(
            self.file,
            direction=self.direction,
            level=int(self.level) if self.level else None,
            cache=self.use_cache,
            full_ingest=False,
        )

    def full_ingest(self):
        """Fully ingest the deformation field."""
        # Skip if already ingested
        if getattr(self, "_fully_ingested", False):
            return

        with h5py.File(self.file, "r") as h5:
            # Read in the entire field
            if self.level:
                self.cache = h5[self.level][self.field][:, :, :]
            else:
                self.cache = h5[self.field][:, :, :]
            # Keep a flag of this
            self._fully_ingested = True
            # We set `cached` to True instead of using a mask
            self.cached = True
            # Keep track of the caching
            self._use_cache = True

    def precache(self, bbox: Union[list, np.ndarray], padding=True):
        """Cache deformation field for given bounding box.

        Parameters
        ----------
        bbox :      list | array
                    Must be `[[x1, x2], [y1, y2], [z1, z2]]`.
        padding :   bool
                    If True, will add the (required!) padding to the bounding
                    box.

        """
        bbox = np.asarray(bbox)

        if bbox.ndim != 2 or bbox.shape != (3, 2):
            raise ValueError(f"Expected (3, 2) bounding box, got {bbox.shape}")

        # Set use_cache=True -> this also prepares the cache array(s)
        self.use_cache = True

        with h5py.File(self.file, "r") as h5:
            if self.level:
                spacing = h5[self.level][self.field].attrs["spacing"]
            else:
                spacing = h5[self.field].attrs["spacing"]

            # Note that we invert because spacing is given in (z, y, x)
            bbox_vxl = (bbox.T / spacing[::-1]).T
            # Digitize into voxels
            bbox_vxl = bbox_vxl.round().astype(int)

            if padding:
                bbox_vxl[:, 0] -= 2
                bbox_vxl[:, 1] += 2

            # Make sure we are within bounds
            bbox_vxl = np.clip(bbox_vxl.T, 0, self.shape[:-1][::-1]).T

            # Extract values
            x1, x2, y1, y2, z1, z2 = bbox_vxl.flatten()

            # Cache values in this bounding box
            if self.level:
                read_from = h5[self.level][self.field]
            else:
                read_from = h5[self.field]

            self.cache[z1:z2, y1:y2, x1:x2] = read_from[z1:z2, y1:y2, x1:x2]
            self.cached[z1:z2, y1:y2, x1:x2] = True

    @staticmethod
    def from_file(filepath: str, **kwargs) -> "H5transform":
        """Generate H5transform from file.

        Parameters
        ----------
        filepath :  str
                    Path to H5 transform.
        **kwargs
                    Keyword arguments passed to H5transform.__init__

        Returns
        -------
        H5transform

        """
        return H5transform(str(filepath), **kwargs)

    def xform(
        self,
        points: np.ndarray,
        affine_fallback: bool = True,
        force_deform: bool = True,
    ) -> np.ndarray:
        """Xform data.

        Parameters
        ----------
        points :            (N, 3) numpy array | pandas.DataFrame
                            Points to xform. DataFrame must have x/y/z columns.
        affine_fallback :   bool
                            If False, points outside the deformation field will
                            be returned as `np.nan`. If True, these points will
                            only receive the affine part of the transformation.
        force_deform :      bools
                            If True, points outside the deformation field be
                            deformed using the closest point inside the
                            deformation field. Ignored if `affine_fallback` is
                            `False`.

        Returns
        -------
        pointsxf :      (N, 3) numpy array
                        Transformed points. Points outside the deformation field
                        will have only the affine part of the transform applied.

        """
        if isinstance(points, pd.DataFrame):
            # Make sure x/y/z columns are present
            if np.any([c not in points for c in ["x", "y", "z"]]):
                raise ValueError("points DataFrame must have x/y/z columns.")
            points = points[["x", "y", "z"]].values

        if (
            not isinstance(points, np.ndarray)
            or points.ndim != 2
            or points.shape[1] != 3
        ):
            raise TypeError(
                "`points` must be numpy array of shape (N, 3) or "
                "pandas DataFrame with x/y/z columns"
            )

        # Read the file
        with h5py.File(self.file, "r") as h5:
            if self.level:
                field = h5[self.level][self.field]
            else:
                field = h5[self.field]

            # We need the field to be (z, y, x, offsets) with `offsets` being
            # three values - for example: (293, 470, 1010, 3)
            # If that's not the case, something is fishy!
            if field.shape[-1] != 3:
                logger.warning(
                    "Expected the deformation field to be of shape "
                    f"(z, y, x, 3), got {field.shape}."
                )

            if "affine" in field.attrs:
                # The affine part of the transform is a 4 x 4 matrix where the upper
                # 3 x 4 part (row x columns) is an attribute of the h5 dataset
                M = np.ones((4, 4))
                M[:3, :4] = field.attrs["affine"].reshape(3, 4)
                affine = AffineTransform(M)
            else:
                affine = False

            # Get quantization multiplier for later use
            quantization_multiplier = field.attrs.get("quantization_multiplier", 1)

            # For inverse direction, the affine part is applied first
            if self.direction == "inverse" and affine:
                xf = affine.xform(points)
            else:
                xf = points

            # Translate points into voxel space
            spacing = field.attrs["spacing"]
            # Note that we invert because spacing is given in (z, y, x)
            xf_voxel = xf / spacing[::-1]
            # Digitize points into voxels
            xf_indices = xf_voxel.round().astype(int)
            # Determine the bounding box of the deformation vectors we need
            # Note that we are grabbing a bit more than required - this is
            # necessary for interpolation later down the line
            mn = xf_indices.min(axis=0) - 2
            mx = xf_indices.max(axis=0) + 2

            # Make sure we are within bounds
            # Note that we clip `mn` at 0 and `mx` at 2 at the lower end?
            # This is to make sure we have enough of the deformation field
            # to interpolate later on `offsets`
            mn = np.clip(mn, 2, np.array(self.shape[:-1][::-1])) - 2
            mx = np.clip(mx, 0, np.array(self.shape[:-1][::-1]) - 2) + 2

            # Check if we can use cached values
            if self.use_cache and (
                hasattr(self, "_fully_ingested")
                or np.all(self.cached[mn[2] : mx[2], mn[1] : mx[1], mn[0] : mx[0]])
            ):
                offsets = self.cache[mn[2] : mx[2], mn[1] : mx[1], mn[0] : mx[0]]
            else:
                # Load the deformation values for this bounding box
                # This is faster than grabbing individual voxels and
                offsets = field[mn[2] : mx[2], mn[1] : mx[1], mn[0] : mx[0]]

                if self.use_cache:
                    # Write these offsets to cache
                    self.cache[mn[2] : mx[2], mn[1] : mx[1], mn[0] : mx[0]] = offsets
                    self.cached[mn[2] : mx[2], mn[1] : mx[1], mn[0] : mx[0]] = True

        # For interpolation, we need to split the offsets into their x, y
        # and z component
        xgrid = offsets[:, :, :, 0]
        ygrid = offsets[:, :, :, 1]
        zgrid = offsets[:, :, :, 2]

        xx = np.arange(mn[0], mx[0])
        yy = np.arange(mn[1], mx[1])
        zz = np.arange(mn[2], mx[2])

        # The RegularGridInterpolator is the fastest one but the results are
        # are ever so slightly (4th decimal) different from the Java implementation
        xinterp = RegularGridInterpolator(
            (zz, yy, xx), xgrid, bounds_error=False, fill_value=0
        )
        yinterp = RegularGridInterpolator(
            (zz, yy, xx), ygrid, bounds_error=False, fill_value=0
        )
        zinterp = RegularGridInterpolator(
            (zz, yy, xx), zgrid, bounds_error=False, fill_value=0
        )

        # Before we interpolate check how many points are outside the
        # deformation field -> these will only receive the affine part of the
        # transform
        is_out = (xf_voxel.min(axis=1) < 0) | np.any(
            xf_voxel >= self.shape[:-1][::-1], axis=1
        )

        # If more than 20% (arbitrary number) of voxels are out, there is
        # something suspicious going on
        frac_out = is_out.sum() / xf_voxel.shape[0]
        if frac_out > 0.2 and not getattr(self, "_silenced_large_out_warning", False):
            logger.warning(
                f"A suspiciously large fraction ({frac_out:.1%}) "
                f"of {xf_voxel.shape[0]} points appear to be outside "
                "the H5 deformation field. Please make doubly sure "
                "that the input coordinates are in the correct "
                "space/units"
            )
        # If all points are outside the volume, the interpolation complains
        if frac_out < 1 or (force_deform and affine_fallback):
            if force_deform:
                # For the purpose of finding offsets, we will snap points
                # outside the deformation field to the closest inside voxel
                q_voxel = np.clip(
                    xf_voxel, a_min=0, a_max=np.array(self.shape[:-1][::-1]) - 1
                )
            else:
                q_voxel = xf_voxel

            # Interpolate coordinates and re-combine to an x/y/z array
            offset_vxl = np.vstack(
                (
                    xinterp(q_voxel[:, ::-1], method="linear"),
                    yinterp(q_voxel[:, ::-1], method="linear"),
                    zinterp(q_voxel[:, ::-1], method="linear"),
                )
            ).T

            # Turn offsets into real-world coordinates
            offset_real = offset_vxl * quantization_multiplier

            # Apply offsets
            # Please note that we must not use += here
            # That's to avoid running into data type errors where numpy
            # will refuse to add e.g. float64 to int64.
            # By using "+" instead of "+=" we are creating a new array that
            # is potentially upcast from e.g. int64 to float64
            xf = xf + offset_real

        # For forward direction, the affine part is applied last
        if self.direction == "forward" and affine:
            xf = affine.xform(xf)

        # If no affine_fallback, set outside points to np.nan
        if not affine_fallback:
            xf[is_out, :] = np.nan

        return xf

    def xform_image(
        self,
        image: np.ndarray,
        image_res: Union[tuple, list],
        out_res: Optional[Union[tuple, list]] = None,
        out_shape: Optional[Union[tuple, list]] = None,
        order: int = 1,
        mode: str = "constant",
        cval: float = 0,
        cache: bool = True,
        progress: bool = True,
    ) -> np.ndarray:
        """Transform a 3D image using backward mapping.

        If available, this method will use `numba` to accelerate the
        transformation. This is highly recommended as it can speed up
        the transformation by several orders of magnitude:

          pip install numba

        Note though that the numba-accelerated path only supports linear
        interpolation and constant-mode boundary handling (default).

        Parameters
        ----------
        image :         (N, M, K) numpy array
                        Image to be transformed.
        image_res :     (3, ) tuple | list
                        Voxel resolution of the input image.
        out_res :       (3, ) tuple | list, optional
                        Voxel resolution of the output image. If None, assumed
                        to be the same as `image_res`.
        out_shape :     (3, ) tuple | list, optional
                        Shape of the output image in voxels. If None, uses the
                        shape of the deformation field. This works as long as
                        the deformation field fully samples the target space.
        order :         int
                        The order of the spline interpolation, default is 1
                        (linear). The order has to be in the range 0-5.
        mode :          str
                        How to handle values outside the image bounds.
                        Default is 'constant' (pad with cval).
        cval :          float
                        Value used for points outside the boundaries when
                        mode='constant'.
        chunk_size :    int
                        Size of chunks to process along each dimension.
                        Larger chunks are faster but use more memory.
                        Only relevant for Python (not numba) path.
        cache :         bool
                        If True, we will cache the deformation field for
                        subsequent future transforms. This is generally
                        recommended unless memory is very tight or the
                        deformation field is huge.
        progress :      bool
                        Whether to show a progress bar during processing.
                        Not available (and probably not necessary) if
                        numba-accelerated path is used.

        Returns
        -------
        transformed :   (N, M, K) numpy array
                        Transformed image in target space. The shape is
                        determined by `out_shape` and `out_res`.

        Notes
        -----
        This method uses backward mapping: for each output chunk, we compute
        where the voxels came from in the source image using the transformation,
        then interpolate the source image at those locations. So if your
        transform goes from A -> B, this method actually uses the inverse transform
        under the hood to figure out where in A the voxels in B come from.

        """
        if not isinstance(image, np.ndarray) or image.ndim != 3:
            raise TypeError("`image` must be a 3D numpy array")

        # Because we are using backward mapping, we need to invert the transform,
        # i.e. we're taking points in the target space, mapping them back to the
        # source space and sampling the source image at those locations)
        if hasattr(self, "_inverted_copy"):
            # We're tracking the copy so that we can use the cache for subsequent
            # transformations instead of having to re-read the file and re-ingest
            # the deformation field
            reg_inv = self._inverted_copy
        else:
            reg_inv = -self
            # If cache, track the inverted copy (and the deformation field)
            if cache:
                reg_inv.use_cache = True
                self._inverted_copy = reg_inv

        if out_res is None:
            out_res = image_res

        if out_shape is None:
            # Bounds in physical space
            bounds = np.array(list(reg_inv.shape)[:-1][::-1]) * reg_inv.spacing
            # Back to voxel space in output resolution
            out_shape = np.ceil(bounds / out_res).astype(int)

        # Generate the empty output array
        out = np.zeros(out_shape, dtype=image.dtype)

        try:
            from .h5reg_numba import h5reg_warp_image_linear_constant

            nb_available = True
        except ModuleNotFoundError:
            nb_available = False

        # Numba path: linear interpolation + constant mode only but much faster
        if order == 1 and mode == "constant":
            if not nb_available:
                global NUMBA_WARNING
                if not NUMBA_WARNING:
                    logger.warning(
                        "For faster transforming of images, please install numba:\n"
                        "  pip install numba"
                    )
                    NUMBA_WARNING = True
            else:
                reg_inv.full_ingest()
                field = reg_inv.cache

                with h5py.File(reg_inv.file, "r") as h5:
                    if reg_inv.level:
                        ds = h5[reg_inv.level][reg_inv.field]
                    else:
                        ds = h5[reg_inv.field]

                    spacing = np.asarray(ds.attrs["spacing"], dtype=np.float64)
                    qmult = float(ds.attrs.get("quantization_multiplier", 1))

                    if "affine" in ds.attrs:
                        affine = np.ones((4, 4), dtype=np.float64)
                        affine[:3, :4] = ds.attrs["affine"].reshape(3, 4)
                        apply_affine = 1 if reg_inv.direction == "inverse" else 2
                    else:
                        affine = np.eye(4, dtype=np.float64)
                        apply_affine = 0

                return h5reg_warp_image_linear_constant(
                    image,
                    field,
                    spacing,
                    qmult,
                    affine,
                    apply_affine,
                    out,
                    out_res,
                    image_res,
                    float(cval),
                )

        # Pre-compute prefilter if needed for higher order interpolation
        if order > 1:
            from scipy.ndimage import spline_filter

            image = spline_filter(image, order=order)

        for z in config.trange(
            out_shape[2],
            disable=not progress or config.pbar_hide,
            desc="Warping image",
        ):
            # Generate a grid of points in target voxel space for this z-slice
            chunk_coords = np.mgrid[
                0 : out_shape[0] : 1, 0 : out_shape[1] : 1, z : z + 1
            ]
            points = np.stack(
                [
                    chunk_coords[0].flatten(),
                    chunk_coords[1].flatten(),
                    chunk_coords[2].flatten(),
                ],
                axis=-1,
            )

            # Convert to physical space
            points = points.astype(np.float32) * out_res

            # Transform points to source space using the inverse transform
            try:
                reg_inv._silenced_large_out_warning = True
                points_xf = reg_inv.xform(points)
            except Exception as e:
                reg_inv._silenced_large_out_warning = False
                raise e

            # Convert back to voxel space in source image
            points_xf_vox = points_xf / image_res

            # Sample the source image at these transformed coordinates
            sampled_values = map_coordinates(
                image,
                [points_xf_vox[:, 0], points_xf_vox[:, 1], points_xf_vox[:, 2]],
                order=1,
                mode="constant",
                cval=0,
            )

            # Reshape back to 2D and assign to this slice of the output image
            out[:, :, z] = sampled_values.reshape(out_shape[0], out_shape[1])

        return out

Quantization multiplier of the deformation field.

Shape of the deformation field.

Note that the deformation field is likely to be (z, y, x, 3) where the last dimension contains the x/y/z offsets.

Voxel spacing of the deformation field (z, y, x).

Whether to cache the deformation field.

Init class.

Source code in navis/transforms/h5reg.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def __init__(
    self,
    f: str,
    direction: str = "forward",
    level: Optional[int] = -1,
    cache: bool = False,
    full_ingest: bool = False,
):
    """Init class."""
    assert direction in ("forward", "inverse"), (
        '`direction` must be "forward"' f'or "inverse", not "{direction}"'
    )

    self.file = f
    self.direction = direction
    self.field = {"forward": "dfield", "inverse": "invdfield"}[direction]

    # Trying to avoid the file repeatedly so we are making these initial
    # adjustments all in one go even though it would be more Pythonic to
    # delegate to property getter/setter methods
    with h5py.File(self.file, "r") as h5:
        # Get the available levels
        available_levels = []
        for k in h5.keys():
            try:
                available_levels.append(int(k))
            except ValueError:
                continue
        available_levels = sorted(available_levels)

        # Check if there are indeed deformation fields at various resolutions
        if available_levels:
            if isinstance(level, type(None)):
                level = available_levels[0]
            elif level < 0:
                ix = level * -1 - 1
                level = available_levels[ix]

            # Set level
            self._level = str(level)

            # Shape of deformation field
            self._shape = h5[self.level][self.field].shape

            # Data type of deformation field
            self.dtype = h5[self.level][self.field].dtype
        elif self.field in h5.keys():
            # Set level
            self._level = None

            # Shape of deformation field
            self._shape = h5[self.field].shape

            # Data type of deformation field
            self.dtype = h5[self.field].dtype
        else:
            raise ValueError(
                "Unable to parse deformation fields from " f" {self.file}."
            )

    # Prepare cache if applicable
    if full_ingest:
        # Ingest the whole deformation field
        self.full_ingest()
    elif cache:
        self.use_cache = True

Return copy.

Source code in navis/transforms/h5reg.py
237
238
239
240
241
242
243
244
245
def copy(self):
    """Return copy."""
    return H5transform(
        self.file,
        direction=self.direction,
        level=int(self.level) if self.level else None,
        cache=self.use_cache,
        full_ingest=False,
    )

Generate H5transform from file.

PARAMETER DESCRIPTION
filepath
    Path to H5 transform.

TYPE: str

**kwargs
    Keyword arguments passed to H5transform.__init__

DEFAULT: {}

RETURNS DESCRIPTION
H5transform
Source code in navis/transforms/h5reg.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
@staticmethod
def from_file(filepath: str, **kwargs) -> "H5transform":
    """Generate H5transform from file.

    Parameters
    ----------
    filepath :  str
                Path to H5 transform.
    **kwargs
                Keyword arguments passed to H5transform.__init__

    Returns
    -------
    H5transform

    """
    return H5transform(str(filepath), **kwargs)

Fully ingest the deformation field.

Source code in navis/transforms/h5reg.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def full_ingest(self):
    """Fully ingest the deformation field."""
    # Skip if already ingested
    if getattr(self, "_fully_ingested", False):
        return

    with h5py.File(self.file, "r") as h5:
        # Read in the entire field
        if self.level:
            self.cache = h5[self.level][self.field][:, :, :]
        else:
            self.cache = h5[self.field][:, :, :]
        # Keep a flag of this
        self._fully_ingested = True
        # We set `cached` to True instead of using a mask
        self.cached = True
        # Keep track of the caching
        self._use_cache = True

Cache deformation field for given bounding box.

PARAMETER DESCRIPTION
bbox
    Must be `[[x1, x2], [y1, y2], [z1, z2]]`.

TYPE: list | array

padding
    If True, will add the (required!) padding to the bounding
    box.

TYPE: bool DEFAULT: True

Source code in navis/transforms/h5reg.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
def precache(self, bbox: Union[list, np.ndarray], padding=True):
    """Cache deformation field for given bounding box.

    Parameters
    ----------
    bbox :      list | array
                Must be `[[x1, x2], [y1, y2], [z1, z2]]`.
    padding :   bool
                If True, will add the (required!) padding to the bounding
                box.

    """
    bbox = np.asarray(bbox)

    if bbox.ndim != 2 or bbox.shape != (3, 2):
        raise ValueError(f"Expected (3, 2) bounding box, got {bbox.shape}")

    # Set use_cache=True -> this also prepares the cache array(s)
    self.use_cache = True

    with h5py.File(self.file, "r") as h5:
        if self.level:
            spacing = h5[self.level][self.field].attrs["spacing"]
        else:
            spacing = h5[self.field].attrs["spacing"]

        # Note that we invert because spacing is given in (z, y, x)
        bbox_vxl = (bbox.T / spacing[::-1]).T
        # Digitize into voxels
        bbox_vxl = bbox_vxl.round().astype(int)

        if padding:
            bbox_vxl[:, 0] -= 2
            bbox_vxl[:, 1] += 2

        # Make sure we are within bounds
        bbox_vxl = np.clip(bbox_vxl.T, 0, self.shape[:-1][::-1]).T

        # Extract values
        x1, x2, y1, y2, z1, z2 = bbox_vxl.flatten()

        # Cache values in this bounding box
        if self.level:
            read_from = h5[self.level][self.field]
        else:
            read_from = h5[self.field]

        self.cache[z1:z2, y1:y2, x1:x2] = read_from[z1:z2, y1:y2, x1:x2]
        self.cached[z1:z2, y1:y2, x1:x2] = True

Xform data.

PARAMETER DESCRIPTION
points
            Points to xform. DataFrame must have x/y/z columns.

TYPE: (N, 3) numpy array | pandas.DataFrame

affine_fallback
            If False, points outside the deformation field will
            be returned as `np.nan`. If True, these points will
            only receive the affine part of the transformation.

TYPE: bool DEFAULT: True

force_deform
            If True, points outside the deformation field be
            deformed using the closest point inside the
            deformation field. Ignored if `affine_fallback` is
            `False`.

TYPE: bools DEFAULT: True

RETURNS DESCRIPTION
pointsxf

Transformed points. Points outside the deformation field will have only the affine part of the transform applied.

TYPE: (N, 3) numpy array

Source code in navis/transforms/h5reg.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
def xform(
    self,
    points: np.ndarray,
    affine_fallback: bool = True,
    force_deform: bool = True,
) -> np.ndarray:
    """Xform data.

    Parameters
    ----------
    points :            (N, 3) numpy array | pandas.DataFrame
                        Points to xform. DataFrame must have x/y/z columns.
    affine_fallback :   bool
                        If False, points outside the deformation field will
                        be returned as `np.nan`. If True, these points will
                        only receive the affine part of the transformation.
    force_deform :      bools
                        If True, points outside the deformation field be
                        deformed using the closest point inside the
                        deformation field. Ignored if `affine_fallback` is
                        `False`.

    Returns
    -------
    pointsxf :      (N, 3) numpy array
                    Transformed points. Points outside the deformation field
                    will have only the affine part of the transform applied.

    """
    if isinstance(points, pd.DataFrame):
        # Make sure x/y/z columns are present
        if np.any([c not in points for c in ["x", "y", "z"]]):
            raise ValueError("points DataFrame must have x/y/z columns.")
        points = points[["x", "y", "z"]].values

    if (
        not isinstance(points, np.ndarray)
        or points.ndim != 2
        or points.shape[1] != 3
    ):
        raise TypeError(
            "`points` must be numpy array of shape (N, 3) or "
            "pandas DataFrame with x/y/z columns"
        )

    # Read the file
    with h5py.File(self.file, "r") as h5:
        if self.level:
            field = h5[self.level][self.field]
        else:
            field = h5[self.field]

        # We need the field to be (z, y, x, offsets) with `offsets` being
        # three values - for example: (293, 470, 1010, 3)
        # If that's not the case, something is fishy!
        if field.shape[-1] != 3:
            logger.warning(
                "Expected the deformation field to be of shape "
                f"(z, y, x, 3), got {field.shape}."
            )

        if "affine" in field.attrs:
            # The affine part of the transform is a 4 x 4 matrix where the upper
            # 3 x 4 part (row x columns) is an attribute of the h5 dataset
            M = np.ones((4, 4))
            M[:3, :4] = field.attrs["affine"].reshape(3, 4)
            affine = AffineTransform(M)
        else:
            affine = False

        # Get quantization multiplier for later use
        quantization_multiplier = field.attrs.get("quantization_multiplier", 1)

        # For inverse direction, the affine part is applied first
        if self.direction == "inverse" and affine:
            xf = affine.xform(points)
        else:
            xf = points

        # Translate points into voxel space
        spacing = field.attrs["spacing"]
        # Note that we invert because spacing is given in (z, y, x)
        xf_voxel = xf / spacing[::-1]
        # Digitize points into voxels
        xf_indices = xf_voxel.round().astype(int)
        # Determine the bounding box of the deformation vectors we need
        # Note that we are grabbing a bit more than required - this is
        # necessary for interpolation later down the line
        mn = xf_indices.min(axis=0) - 2
        mx = xf_indices.max(axis=0) + 2

        # Make sure we are within bounds
        # Note that we clip `mn` at 0 and `mx` at 2 at the lower end?
        # This is to make sure we have enough of the deformation field
        # to interpolate later on `offsets`
        mn = np.clip(mn, 2, np.array(self.shape[:-1][::-1])) - 2
        mx = np.clip(mx, 0, np.array(self.shape[:-1][::-1]) - 2) + 2

        # Check if we can use cached values
        if self.use_cache and (
            hasattr(self, "_fully_ingested")
            or np.all(self.cached[mn[2] : mx[2], mn[1] : mx[1], mn[0] : mx[0]])
        ):
            offsets = self.cache[mn[2] : mx[2], mn[1] : mx[1], mn[0] : mx[0]]
        else:
            # Load the deformation values for this bounding box
            # This is faster than grabbing individual voxels and
            offsets = field[mn[2] : mx[2], mn[1] : mx[1], mn[0] : mx[0]]

            if self.use_cache:
                # Write these offsets to cache
                self.cache[mn[2] : mx[2], mn[1] : mx[1], mn[0] : mx[0]] = offsets
                self.cached[mn[2] : mx[2], mn[1] : mx[1], mn[0] : mx[0]] = True

    # For interpolation, we need to split the offsets into their x, y
    # and z component
    xgrid = offsets[:, :, :, 0]
    ygrid = offsets[:, :, :, 1]
    zgrid = offsets[:, :, :, 2]

    xx = np.arange(mn[0], mx[0])
    yy = np.arange(mn[1], mx[1])
    zz = np.arange(mn[2], mx[2])

    # The RegularGridInterpolator is the fastest one but the results are
    # are ever so slightly (4th decimal) different from the Java implementation
    xinterp = RegularGridInterpolator(
        (zz, yy, xx), xgrid, bounds_error=False, fill_value=0
    )
    yinterp = RegularGridInterpolator(
        (zz, yy, xx), ygrid, bounds_error=False, fill_value=0
    )
    zinterp = RegularGridInterpolator(
        (zz, yy, xx), zgrid, bounds_error=False, fill_value=0
    )

    # Before we interpolate check how many points are outside the
    # deformation field -> these will only receive the affine part of the
    # transform
    is_out = (xf_voxel.min(axis=1) < 0) | np.any(
        xf_voxel >= self.shape[:-1][::-1], axis=1
    )

    # If more than 20% (arbitrary number) of voxels are out, there is
    # something suspicious going on
    frac_out = is_out.sum() / xf_voxel.shape[0]
    if frac_out > 0.2 and not getattr(self, "_silenced_large_out_warning", False):
        logger.warning(
            f"A suspiciously large fraction ({frac_out:.1%}) "
            f"of {xf_voxel.shape[0]} points appear to be outside "
            "the H5 deformation field. Please make doubly sure "
            "that the input coordinates are in the correct "
            "space/units"
        )
    # If all points are outside the volume, the interpolation complains
    if frac_out < 1 or (force_deform and affine_fallback):
        if force_deform:
            # For the purpose of finding offsets, we will snap points
            # outside the deformation field to the closest inside voxel
            q_voxel = np.clip(
                xf_voxel, a_min=0, a_max=np.array(self.shape[:-1][::-1]) - 1
            )
        else:
            q_voxel = xf_voxel

        # Interpolate coordinates and re-combine to an x/y/z array
        offset_vxl = np.vstack(
            (
                xinterp(q_voxel[:, ::-1], method="linear"),
                yinterp(q_voxel[:, ::-1], method="linear"),
                zinterp(q_voxel[:, ::-1], method="linear"),
            )
        ).T

        # Turn offsets into real-world coordinates
        offset_real = offset_vxl * quantization_multiplier

        # Apply offsets
        # Please note that we must not use += here
        # That's to avoid running into data type errors where numpy
        # will refuse to add e.g. float64 to int64.
        # By using "+" instead of "+=" we are creating a new array that
        # is potentially upcast from e.g. int64 to float64
        xf = xf + offset_real

    # For forward direction, the affine part is applied last
    if self.direction == "forward" and affine:
        xf = affine.xform(xf)

    # If no affine_fallback, set outside points to np.nan
    if not affine_fallback:
        xf[is_out, :] = np.nan

    return xf

Transform a 3D image using backward mapping.

If available, this method will use numba to accelerate the transformation. This is highly recommended as it can speed up the transformation by several orders of magnitude:

pip install numba

Note though that the numba-accelerated path only supports linear interpolation and constant-mode boundary handling (default).

PARAMETER DESCRIPTION
image
        Image to be transformed.

TYPE: (N, M, K) numpy array

image_res
        Voxel resolution of the input image.

TYPE: (3, ) tuple | list

out_res
        Voxel resolution of the output image. If None, assumed
        to be the same as `image_res`.

TYPE: (3, ) tuple | list DEFAULT: None

out_shape
        Shape of the output image in voxels. If None, uses the
        shape of the deformation field. This works as long as
        the deformation field fully samples the target space.

TYPE: (3, ) tuple | list DEFAULT: None

order
        The order of the spline interpolation, default is 1
        (linear). The order has to be in the range 0-5.

TYPE: int DEFAULT: 1

mode
        How to handle values outside the image bounds.
        Default is 'constant' (pad with cval).

TYPE: str DEFAULT: 'constant'

cval
        Value used for points outside the boundaries when
        mode='constant'.

TYPE: float DEFAULT: 0

chunk_size
        Size of chunks to process along each dimension.
        Larger chunks are faster but use more memory.
        Only relevant for Python (not numba) path.

TYPE: int

cache
        If True, we will cache the deformation field for
        subsequent future transforms. This is generally
        recommended unless memory is very tight or the
        deformation field is huge.

TYPE: bool DEFAULT: True

progress
        Whether to show a progress bar during processing.
        Not available (and probably not necessary) if
        numba-accelerated path is used.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
transformed

Transformed image in target space. The shape is determined by out_shape and out_res.

TYPE: (N, M, K) numpy array

Notes

This method uses backward mapping: for each output chunk, we compute where the voxels came from in the source image using the transformation, then interpolate the source image at those locations. So if your transform goes from A -> B, this method actually uses the inverse transform under the hood to figure out where in A the voxels in B come from.

Source code in navis/transforms/h5reg.py
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
def xform_image(
    self,
    image: np.ndarray,
    image_res: Union[tuple, list],
    out_res: Optional[Union[tuple, list]] = None,
    out_shape: Optional[Union[tuple, list]] = None,
    order: int = 1,
    mode: str = "constant",
    cval: float = 0,
    cache: bool = True,
    progress: bool = True,
) -> np.ndarray:
    """Transform a 3D image using backward mapping.

    If available, this method will use `numba` to accelerate the
    transformation. This is highly recommended as it can speed up
    the transformation by several orders of magnitude:

      pip install numba

    Note though that the numba-accelerated path only supports linear
    interpolation and constant-mode boundary handling (default).

    Parameters
    ----------
    image :         (N, M, K) numpy array
                    Image to be transformed.
    image_res :     (3, ) tuple | list
                    Voxel resolution of the input image.
    out_res :       (3, ) tuple | list, optional
                    Voxel resolution of the output image. If None, assumed
                    to be the same as `image_res`.
    out_shape :     (3, ) tuple | list, optional
                    Shape of the output image in voxels. If None, uses the
                    shape of the deformation field. This works as long as
                    the deformation field fully samples the target space.
    order :         int
                    The order of the spline interpolation, default is 1
                    (linear). The order has to be in the range 0-5.
    mode :          str
                    How to handle values outside the image bounds.
                    Default is 'constant' (pad with cval).
    cval :          float
                    Value used for points outside the boundaries when
                    mode='constant'.
    chunk_size :    int
                    Size of chunks to process along each dimension.
                    Larger chunks are faster but use more memory.
                    Only relevant for Python (not numba) path.
    cache :         bool
                    If True, we will cache the deformation field for
                    subsequent future transforms. This is generally
                    recommended unless memory is very tight or the
                    deformation field is huge.
    progress :      bool
                    Whether to show a progress bar during processing.
                    Not available (and probably not necessary) if
                    numba-accelerated path is used.

    Returns
    -------
    transformed :   (N, M, K) numpy array
                    Transformed image in target space. The shape is
                    determined by `out_shape` and `out_res`.

    Notes
    -----
    This method uses backward mapping: for each output chunk, we compute
    where the voxels came from in the source image using the transformation,
    then interpolate the source image at those locations. So if your
    transform goes from A -> B, this method actually uses the inverse transform
    under the hood to figure out where in A the voxels in B come from.

    """
    if not isinstance(image, np.ndarray) or image.ndim != 3:
        raise TypeError("`image` must be a 3D numpy array")

    # Because we are using backward mapping, we need to invert the transform,
    # i.e. we're taking points in the target space, mapping them back to the
    # source space and sampling the source image at those locations)
    if hasattr(self, "_inverted_copy"):
        # We're tracking the copy so that we can use the cache for subsequent
        # transformations instead of having to re-read the file and re-ingest
        # the deformation field
        reg_inv = self._inverted_copy
    else:
        reg_inv = -self
        # If cache, track the inverted copy (and the deformation field)
        if cache:
            reg_inv.use_cache = True
            self._inverted_copy = reg_inv

    if out_res is None:
        out_res = image_res

    if out_shape is None:
        # Bounds in physical space
        bounds = np.array(list(reg_inv.shape)[:-1][::-1]) * reg_inv.spacing
        # Back to voxel space in output resolution
        out_shape = np.ceil(bounds / out_res).astype(int)

    # Generate the empty output array
    out = np.zeros(out_shape, dtype=image.dtype)

    try:
        from .h5reg_numba import h5reg_warp_image_linear_constant

        nb_available = True
    except ModuleNotFoundError:
        nb_available = False

    # Numba path: linear interpolation + constant mode only but much faster
    if order == 1 and mode == "constant":
        if not nb_available:
            global NUMBA_WARNING
            if not NUMBA_WARNING:
                logger.warning(
                    "For faster transforming of images, please install numba:\n"
                    "  pip install numba"
                )
                NUMBA_WARNING = True
        else:
            reg_inv.full_ingest()
            field = reg_inv.cache

            with h5py.File(reg_inv.file, "r") as h5:
                if reg_inv.level:
                    ds = h5[reg_inv.level][reg_inv.field]
                else:
                    ds = h5[reg_inv.field]

                spacing = np.asarray(ds.attrs["spacing"], dtype=np.float64)
                qmult = float(ds.attrs.get("quantization_multiplier", 1))

                if "affine" in ds.attrs:
                    affine = np.ones((4, 4), dtype=np.float64)
                    affine[:3, :4] = ds.attrs["affine"].reshape(3, 4)
                    apply_affine = 1 if reg_inv.direction == "inverse" else 2
                else:
                    affine = np.eye(4, dtype=np.float64)
                    apply_affine = 0

            return h5reg_warp_image_linear_constant(
                image,
                field,
                spacing,
                qmult,
                affine,
                apply_affine,
                out,
                out_res,
                image_res,
                float(cval),
            )

    # Pre-compute prefilter if needed for higher order interpolation
    if order > 1:
        from scipy.ndimage import spline_filter

        image = spline_filter(image, order=order)

    for z in config.trange(
        out_shape[2],
        disable=not progress or config.pbar_hide,
        desc="Warping image",
    ):
        # Generate a grid of points in target voxel space for this z-slice
        chunk_coords = np.mgrid[
            0 : out_shape[0] : 1, 0 : out_shape[1] : 1, z : z + 1
        ]
        points = np.stack(
            [
                chunk_coords[0].flatten(),
                chunk_coords[1].flatten(),
                chunk_coords[2].flatten(),
            ],
            axis=-1,
        )

        # Convert to physical space
        points = points.astype(np.float32) * out_res

        # Transform points to source space using the inverse transform
        try:
            reg_inv._silenced_large_out_warning = True
            points_xf = reg_inv.xform(points)
        except Exception as e:
            reg_inv._silenced_large_out_warning = False
            raise e

        # Convert back to voxel space in source image
        points_xf_vox = points_xf / image_res

        # Sample the source image at these transformed coordinates
        sampled_values = map_coordinates(
            image,
            [points_xf_vox[:, 0], points_xf_vox[:, 1], points_xf_vox[:, 2]],
            order=1,
            mode="constant",
            cval=0,
        )

        # Reshape back to 2D and assign to this slice of the output image
        out[:, :, z] = sampled_values.reshape(out_shape[0], out_shape[1])

    return out

Thin Plate Spline transforms of 3D spatial data.

Notes

At least in my hands, TPStransforms are significantly faster than MovingLeastSquaresTransforms. The results are similar but not identical, so make sure to use the one that works best for your use case.

PARAMETER DESCRIPTION
landmarks_source
            Source landmarks as x/y/z coordinates.

TYPE: (M, 3) numpy array

landmarks_target
            Target landmarks as x/y/z coordinates.

TYPE: (M, 3) numpy array

batch_size
            Batch size for transforming points. The
            thin-plate spline generating a (N, M) distance
            matrix, where N is the number of points and M
            is the number of source landmarks. Because
            this can get prohibitively expensive, we're
            batching the transformation by default.
            Please note that the the overhead from batching
            seems negligible.

TYPE: int DEFAULT: 100000

Examples:

>>> from navis import transforms
>>> import numpy as np
>>> # Generate some mock landmarks
>>> src = np.array([[0, 0, 0], [10, 10, 10], [100, 100, 100], [80, 10, 30]])
>>> trg = np.array([[1, 15, 5], [9, 18, 21], [80, 99, 120], [5, 10, 80]])
>>> tr = transforms.thinplate.TPStransform(src, trg)
>>> points = np.array([[0, 0, 0], [50, 50, 50]])
>>> tr.xform(points)
array([[ 1.        , 15.        ,  5.        ],
       [40.55555556, 54.        , 65.        ]])
Source code in navis/transforms/thinplate.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class TPStransform(BaseTransform):
    """Thin Plate Spline transforms of 3D spatial data.

    Notes
    -----
    At least in my hands, `TPStransforms` are significantly faster than
    `MovingLeastSquaresTransforms`. The results are similar but not identical,
    so make sure to use the one that works best for your use case.

    Parameters
    ----------
    landmarks_source :  (M, 3) numpy array
                        Source landmarks as x/y/z coordinates.
    landmarks_target :  (M, 3) numpy array
                        Target landmarks as x/y/z coordinates.
    batch_size :        int, optional
                        Batch size for transforming points. The
                        thin-plate spline generating a (N, M) distance
                        matrix, where N is the number of points and M
                        is the number of source landmarks. Because
                        this can get prohibitively expensive, we're
                        batching the transformation by default.
                        Please note that the the overhead from batching
                        seems negligible.

    Examples
    --------
    >>> from navis import transforms
    >>> import numpy as np
    >>> # Generate some mock landmarks
    >>> src = np.array([[0, 0, 0], [10, 10, 10], [100, 100, 100], [80, 10, 30]])
    >>> trg = np.array([[1, 15, 5], [9, 18, 21], [80, 99, 120], [5, 10, 80]])
    >>> tr = transforms.thinplate.TPStransform(src, trg)
    >>> points = np.array([[0, 0, 0], [50, 50, 50]])
    >>> tr.xform(points)
    array([[ 1.        , 15.        ,  5.        ],
           [40.55555556, 54.        , 65.        ]])

    """

    def __init__(
        self,
        landmarks_source: np.ndarray,
        landmarks_target: np.ndarray,
        batch_size: int = 100_000,
    ):
        """Initialize class."""
        self.batch_size = batch_size
        self.source = np.asarray(landmarks_source)
        self.target = np.asarray(landmarks_target)

        # Some checks
        if self.source.shape[1] != 3:
            raise ValueError(f"Expected (N, 3) array, got {self.source.shape}")
        if self.target.shape[1] != 3:
            raise ValueError(f"Expected (N, 3) array, got {self.target.shape}")

        if self.source.shape[0] != self.target.shape[0]:
            raise ValueError(
                "Number of source landmarks must match number of target landmarks."
            )

        self._W, self._A = None, None

    def __eq__(self, other) -> bool:
        """Implement equality comparison."""
        if isinstance(other, TPStransform):
            if self.source.shape[0] == other.source.shape[0]:
                if np.all(self.source == other.source):
                    if np.all(self.target == other.target):
                        return True
        return False

    def __neg__(self) -> "TPStransform":
        """Invert direction."""
        # Switch source and target
        return TPStransform(self.target, self.source)

    def _calc_tps_coefs(self):
        # Calculate thinplate coefficients
        self._W, self._A = mops.tps_coefs(self.source, self.target)

    @property
    def W(self):
        if isinstance(self._W, type(None)):
            # Calculate coefficients
            self._calc_tps_coefs()
        return self._W

    @property
    def A(self):
        if isinstance(self._A, type(None)):
            # Calculate coefficients
            self._calc_tps_coefs()
        return self._A

    @property
    def matrix_rigid(self):
        """Return the rigid transformation matrix."""
        # The first row in self.A is the translation vector
        # The next 3x3 block is the rotation matrix
        # Let's combine these into a typical 4x4 transformation matrix
        # where the last row is [0, 0, 0, 1]
        m = np.zeros((4, 4))
        m[0:3, 0:3] = self.A[1:4, :].T
        m[0:3, 3] = self.A[0, :]
        m[3] = [0, 0, 0, 1]
        return m

    def copy(self):
        """Make copy."""
        x = TPStransform(self.source, self.target)

        x.__dict__.update(self.__dict__)

        return x

    def xform(self, points: np.ndarray) -> np.ndarray:
        """Transform points.

        Parameters
        ----------
        points :    (N, 3) array
                    Points to transform.

        Returns
        -------
        pointsxf :  (N, 3) array
                    Transformed points.

        """
        if isinstance(points, pd.DataFrame):
            if any(c not in points for c in ["x", "y", "z"]):
                raise ValueError("DataFrame must have x/y/z columns.")
            points = points[["x", "y", "z"]].values

        batch_size = self.batch_size if self.batch_size else points.shape[0]
        points_xf = []
        for i in range(0, points.shape[0], batch_size):
            # Get the current batch of points
            batch = points[i : i + batch_size]

            # N.B. U is of shape (N, M) where N is the number of points and M is the
            # number of source landmarks. This can get fairly expensive
            # (which is precisely why we batch the transformation)!
            U = mops.K_matrix(batch, self.source)
            P = mops.P_matrix(batch)
            # The warped pts are the affine part + the non-uniform part
            points_xf.append(np.matmul(P, self.A) + np.matmul(U, self.W))

        # Concatenate all batches
        return np.concatenate(points_xf, axis=0)

Return the rigid transformation matrix.

Initialize class.

Source code in navis/transforms/thinplate.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def __init__(
    self,
    landmarks_source: np.ndarray,
    landmarks_target: np.ndarray,
    batch_size: int = 100_000,
):
    """Initialize class."""
    self.batch_size = batch_size
    self.source = np.asarray(landmarks_source)
    self.target = np.asarray(landmarks_target)

    # Some checks
    if self.source.shape[1] != 3:
        raise ValueError(f"Expected (N, 3) array, got {self.source.shape}")
    if self.target.shape[1] != 3:
        raise ValueError(f"Expected (N, 3) array, got {self.target.shape}")

    if self.source.shape[0] != self.target.shape[0]:
        raise ValueError(
            "Number of source landmarks must match number of target landmarks."
        )

    self._W, self._A = None, None

Make copy.

Source code in navis/transforms/thinplate.py
148
149
150
151
152
153
154
def copy(self):
    """Make copy."""
    x = TPStransform(self.source, self.target)

    x.__dict__.update(self.__dict__)

    return x

Transform points.

PARAMETER DESCRIPTION
points
    Points to transform.

TYPE: (N, 3) array

RETURNS DESCRIPTION
pointsxf

Transformed points.

TYPE: (N, 3) array

Source code in navis/transforms/thinplate.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def xform(self, points: np.ndarray) -> np.ndarray:
    """Transform points.

    Parameters
    ----------
    points :    (N, 3) array
                Points to transform.

    Returns
    -------
    pointsxf :  (N, 3) array
                Transformed points.

    """
    if isinstance(points, pd.DataFrame):
        if any(c not in points for c in ["x", "y", "z"]):
            raise ValueError("DataFrame must have x/y/z columns.")
        points = points[["x", "y", "z"]].values

    batch_size = self.batch_size if self.batch_size else points.shape[0]
    points_xf = []
    for i in range(0, points.shape[0], batch_size):
        # Get the current batch of points
        batch = points[i : i + batch_size]

        # N.B. U is of shape (N, M) where N is the number of points and M is the
        # number of source landmarks. This can get fairly expensive
        # (which is precisely why we batch the transformation)!
        U = mops.K_matrix(batch, self.source)
        P = mops.P_matrix(batch)
        # The warped pts are the affine part + the non-uniform part
        points_xf.append(np.matmul(P, self.A) + np.matmul(U, self.W))

    # Concatenate all batches
    return np.concatenate(points_xf, axis=0)