Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) Lucas-Kanade registration #100

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
51 changes: 50 additions & 1 deletion test/test_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import tempfile

from test_utils import PySparkTestCase
from numpy import allclose, dstack, mean, random, ndarray
from numpy import allclose, dstack, mean, random, ndarray, zeros, pi
from scipy.ndimage.interpolation import shift
from nose.tools import assert_true

from thunder.registration.registration import Registration
from thunder.registration.transformation import TranslationTransformation, EuclideanTransformation
from thunder.rdds.fileio.imagesloader import ImagesLoader


Expand Down Expand Up @@ -173,3 +174,51 @@ def test_crosscorrVolume(self):
imOut = Registration('planarcrosscorr').prepare(ref).run(imIn).first()[1]
assert_true(allclose(paramOut, [[2, -2], [2, -2], [2, -2]]))
assert_true(allclose(ref[:-2, 2:, :], imOut[:-2, 2:, :]))


class TestTransformation(ImgprocessingTestCase):
def test_translation(self):
im = zeros((5, 5))
im[2:4, 2:4] = 1.0
expected = zeros((5, 5))
expected[3:, :2] = 1.0
tfm = TranslationTransformation(shift=(1, -2))
out = tfm.apply(im)
assert_true(allclose(out, expected))

def _run_euclidean_test(self, sz):
im = zeros((sz, sz))
im[:1, :] = 1.0
expected = zeros((sz, sz))
expected[:, 2] = 1.0
tfm = EuclideanTransformation(shift=(2.0, 0.0), rotation=pi/2)
out = tfm.apply(im)
assert_true(allclose(out, expected))

def test_euclidean(self):
self._run_euclidean_test(5)
self._run_euclidean_test(6)


class TestLucasKanade(ImgprocessingTestCase):
def test_euclidean(self):
vol = zeros((16, 16))
vol[4:12, 4:8] = 1.0
rot = pi / 8.0
trueTfm = EuclideanTransformation((0, 1), rot)
volTfm = trueTfm.apply(vol)

reg = Registration('lucaskanade', transformationType='Euclidean').prepare(volTfm)
predTfm = reg.getTransform(vol)
print trueTfm.getParams(), predTfm.getParams()
assert_true(allclose(trueTfm.getParams(), predTfm.getParams(), atol=1e-2))

def test_translation(self):
vol = zeros((16, 16))
vol[4:12, 4:8] = 1.0
trueTfm = TranslationTransformation((1.75, -2.5))
volTfm = trueTfm.apply(vol)
reg = Registration('lucaskanade', transformationType='Translation').prepare(volTfm)
predTfm = reg.getTransform(vol)
print trueTfm.getParams(), predTfm.getParams()
assert_true(allclose(trueTfm.getParams(), predTfm.getParams(), atol=1e-2))
3 changes: 2 additions & 1 deletion thunder/registration/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from thunder.registration.methods.crosscorr import CrossCorr, PlanarCrossCorr
from thunder.registration.methods.crosscorr import CrossCorr, PlanarCrossCorr
from thunder.registration.methods.lucaskanade import LucasKanade
94 changes: 94 additions & 0 deletions thunder/registration/methods/lucaskanade.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
""" Registration methods based on Lucas-Kanade registration"""

from numpy import array, ndarray, inf, zeros

from thunder.rdds.images import Images
from thunder.registration.registration import RegistrationMethod
from thunder.registration.transformation import TRANSFORMATION_TYPES
from thunder.registration.methods.utils import volumesToMatrix, solveLinearized, computeReferenceMean, checkReference


class LucasKanade(RegistrationMethod):
"""Lucas-Kanade registration method.

Lucas-Kanade (LK) is an iterative algorithm for aligning an image to a reference. It aims to minimize the squared
error between the transformed image and the reference. As the relationship between transformation parameters and
transformed pixels is nonlinear, we have to perform a series of linearizations similar to Levenberg-Marquardt.
At each iteration, we compute the Jacobian of the output image with respect to the input parameters, and solve a
least squares problem to identify a change in parameters. We update the parameters and repeat.

To increase robustness, we extend the traditional LK algorithm to use a set of reference images.
We minimize the squared error of the difference of the transformed image and a learned weighting of the references.
"""

def __init__(self, transformationType='Translation', border=0, tol=1e-5, maxIter=100, robust=False):
"""
Parameters
----------
transformationType : one of 'Translation', 'Euclidean', optional, default = 'Translation'
type of transformation to use
border : int or tuple, optional, default = 0
Border to be zeroed out after transformations. For most datasets, it is
critical that this value be larger than the maximum translation to get
good results and avoid boundary artifacts.
maxIter : int, optional, default = 100
maximum number of iterations
tol : float, optional, default = 1e-5
stopping criterion on the L2 norm of the change in parameters
robust : bool, optional, default = False
solve a least absolute deviation problem instead of least squares
"""
self.transformationType = transformationType
self.border = border
self.maxIter = maxIter
self.tol = tol
self.robust = robust

def prepare(self, images, startidx=None, stopidx=None):
"""
Prepare Lucas-Kanade registration by computing or specifying a reference image.

Parameters
----------
images : ndarray or Images object
Images to compute reference from, or a single image to set as reference

See computeReferenceMean.
"""
if isinstance(images, Images):
self.reference = computeReferenceMean(images, startidx, stopidx)
elif isinstance(images, ndarray):
self.reference = images
else:
raise Exception('Must provide either an Images object or a reference')
# Convert references to matrix to speed up solving linearized system
self.referenceMat = volumesToMatrix(self.reference)
return self

def isPrepared(self, images):
"""
Check if Lucas-Kanade is prepared by checking the dimensions of the reference.

See checkReference.
"""

if not hasattr(self, 'reference'):
raise Exception('Reference not defined')
else:
checkReference(self.reference, images)

def getTransform(self, vol):
from numpy.linalg import norm
# Create initial transformation
tfm = TRANSFORMATION_TYPES[self.transformationType](shift=zeros(vol.ndim))
iter = 0
normDelta = inf
params = []
while iter < self.maxIter and normDelta > self.tol:
volTfm, jacobian = tfm.jacobian(vol, border=self.border)
deltaTransformParams, coeff = solveLinearized(volumesToMatrix(volTfm), volumesToMatrix(jacobian), self.referenceMat, self.robust)
tfm.updateParams(deltaTransformParams)
normDelta = norm(deltaTransformParams)
params.append(tfm.getParams())
iter += 1
return tfm
139 changes: 138 additions & 1 deletion thunder/registration/methods/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,141 @@ def computeDisplacement(arry1, arry2):
# cast to basic python int for serialization
adjusted = [int(d - n) if d > n // 2 else int(d) for (d, n) in pairs]

return adjusted
return adjusted

def zeroBorder(vol, border=1, cval=0.0):
"""Zero-out boundary voxels of a volume.

Parameters
----------
vol: 3d volume
border: scalar or tuple containing borders for each dimension
cval: constant value to replace boundary voxels with, default is 0.0
"""
import numpy as np # sorry
vol = vol.copy() # create new copy so we don't overwrite vol
dims = np.array(vol.shape)
if np.size(border) == 1:
border = border * np.ones(vol.ndim, dtype=int)
border = np.array(border, dtype=int)
# Don't apply border to singleton dimensions.
border[dims == 1] = 0
assert len(border) == vol.ndim
if np.any(dims - border <= 0):
raise ValueError('Border %s exceeds volume shape %s.' %
(str(border), str(dims)) )
for dim, bval in enumerate(border):
if bval > 0:
slices = [slice(bval) if d == dim else slice(None) for d in xrange(vol.ndim)]
vol[slices] = cval
slices[dim] = slice(-bval, None)
vol[slices] = cval
return vol


def solveLinearized(vec, jacobian, reference, robust=False):
"""Solve linearized registration problem for change in transformation parameters and weighting on reference basis.

Parameters
----------
vec : array, shape (nvoxels,)
vectorized volume
jacobian: array, shape (nvoxels, nparams)
jacobian for each transformation parameter
reference: array, shape (nvoxels, nbasis)
array of vectorized reference volumes
robust: bool, optional, default = False
solve a least absolute deviation problem instead of least squares

Returns
-------
deltaTransforms : array, shape (nparams,)
optimal change in transformation parameters
coeff : array, shape (nbasis,)
optimal weighting of reference volumes
"""
from numpy import column_stack
A = column_stack((jacobian, reference))
if robust:
from statsmodels.regression.quantile_regression import QuantReg
quantile = 0.5
model = QuantReg(vec, A).fit(q=quantile)
params = model.params
else:
from numpy.linalg import lstsq
model = lstsq(A, vec)
params = model[0]
from numpy import split
deltaTransform, coeff = split(params, [jacobian.shape[1]])
return deltaTransform, coeff


def volToVec(vol):
"""
Convert volume to vector.

Parameters
----------
vol : array
volume

Returns
-------
vectorized volume
"""
return vol.ravel()


def vecToVol(vec, dims):
"""
Convert vector to volume.

Parameters
----------
vec : array, shape (nvoxels,)
vectorized volume
dims : tuple, optional
shape of volume, must be set to reshape vectors into volumes

Returns
-------
volume if input is a vector, and vector if input is a volume
"""
assert vec.size == prod(dims)
return vec.reshape(dims)


def volumesToMatrix(vols):
"""Convert list of volumes to a matrix.

Parameters
----------
vols : list of arrays

Returns
-------
array with size nvoxels by number of volumes
"""
from numpy import column_stack
if not isinstance(vols, list):
return volToVec(vols)
else:
return column_stack(map(volToVec, vols)).squeeze()


def imageGradients(im, sigma=None):
"""Compute gradients of volume in each dimension using a Sobel filter.

Parameters
----------
im : ndarray
single volume
sigma : float or tuple, optional, default = None
smoothing amount to apply to volume before computing gradients
"""
from scipy.ndimage.filters import gaussian_filter, sobel
if sigma is not None:
im = gaussian_filter(im, sigma)
grads = [sobel(im, axis=dim, mode='constant') / 8.0 for dim in xrange(im.ndim)]
return grads

6 changes: 4 additions & 2 deletions thunder/registration/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ class Registration(object):
def __new__(cls, method, **kwargs):

from thunder.registration.methods.crosscorr import CrossCorr, PlanarCrossCorr
from thunder.registration.methods.lucaskanade import LucasKanade

REGMETHODS = {
'crosscorr': CrossCorr,
'planarcrosscorr': PlanarCrossCorr
'planarcrosscorr': PlanarCrossCorr,
'lucaskanade': LucasKanade,
}

checkParams(method, REGMETHODS.keys())

return REGMETHODS[method](kwargs)
return REGMETHODS[method](**kwargs)

@staticmethod
def load(file):
Expand Down
Loading