First of all we need a couple of test images:
Input images I0 and I1:
# import numpy from StringIO import StringIO I0 = numpy.loadtxt(StringIO(""" 0 0 0 0 0 0 0 0.5000 0 0 0 0 1.0000 0 0 0 0 0.5000 0 0 0 0 0 0 0""") ) I1 = numpy.loadtxt(StringIO(""" 0 0 0 0 0 0 0.5 0 0 0 0 1.0 0 0 0 0 0.5 0 0 0 0 0 0 0 0 """) )Define initial horiozontal and vertical components of optical flow
u = numpy.zeros_like(I0); v = numpy.zeros_like(I0);Lets write class for making warps. As OF usually deals only with small displacements, we need iterative estimation: estimate, shift image by found vectors, find again.
class Warper: def __init__(self, shape, u0, v0, I0, I1, display = False): """ shape - shape of input function, u0, v0 - starting values of flow I0, I1 - images to compute flow between """ # saving dimensions self.M, self.N = shape[0], shape[1] # save initial estimation self.u, self.v = u0.copy(), v0.copy() # grid of coordinates, required further self.idx, self.idy = np.meshgrid(np.arange(self.N), np.arange(self.M)) # filter for partial derivatives computation self.mask = np.array([1, -8, 0, 8, -1], ndmin=2)/12.0; # flag of debug information output self.display = display self.counter = 0 # copy of images self.I0, self.I1 = I0.copy(), I1.copy() # here we create an instance of training function. On each step we will approach to minimum by gradient descent self.train = TrainFunctionSimple(u0, v0, rate=0.1) # main function def warp(self): if self.display: print 'Warp %d' % (self.counter,) # initial value u0, v0 = self.u.copy(), self.v.copy() # Ends of motion vectors. From these points we will "compensate" motion idxx = self.idx + u0 idyy = self.idy + v0 # get linearly interpolated values from (idxx, idyy) pixels of I1 I1warped = interp2linear(self.I1, idxx, idyy) # just debug output if self.display: print "I1warped", I1warped print "I0", I0 print "u0", u0 print "v0", v0 pass It = (I1warped - self.I0) print "It", It #My first wrong version (it gives no converging, something like continuing initial vectors to infinity during warps) (***) #Ix = ndimage.correlate(self.I1, self.mask, mode='nearest') #Iy = ndimage.correlate(self.I1, self.mask.T, mode='nearest') #Much better is: Ix = ndimage.correlate(I1warped, self.mask, mode='nearest') Iy = ndimage.correlate(I1warped, self.mask.T, mode='nearest') # boundary handling m = (idxx > self.N - 1) | (idxx < 0) | (idyy > self.M - 1) | (idyy < 0) Ix[m] = 0.0 Iy[m] = 0.0 It[m] = 0.0 self.Ix = Ix self.Iy = Iy self.It = It self.train.init(np.zeros_like(self.I0), np.zeros_like(self.I0)) for i_sgd in range(120): print self.train.step(Ix, Iy, It), self.u += self.train.tu.get_value() self.v += self.train.tv.get_value() self.counter += 1Now describing TrainFunction. Well design is not good yet
# class TrainFunction(object): def __init__(self, u0, v0, rate): self.rate = rate self.tu = theano.shared(u0,name='tu') self.tv = theano.shared(v0,name='tv') self.tIx = T.matrix("tIx") self.tIy = T.matrix("tIy") self.tIt = T.matrix("tIt") self.gu, self.gv = None, None self.E = None self.train_function = self.get_function() # this function must be overloaded in the derived classes def get_energy(self): raise Exception("Non implemented") # construct Theano-function for gradient descent def get_function(self): if self.E is None: self.E = self.get_energy() if self.gu is None or self.gv is None: self.gu, self.gv = T.grad(self.E, [self.tu, self.tv]) train_function = theano.function( inputs=[self.tIx, self.tIy, self.tIt], outputs=[self.E], updates=((self.tu, self.tu - self.rate * self.gu), (self.tv, self.tv - self.rate * self.gv)), allow_input_downcast=True) return train_function # initialization of flow values def init(self, u0, v0): self.tu.set_value(u0) self.tv.set_value(v0) # launching step of gradiennt descent def step(self, *args): return self.train_function(*args)
# class TrainFunctionSimple(TrainFunction): def __init__(self, *args, **kwargs): self.alpha = kwargs.get('alpha', 1.1) super(self.__class__, self).__init__(*args, **kwargs) # constructs Theano-function, that calculate Energy def get_energy(self,): # data term Edata = T.sum( ( self.tIx * self.tu + self.tIy * self.tv + self.tIt ) ** 2 ) # regularization term Ereg = T.sum( (self.tu)**2 + (self.tv)**2 ) return Edata+self.alpha*Eregfinally launching
wrpr = Warper( I0.shape, numpy.zeros_like(I0), numpy.zeros_like(I0), I0, I1, display=True) warps = 5 for i in range(warps): wrpr.warp()Printing the results:
numpy.set_printoptions(precision=3,) print wrpr.u print wrpr.v
[[ 0. 0. 0. 0. 0. ] [ 0. 0. -0.523 0. 0. ] [ 0. 0. -0.941 0. 0. ] [ 0. 0. -0.523 0. 0. ] [ 0. 0. 0. 0. 0. ]] [[ 0. 0. 0. 0. 0. ] [ 0. -0.692 0. 0. 0. ] [ 0. 0. 0. 0. 0. ] [ 0. 0.692 0. 0. 0. ] [ 0. 0. 0. 0. 0. ]]Well, not precise enough. However the direction is right.
UPDATE: due to my error in (***), see code, results and images below may have no sense
Some errors
In this implementation we estimate how to move I1 to match I0. So vectors points from locations of I0 to points of I1. So spatial derivatives must be calculated from I1, not I0:Ix = ndimage.correlate(self.I1, self.mask, mode='nearest') Iy = ndimage.correlate(self.I1, self.mask.T, mode='nearest')If you write instead:
Ix = ndimage.correlate(self.I0, self.mask, mode='nearest') Iy = ndimage.correlate(self.I0, self.mask.T, mode='nearest')you will get pressy strange results.
Input images I0 and I1:
So the "object" moves from left to right. I0 is a current frame and I1 is a previous frame. The vectors after 1 and 10 warps of correct iteration (derivatives of I1, I0 at background):
The vectors after 1 and 10 warps of incorrect iteration (derivatives of I0, I0 at background):
Regularization term
Ok. Now lets improve our regularization term. Instead of L2 norm, lets use Total Variation approach.
On the same settings lets use another train function class:
class TrainFunctionTV(TrainFunction): def __init__(self, *args, **kwargs): self.alpha = kwargs.get('alpha', 1.1) super(self.__class__, self).__init__(*args, **kwargs) def get_energy(self,): Edata = T.sum((self.tIx * self.tu + self.tIy * self.tv + self.tIt) ** 2) Ereg1 = T.sum( (self.tu[1:]-self.tu[:-1])**2 + (self.tv[1:]-self.tv[:-1])**2 ) Ereg2 = T.sum( (self.tu[:,1:]-self.tu[:,:-1]) **2 + (self.tv[:,1:]-self.tv[:,:-1]) ** 2) return Edata+self.alpha*(Ereg1+Ereg2)
Now we have these results:
print wrpr.u print wrpr.v [[-0.862 -0.903 -0.949 -0.941 -0.929] [-0.842 -0.92 -1.025 -0.968 -0.941] [-0.81 -0.933 -1.101 -0.99 -0.95 ] [-0.842 -0.92 -1.025 -0.968 -0.941] [-0.862 -0.903 -0.949 -0.941 -0.929]] [[-0.121 -0.136 -0.093 -0.06 -0.045] [-0.106 -0.197 -0.083 -0.043 -0.029] [-0. -0. -0. -0. -0. ] [ 0.106 0.197 0.083 0.043 0.029] [ 0.121 0.136 0.093 0.06 0.045]]Visualization:
before (L2) | after (TV) |
No comments:
Post a Comment