[python-dtcwt] 237/497: add implementation of non-rigid registration
Ghislain Vaillant
ghisvail-guest at moszumanska.debian.org
Tue Jul 21 18:06:09 UTC 2015
This is an automated email from the git hooks/post-receive script.
ghisvail-guest pushed a commit to branch debian/sid
in repository python-dtcwt.
commit 9b712c801d597529480ce6d367ba55769736a722
Author: Rich Wareham <rjw57 at cam.ac.uk>
Date: Mon Jan 27 15:18:17 2014 +0000
add implementation of non-rigid registration
This is a first implementation of the local-affine non-rigid
registration algorithm using the DTCWT.
---
dtcwt/registration.py | 114 +++++++++++++++++++++++++++++++++++++++++++---
tests/testregistration.py | 34 ++++++++++++++
tests/traffic.npz | Bin 0 -> 920912 bytes
3 files changed, 141 insertions(+), 7 deletions(-)
diff --git a/dtcwt/registration.py b/dtcwt/registration.py
index b823c88..e2ca100 100644
--- a/dtcwt/registration.py
+++ b/dtcwt/registration.py
@@ -10,22 +10,29 @@ These functions are 2D-only for the moment.
from __future__ import division
+import itertools
+
import dtcwt
import dtcwt.sampling
+import dtcwt.utils
import numpy as np
__all__ = [
'EXPECTED_SHIFTS',
- 'affinewarphighpass',
+ 'affinevelocityfield',
'affinewarp',
+ 'affinewarphighpass',
'confidence',
+ 'estimateflow',
+ 'normsample',
+ 'normsamplehighpass',
'phasegradient',
'qtildematrices',
- 'normsamplehighpass',
- 'normsample',
'solvetransform',
'velocityfield',
+ 'warp',
+ 'warphighpass',
]
# Horizontal and vertical expected phase shifts for each subband
@@ -137,7 +144,7 @@ def qtildematrices(Yh1, Yh2, levels):
else:
Qt_mat_sum += Qt
- Qt_mats.append(Qt_mat_sum)
+ Qt_mats.append(Qt_mat_sum)
return Qt_mats
@@ -181,7 +188,7 @@ def normsample(Yh, xs, ys, method=None):
"""
return dtcwt.sampling.sample(Yh, xs*Yh.shape[1], ys*Yh.shape[0], method=method)
-def velocityfield(a, xs, ys):
+def affinevelocityfield(a, xs, ys):
r"""
Evaluate the velocity field parametrised by *a* at *xs* and *ys*.
@@ -220,7 +227,7 @@ def affinewarphighpass(Yh, a, method=None):
"""
xs, ys = np.meshgrid(np.arange(0,1,1/Yh.shape[1]), np.arange(0,1,1/Yh.shape[0]))
- vxs, vys = velocityfield(a, xs, ys)
+ vxs, vys = affinevelocityfield(a, xs, ys)
return normsamplehighpass(Yh, xs+vxs, ys+vys, method=method)
def affinewarp(Yh, a, method=None):
@@ -245,5 +252,98 @@ def affinewarp(Yh, a, method=None):
"""
xs, ys = np.meshgrid(np.arange(0,1,1/Yh.shape[1]), np.arange(0,1,1/Yh.shape[0]))
- vxs, vys = velocityfield(a, xs, ys)
+ vxs, vys = affinevelocityfield(a, xs, ys)
return normsample(Yh, xs+vxs, ys+vys, method=method)
+
+def estimateflow(reference_h, target_h):
+ # Make a copy of reference_h for warping
+ warped_h = list(reference_h)
+
+ # Initialise matrix of 'a' vectors
+ avecs = np.zeros((warped_h[2].shape[0], warped_h[2].shape[1], 6))
+
+ # Compute initial global transform
+ for it in xrange(2):
+ levels = list(x for x in xrange(len(warped_h)-1, len(warped_h)-3, -1) if x>=0)
+ Qt_mats = list(x.sum() for x in qtildematrices(warped_h, target_h, levels))
+ Qt = np.sum(Qt_mats, axis=0)
+
+ a = solvetransform(Qt)
+ for r, c in itertools.product(xrange(avecs.shape[0]), xrange(avecs.shape[1])):
+ avecs[r,c] += a
+
+ for l in levels:
+ warped_h[l] = warphighpass(reference_h[l], avecs, method='bilinear')
+
+ # Refine estimate
+ for it in xrange(2 * (len(warped_h)-3)):
+ s, e = len(warped_h) - 2, len(warped_h) - 2 - (it+1)//2
+ levels = list(x for x in xrange(s, e-1, -1) if x>=1 and x<len(warped_h))
+ if len(levels) == 0:
+ continue
+
+ qts = np.sum(list(dtcwt.sampling.rescale(_boxfilter(x, 3), avecs.shape[:2], method='bilinear')
+ for x in qtildematrices(warped_h, target_h, levels)),
+ axis=0)
+
+ for r, c in itertools.product(xrange(avecs.shape[0]), xrange(avecs.shape[1])):
+ # if np.abs(np.linalg.det(qts[r,c][:-1,:-1])) > np.abs(qts[r,c][-1,-1]):
+ try:
+ avecs[r,c] += solvetransform(qts[r,c])
+ except np.linalg.LinError:
+ # It's OK for refinement to generate singular matrices
+ pass
+
+ for l in levels:
+ warped_h[l] = warphighpass(reference_h[l], avecs, method='bilinear')
+
+ return avecs
+
+def velocityfield(avecs, shape, method=None):
+ h, w = shape[:2]
+ pxs, pys = np.meshgrid(np.arange(0, w, dtype=np.float32) / w,
+ np.arange(0, h, dtype=np.float32) / h)
+
+ avecs = dtcwt.sampling.rescale(avecs, shape, method='bilinear')
+ vxs = avecs[:,:,0] + avecs[:,:,2] * pxs + avecs[:,:,4] * pys
+ vys = avecs[:,:,1] + avecs[:,:,3] * pxs + avecs[:,:,5] * pys
+
+ return vxs, vys
+
+def warphighpass(Yh, avecs, method=None):
+ X, Y = np.meshgrid(np.arange(Yh.shape[1], dtype=np.float32)/Yh.shape[1],
+ np.arange(Yh.shape[0], dtype=np.float32)/Yh.shape[0])
+ vxs, vys = velocityfield(avecs, Yh.shape, method=method)
+ return normsamplehighpass(Yh, X+vxs, Y+vys, method=method)
+
+def warp(I, avecs, method=None):
+ X, Y = np.meshgrid(np.arange(I.shape[1], dtype=np.float32)/I.shape[1],
+ np.arange(I.shape[0], dtype=np.float32)/I.shape[0])
+ vxs, vys = velocityfield(avecs, I.shape, method=method)
+ return normsample(I, X+vxs, Y+vys, method=method)
+
+def _boxfilter(X, kernel_size):
+ """
+ INTERNAL
+
+ A simple box filter implementation.
+
+ """
+ if kernel_size % 2 == 0:
+ raise ValueError('Kernel size must be odd')
+
+ for axis_idx in xrange(2):
+ slices = [slice(None),] * len(X.shape)
+ out = X
+
+ for delta in xrange(1, 1+(kernel_size-1)//2):
+ slices[axis_idx] = dtcwt.utils.reflect(
+ np.arange(X.shape[axis_idx]) + delta, -0.5, X.shape[axis_idx]-0.5)
+ out = out + X[slices]
+ slices[axis_idx] = dtcwt.utils.reflect(
+ np.arange(X.shape[axis_idx]) - delta, -0.5, X.shape[axis_idx]-0.5)
+ out = out + X[slices]
+
+ X = out / kernel_size
+
+ return X
diff --git a/tests/testregistration.py b/tests/testregistration.py
new file mode 100644
index 0000000..bc22d96
--- /dev/null
+++ b/tests/testregistration.py
@@ -0,0 +1,34 @@
+import os
+
+import numpy as np
+
+import dtcwt
+from dtcwt.registration import *
+
+def setup():
+ global f1, f2
+ frames = np.load(os.path.join(os.path.dirname(__file__), 'traffic.npz'))
+ f1 = frames['f1']
+ f2 = frames['f2']
+
+def test_frames_loaded():
+ assert f1.shape == (576, 768)
+ assert f1.min() >= 0
+ assert f1.max() <= 1
+ assert f1.dtype == np.float64
+
+ assert f2.shape == (576, 768)
+ assert f2.min() >= 0
+ assert f2.max() <= 1
+ assert f2.dtype == np.float64
+
+def test_estimate_flor():
+ nlevels = 6
+ Yl1, Yh1 = dtcwt.dtwavexfm2(f1, nlevels=nlevels)
+ Yl2, Yh2 = dtcwt.dtwavexfm2(f2, nlevels=nlevels)
+ avecs = estimateflow(Yh1, Yh2)
+
+ # Make sure warped frame 1 has lower mean overlap error than non-warped
+ warped_f1 = warp(f1, avecs, method='bilinear')
+ assert np.mean(np.abs(warped_f1 - f2)) < np.mean(np.abs(f1-f2))
+
diff --git a/tests/traffic.npz b/tests/traffic.npz
new file mode 100644
index 0000000..f32b354
Binary files /dev/null and b/tests/traffic.npz differ
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/python-dtcwt.git
More information about the debian-science-commits
mailing list