Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions sdcflows/tests/test_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2021 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
# https://www.nipreps.org/community/licensing/
#
"""Unit tests of the transform object."""
from subprocess import check_call
from itertools import product
import pytest
import nibabel as nb
import numpy as np
from skimage.morphology import ball
import scipy.ndimage as nd

from sdcflows import transform as tf


def generate_oracle(
coeff_file,
rotation=(None, None, None),
zooms=(2.0, 2.2, 1.5),
flip=(False, False, False),
):
"""Generate an in-silico phantom, and a corresponding (aligned) B-Spline field."""
data = ball(20)
data[19:22, ...] = 0
data = np.pad(data + nd.binary_erosion(data, ball(3)), 8)

zooms = [z if not f else -z for z, f in zip(zooms, flip)]
affine = np.diag(zooms + [1])
affine[:3, 3] = -affine[:3, :3] @ ((np.array(data.shape) - 1) * 0.5)

coeff_nii = nb.load(coeff_file)
coeff_aff = np.diag([5.0 if not f else -5.0 for f in flip] + [1])

if any(rotation):
R = nb.affines.from_matvec(
nb.eulerangles.euler2mat(
x=rotation[0],
y=rotation[1],
z=rotation[2],
)
)
affine = R @ affine
coeff_aff = R @ coeff_aff

phantom_nii = nb.Nifti1Image(
data.astype(np.uint8),
affine,
None,
)
coeff_nii = nb.Nifti1Image(
coeff_nii.dataobj,
coeff_aff,
coeff_nii.header,
)
return phantom_nii, coeff_nii


@pytest.mark.parametrize("pe_dir", ["j", "j-", "i", "i-", "k", "k-"])
@pytest.mark.parametrize("rotation", [(None, None, None), (0.2, None, None)])
@pytest.mark.parametrize("flip", list(product(*[(False, True)] * 3)))
def test_displacements_field(tmpdir, testdata_dir, outdir, pe_dir, rotation, flip):
"""Check the generated displacements fields."""
tmpdir.chdir()

# Generate test oracle
phantom_nii, coeff_nii = generate_oracle(
testdata_dir / "topup-coeff-fixed.nii.gz",
rotation=rotation,
)

b0 = tf.B0FieldTransform(coeffs=coeff_nii)
assert b0.fit(phantom_nii) is True
assert b0.fit(phantom_nii) is False

b0.apply(
phantom_nii,
pe_dir=pe_dir,
ro_time=0.2,
output_dtype="float32",
).to_filename("warped-sdcflows.nii.gz")
b0.to_displacements(
ro_time=0.2,
pe_dir=pe_dir,
).to_filename("itk-displacements.nii.gz")

phantom_nii.to_filename("phantom.nii.gz")
# Run antsApplyTransform
exit_code = check_call(
[
"antsApplyTransforms -d 3 -r phantom.nii.gz -i phantom.nii.gz "
"-o warped-ants.nii.gz -n BSpline -t itk-displacements.nii.gz"
],
shell=True,
)
assert exit_code == 0

ours = np.asanyarray(nb.load("warped-sdcflows.nii.gz").dataobj)
theirs = np.asanyarray(nb.load("warped-ants.nii.gz").dataobj)
assert np.all((np.sqrt(((ours - theirs) ** 2).sum()) / ours.size) < 1e-1)

if outdir:
from niworkflows.interfaces.reportlets.registration import (
SimpleBeforeAfterRPT as SimpleBeforeAfter,
)

orientation = "".join([ax[bool(f)] for ax, f in zip(("RL", "AP", "SI"), flip)])

SimpleBeforeAfter(
after_label="Theirs (ANTs)",
before_label="Ours (SDCFlows)",
after="warped-ants.nii.gz",
before="warped-sdcflows.nii.gz",
out_report=str(
outdir / f"xfm_pe-{pe_dir}_flip-{orientation}_x-{rotation[0] or 0}"
f"_y-{rotation[1] or 0}_z-{rotation[2] or 0}.svg"
),
).run()
60 changes: 29 additions & 31 deletions sdcflows/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def apply(
moved.header.set_data_dtype(output_dtype)
return moved

def to_displacements(self, ro_time, pe_dir):
def to_displacements(self, ro_time, pe_dir, itk_format=True):
"""
Generate a NIfTI file containing a displacements field transform compatible with ITK/ANTs.

Expand All @@ -215,38 +215,36 @@ def to_displacements(self, ro_time, pe_dir):
A NIfTI 1.0 object containing the distortion.

"""
from math import pi
from nibabel.affines import voxel_sizes, obliquity
from nibabel.orientations import io_orientation

# Generate warp field
data = self.shifts.get_fdata(dtype="float32").copy()
# Set polarity & scale VSM (voxel-shift-map) by readout time
vsm = self.shifts.get_fdata().copy()
pe_axis = "ijk".index(pe_dir[0])
pe_sign = -1.0 if pe_dir.endswith("-") else 1.0
pe_size = self.shifts.header.get_zooms()[pe_axis]
data *= pe_sign * ro_time * pe_size

fieldshape = tuple(list(data.shape[:3]) + [3])

# Compose a vector field
field = np.zeros((data.size, 3), dtype="float32")
field[..., pe_axis] = data.reshape(-1)

# If coordinate system is oblique, project displacements through directions matrix
aff = self.shifts.affine
if obliquity(aff).max() * 180 / pi > 0.01:
dirmat = np.eye(4)
dirmat[:3, :3] = aff[:3, :3] / (
voxel_sizes(aff) * io_orientation(aff)[:, 1]
)
field = nb.affines.apply_affine(dirmat, field)

warpnii = nb.Nifti1Image(
field.reshape(fieldshape)[:, :, :, np.newaxis, :], aff, None
vsm *= -1.0 if pe_dir.endswith("-") else 1.0
vsm *= ro_time

# Shape of displacements field
# Note that ITK NIfTI fields are 5D (have an empty 4th dimension)
fieldshape = tuple(list(vsm.shape[:3]) + [1, 3])

# Convert VSM to voxel displacements
ijk_deltas = np.zeros((vsm.size, 3), dtype="float32")
ijk_deltas[:, pe_axis] = vsm.reshape(-1)

# To convert from VSM to RAS field we just apply the affine
aff = self.shifts.affine.copy()
aff[:3, 3] = 0 # Translations MUST NOT be applied, though.
xyz_deltas = nb.affines.apply_affine(aff, ijk_deltas)
if itk_format:
# ITK displacement vectors are in LPS orientation
xyz_deltas[..., (0, 1)] *= -1.0

xyz_nii = nb.Nifti1Image(
xyz_deltas.reshape(fieldshape),
self.shifts.affine,
None,
)
warpnii.header.set_intent("vector", (), "")
warpnii.header.set_xyzt_units("mm")
return warpnii
xyz_nii.header.set_intent("vector", (), "")
xyz_nii.header.set_xyzt_units("mm")
return xyz_nii


def _cubic_bspline(d):
Expand Down