A basic particle filter tracking algorithm, using a uniformly distributed step as motion model, and the initial target colour as determinant feature for the weighting function. This requires an approximately uniformly coloured object, which moves at a speed no larger than stepsize per frame.
This implementation assumes that the video stream is a sequence of numpy arrays, an iterator pointing to such a sequence or a generator generating one. The particle filter itself is a generator to allow for operating on real-time video streams.
1 from numpy import * 2 from numpy.random import * 3 4 5 def resample(weights): 6 n = len(weights) 7 indices =  8 C = [0.] + [sum(weights[:i+1]) for i in range(n)] 9 u0, j = random(), 0 10 for u in [(u0+i)/n for i in range(n)]: 11 while u > C[j]: 12 j+=1 13 indices.append(j-1) 14 return indices 15 16 17 def particlefilter(sequence, pos, stepsize, n): 18 seq = iter(sequence) 19 x = ones((n, 2), int) * pos # Initial position 20 f0 = seq.next()[tuple(pos)] * ones(n) # Target colour model 21 yield pos, x, ones(n)/n # Return expected position, particles and weights 22 for im in seq: 23 x += uniform(-stepsize, stepsize, x.shape) # Particle motion model: uniform step 24 x = x.clip(zeros(2), array(im.shape)-1).astype(int) # Clip out-of-bounds particles 25 f = im[tuple(x.T)] # Measure particle colours 26 w = 1./(1. + (f0-f)**2) # Weight~ inverse quadratic colour distance 27 w /= sum(w) # Normalize w 28 yield sum(x.T*w, axis=1), x, w # Return expected position, particles and weights 29 if 1./sum(w**2) < n/2.: # If particle cloud degenerate: 30 x = x[resample(w),:] # Resample particles according to weights
The following code shows the tracker operating on a test sequence featuring a moving square against a uniform background.
1 if __name__ == "__main__": 2 from pylab import * 3 from itertools import izip 4 import time 5 ion() 6 seq = [ im for im in zeros((20,240,320), int)] # Create an image sequence of 20 frames long 7 x0 = array([120, 160]) # Add a square with starting position x0 moving along trajectory xs 8 xs = vstack((arange(20)*3, arange(20)*2)).T + x0 9 for t, x in enumerate(xs): 10 xslice = slice(x-8, x+8) 11 yslice = slice(x-8, x+8) 12 seq[t][xslice, yslice] = 255 13 14 for im, p in izip(seq, particlefilter(seq, x0, 8, 100)): # Track the square through the sequence 15 pos, xs, ws = p 16 position_overlay = zeros_like(im) 17 position_overlay[tuple(pos)] = 1 18 particle_overlay = zeros_like(im) 19 particle_overlay[tuple(xs.T)] = 1 20 hold(True) 21 draw() 22 time.sleep(0.3) 23 clf() # Causes flickering, but without the spy plots aren't overwritten 24 imshow(im,cmap=cm.gray) # Plot the image 25 spy(position_overlay, marker='.', color='b') # Plot the expected position 26 spy(particle_overlay, marker=',', color='r') # Plot the particles 27 show()