[python-dtcwt] 237/497: add implementation of non-rigid registration

Ghislain Vaillant ghisvail-guest at moszumanska.debian.org
Tue Jul 21 18:06:09 UTC 2015


This is an automated email from the git hooks/post-receive script.

ghisvail-guest pushed a commit to branch debian/sid
in repository python-dtcwt.

commit 9b712c801d597529480ce6d367ba55769736a722
Author: Rich Wareham <rjw57 at cam.ac.uk>
Date:   Mon Jan 27 15:18:17 2014 +0000

    add implementation of non-rigid registration
    
    This is a first implementation of the local-affine non-rigid
    registration algorithm using the DTCWT.
---
 dtcwt/registration.py     | 114 +++++++++++++++++++++++++++++++++++++++++++---
 tests/testregistration.py |  34 ++++++++++++++
 tests/traffic.npz         | Bin 0 -> 920912 bytes
 3 files changed, 141 insertions(+), 7 deletions(-)

diff --git a/dtcwt/registration.py b/dtcwt/registration.py
index b823c88..e2ca100 100644
--- a/dtcwt/registration.py
+++ b/dtcwt/registration.py
@@ -10,22 +10,29 @@ These functions are 2D-only for the moment.
 
 from __future__ import division
 
+import itertools
+
 import dtcwt
 import dtcwt.sampling
+import dtcwt.utils
 import numpy as np
 
 __all__ = [
     'EXPECTED_SHIFTS',
 
-    'affinewarphighpass',
+    'affinevelocityfield',
     'affinewarp',
+    'affinewarphighpass',
     'confidence',
+    'estimateflow',
+    'normsample',
+    'normsamplehighpass',
     'phasegradient',
     'qtildematrices',
-    'normsamplehighpass',
-    'normsample',
     'solvetransform',
     'velocityfield',
+    'warp',
+    'warphighpass',
 ]
 
 # Horizontal and vertical expected phase shifts for each subband
@@ -137,7 +144,7 @@ def qtildematrices(Yh1, Yh2, levels):
             else:
                 Qt_mat_sum += Qt
 
-            Qt_mats.append(Qt_mat_sum)
+        Qt_mats.append(Qt_mat_sum)
 
     return Qt_mats
 
@@ -181,7 +188,7 @@ def normsample(Yh, xs, ys, method=None):
     """
     return dtcwt.sampling.sample(Yh, xs*Yh.shape[1], ys*Yh.shape[0], method=method)
 
-def velocityfield(a, xs, ys):
+def affinevelocityfield(a, xs, ys):
     r"""
     Evaluate the velocity field parametrised by *a* at *xs* and *ys*.
 
@@ -220,7 +227,7 @@ def affinewarphighpass(Yh, a, method=None):
 
     """
     xs, ys = np.meshgrid(np.arange(0,1,1/Yh.shape[1]), np.arange(0,1,1/Yh.shape[0]))
-    vxs, vys = velocityfield(a, xs, ys)
+    vxs, vys = affinevelocityfield(a, xs, ys)
     return normsamplehighpass(Yh, xs+vxs, ys+vys, method=method)
 
 def affinewarp(Yh, a, method=None):
@@ -245,5 +252,98 @@ def affinewarp(Yh, a, method=None):
 
     """
     xs, ys = np.meshgrid(np.arange(0,1,1/Yh.shape[1]), np.arange(0,1,1/Yh.shape[0]))
-    vxs, vys = velocityfield(a, xs, ys)
+    vxs, vys = affinevelocityfield(a, xs, ys)
     return normsample(Yh, xs+vxs, ys+vys, method=method)
+
+def estimateflow(reference_h, target_h):
+    # Make a copy of reference_h for warping
+    warped_h = list(reference_h)
+
+    # Initialise matrix of 'a' vectors
+    avecs = np.zeros((warped_h[2].shape[0], warped_h[2].shape[1], 6))
+
+    # Compute initial global transform
+    for it in xrange(2):
+        levels = list(x for x in xrange(len(warped_h)-1, len(warped_h)-3, -1) if x>=0)
+        Qt_mats = list(x.sum() for x in qtildematrices(warped_h, target_h, levels))
+        Qt = np.sum(Qt_mats, axis=0)
+
+        a = solvetransform(Qt)
+        for r, c in itertools.product(xrange(avecs.shape[0]), xrange(avecs.shape[1])):
+            avecs[r,c] += a
+
+        for l in levels:
+            warped_h[l] = warphighpass(reference_h[l], avecs, method='bilinear')
+
+    # Refine estimate
+    for it in xrange(2 * (len(warped_h)-3)):
+        s, e = len(warped_h) - 2, len(warped_h) - 2 - (it+1)//2
+        levels = list(x for x in xrange(s, e-1, -1) if x>=1 and x<len(warped_h))
+        if len(levels) == 0:
+            continue
+
+        qts = np.sum(list(dtcwt.sampling.rescale(_boxfilter(x, 3), avecs.shape[:2], method='bilinear')
+                          for x in qtildematrices(warped_h, target_h, levels)),
+                     axis=0)
+
+        for r, c in itertools.product(xrange(avecs.shape[0]), xrange(avecs.shape[1])):
+            # if np.abs(np.linalg.det(qts[r,c][:-1,:-1])) > np.abs(qts[r,c][-1,-1]):
+            try:
+                avecs[r,c] += solvetransform(qts[r,c])
+            except np.linalg.LinError:
+                # It's OK for refinement to generate singular matrices
+                pass
+
+        for l in levels:
+            warped_h[l] = warphighpass(reference_h[l], avecs, method='bilinear')
+
+    return avecs
+
+def velocityfield(avecs, shape, method=None):
+    h, w = shape[:2]
+    pxs, pys = np.meshgrid(np.arange(0, w, dtype=np.float32) / w,
+                           np.arange(0, h, dtype=np.float32) / h)
+
+    avecs = dtcwt.sampling.rescale(avecs, shape, method='bilinear')
+    vxs = avecs[:,:,0] + avecs[:,:,2] * pxs + avecs[:,:,4] * pys
+    vys = avecs[:,:,1] + avecs[:,:,3] * pxs + avecs[:,:,5] * pys
+
+    return vxs, vys
+
+def warphighpass(Yh, avecs, method=None):
+    X, Y = np.meshgrid(np.arange(Yh.shape[1], dtype=np.float32)/Yh.shape[1],
+                       np.arange(Yh.shape[0], dtype=np.float32)/Yh.shape[0])
+    vxs, vys = velocityfield(avecs, Yh.shape, method=method)
+    return normsamplehighpass(Yh, X+vxs, Y+vys, method=method)
+
+def warp(I, avecs, method=None):
+    X, Y = np.meshgrid(np.arange(I.shape[1], dtype=np.float32)/I.shape[1],
+                       np.arange(I.shape[0], dtype=np.float32)/I.shape[0])
+    vxs, vys = velocityfield(avecs, I.shape, method=method)
+    return normsample(I, X+vxs, Y+vys, method=method)
+
+def _boxfilter(X, kernel_size):
+    """
+    INTERNAL
+
+    A simple box filter implementation.
+
+    """
+    if kernel_size % 2 == 0:
+        raise ValueError('Kernel size must be odd')
+
+    for axis_idx in xrange(2):
+        slices = [slice(None),] * len(X.shape)
+        out = X
+
+        for delta in xrange(1, 1+(kernel_size-1)//2):
+            slices[axis_idx] = dtcwt.utils.reflect(
+                    np.arange(X.shape[axis_idx]) + delta, -0.5, X.shape[axis_idx]-0.5)
+            out = out + X[slices]
+            slices[axis_idx] = dtcwt.utils.reflect(
+                    np.arange(X.shape[axis_idx]) - delta, -0.5, X.shape[axis_idx]-0.5)
+            out = out + X[slices]
+
+        X = out / kernel_size
+
+    return X
diff --git a/tests/testregistration.py b/tests/testregistration.py
new file mode 100644
index 0000000..bc22d96
--- /dev/null
+++ b/tests/testregistration.py
@@ -0,0 +1,34 @@
+import os
+
+import numpy as np
+
+import dtcwt
+from dtcwt.registration import *
+
+def setup():
+    global f1, f2
+    frames = np.load(os.path.join(os.path.dirname(__file__), 'traffic.npz'))
+    f1 = frames['f1']
+    f2 = frames['f2']
+
+def test_frames_loaded():
+    assert f1.shape == (576, 768)
+    assert f1.min() >= 0
+    assert f1.max() <= 1
+    assert f1.dtype == np.float64
+
+    assert f2.shape == (576, 768)
+    assert f2.min() >= 0
+    assert f2.max() <= 1
+    assert f2.dtype == np.float64
+
+def test_estimate_flor():
+    nlevels = 6
+    Yl1, Yh1 = dtcwt.dtwavexfm2(f1, nlevels=nlevels)
+    Yl2, Yh2 = dtcwt.dtwavexfm2(f2, nlevels=nlevels)
+    avecs = estimateflow(Yh1, Yh2)
+
+    # Make sure warped frame 1 has lower mean overlap error than non-warped
+    warped_f1 = warp(f1, avecs, method='bilinear')
+    assert np.mean(np.abs(warped_f1 - f2)) < np.mean(np.abs(f1-f2))
+
diff --git a/tests/traffic.npz b/tests/traffic.npz
new file mode 100644
index 0000000..f32b354
Binary files /dev/null and b/tests/traffic.npz differ

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/python-dtcwt.git



More information about the debian-science-commits mailing list