Kalman-Based Target Trajectory Tracking Performance QC Analysis

Photo by Kelly on Pexels.

  • Based upon the previous study, we continue evaluating performance of the Kalman Filter (KF) by walking through some numerical tests.
  • In the presence of uncertainty, one of the most difficult issues for tracking in control systems is to estimate the accuracy and precision of hidden variables.
  • KF is considered as the widely adapted estimation algorithm for object tracking applications
  • KF is an optimal estimation algorithm used to estimate states of a dynamic system from uncertain measurements. It takes measurements over time and creates a prediction of the next measurements.
  • In this post, we will see a practical approach on how to use KF to track and predict the trajectory of an object. 
  • Our objective is to track the trajectory of a target by using the basics of KF and compare the filter output and the original trajectory.

Table of Contents

  1. The Kalman Filter Intuition
  2. Formulation of a Problem
  3. Linear Position-Time Path
  4. Parabolic Position-Time Path
  5. Extended Kalman Filter (EKF)
  6. Tracking the Bike’s Path
  7. Unscented Kalman Filter (UKF)
  8. Industry Application in Dynamic Positioning System
  9. Smoothed Position and Speed Estimates
  10. Radar EKF Trajectory
  11. Conclusions
  12. Explore More
  13. References

The Kalman Filter Intuition

  • KF is used for object tracking when you have estimates of object position, measured with error. 
  • If you are tracking only in 2-D and want to estimate position in 3-D, the third dimension is another state variable you do not measure directly.
  • Main idea of KF: we want to estimate the next object position incorporating only the three parameters: object motion model; measurement noise; process noise.
  • The KF cycle: First the filter predicts the next state from the provided state transition, then the noisy measurement information is incorporated in the correction phase. The cycle is repeated.
  • The actual KF algorithm is implemented as follows:
    • Measurement #1 initializes system state estimate & error covariance
    • Measurement #2 re-initializes system state estimate & error covariance
    • Measurement #3 predicts system state estimate & error covariance to measurement time
    • Compute the Kalman gain
    • Estimate the system state & error covariance at the measurement time, go to step 3.
  • Read more about KF here.

Formulation of a Problem

  • Despite the large number of investigations into Kalman filter trackers, the optimal selection of a process noise model has not been discussed.
  • Although various criteria have been proposed and investigated for the design of Kalman filters and its variants to achieve better tracking accuracy, robustness, and real-time capability, relationship between these performance indices and the model parameters such as the process noise variance is not discussed even in recent studies. 
  • Another significant problem in a measurement model of the conventional Kalman tracking filter is that most studies consider only position measurements and therefore cannot make full use of modern sensors that are able to measure velocity, such as ultrawideband Doppler radar. Moreover, sensor fusion based on IoT also enables the simultaneous measurement of position and velocity.

Linear Position-Time Path

  • We begin with the simplest example of a linear position-time path. We need to estimate both position and velocity vs time using the following KF algorithm
import numpy as np
from numpy.linalg import inv
import matplotlib.pyplot as plt
def getMeasurement(updateNumber):
    if updateNumber == 1:
      getMeasurement.currentPosition = 0
      getMeasurement.currentVelocity = 60 # m/s

    dt = 0.1

    w = 8 * np.random.randn(1)
    v = 8 * np.random.randn(1)

    z = getMeasurement.currentPosition + getMeasurement.currentVelocity*dt + v
    getMeasurement.currentPosition = z - v
    getMeasurement.currentVelocity = 60 + w
    return [z, getMeasurement.currentPosition, getMeasurement.currentVelocity]
def filter(z, updateNumber):
    dt = 0.1
    # Initialize State
    if updateNumber == 1:
        filter.x = np.array([[0],
                            [20]])
        filter.P = np.array([[5, 0],
                                 [0, 5]])

        filter.A = np.array([[1, dt],
                             [0, 1]])
        filter.H = np.array([[1, 0]])
        filter.HT = np.array([[1],
                              [0]])
        filter.R = 10
        filter.Q = np.array([[1, 0],
                             [0, 3]])

    # Predict State Forward
    x_p = filter.A.dot(filter.x)
    # Predict Covariance Forward
    P_p = filter.A.dot(filter.P).dot(filter.A.T) + filter.Q
    # Compute Kalman Gain
    S = filter.H.dot(P_p).dot(filter.HT) + filter.R
    K = P_p.dot(filter.HT)*(1/S)

    # Estimate State
    residual = z - filter.H.dot(x_p)
    filter.x = x_p + K*residual

    # Estimate Covariance
    filter.P = P_p - K.dot(filter.H).dot(P_p)

    return [filter.x[0], filter.x[1], filter.P];
dt = 0.1
t = np.linspace(0, 10, num=300)
numOfMeasurements = len(t)

measTime = []
measPos = []
measDifPos = []
estDifPos = []
estPos = []
estVel = []
posBound3Sigma = []

for k in range(1,numOfMeasurements):
        z = getMeasurement(k)
        # Call Filter and return new State
        f = filter(z[0], k)
        # Save off that state so that it could be plotted
        measTime.append(k)
        measPos.append(z[0])
        measDifPos.append(z[0]-z[1])
        estDifPos.append(f[0]-z[1])
        estPos.append(f[0])
        estVel.append(f[1])
        posVar = f[2]
        posBound3Sigma.append(3*np.sqrt(posVar[0][0]))
  • Plotting measured and estimated positions
plt.plot(estPos,label='Estimated')
plt.plot(measPos,label='Measured')
plt.ylabel('Position (m)')
plt.xlabel('Time (sec)')
plt.legend()
plt.grid()
Measured and estimated positions
  • Plotting measured and estimated position differences
plt.plot(measDifPos,label='Measured')
plt.plot(estDifPos,label='Estimated',lw=4)
plt.ylabel('DifPos (m)')
plt.xlabel('Time (sec)')
plt.legend()
plt.grid()
Measured and estimated position differences
  • Estimated velocity
plt.plot(estVel)
plt.title('Estimated Velocity (km/h)')
plt.xlabel('Time (sec)')
plt.legend()
plt.grid()
Estimated velocity
  • The plots generated by this example clearly show that the KF is working.

Parabolic Position-Time Path

  • Let’s look at the parabolic position-time path using the KF algorithm
import numpy as np
import matplotlib.pyplot as plt
n=100
sz = (n) # size of array
t = np.linspace(0,10,n)
x = t + 2*t**2 # truth function
z = x + np.random.normal(0,7.5,size=sz) # noise

Q = 1e-3 # process variance

# allocate space for arrays
xhat=np.zeros(sz)      # a posteri estimate of x
P=np.zeros(sz)         # a posteri error estimate
xhatminus=np.zeros(sz) # a priori estimate of x
Pminus=np.zeros(sz)    # a priori error estimate
K=np.zeros(sz)         # Kalman gain

R = 0.03**2 # estimate of measurement variance


xhat[0] = 0.0
P[0] = 1.0

for i in range(1,n):

  xhatminus[i] = xhat[i-1]
  Pminus[i] = P[i-1]+Q


  K[i] = Pminus[i]/( Pminus[i]+R )
  xhat[i] = xhatminus[i]+K[i]*(z[i]-xhatminus[i])
  P[i] = (1-K[i])*Pminus[i]

plt.figure()
plt.plot(t, z,'k+',label='noisy measurements')
plt.plot(t, xhat,'b-',label='a posteri estimate')
plt.plot(t,x,color='g',label='truth value')
plt.legend()
plt.xlabel('Time')
plt.ylabel('Position')
plt.grid()

plt.show()
parabolic position-time path: noisy measurements vs a posteriori estimate
  • It appears that KF still provides a reasonable approximation of the true path.

Extended Kalman Filter (EKF)

  • Let’s consider the example of tracking a robot in circular motion using the Extended Kalman Filter (EKF) class
import numpy as np
import matplotlib.pyplot as plt
import math

class Filter:
    def __init__(self, var=None):
        self.filter = var
        self.filter.predict()

    def get_point(self):
        return self.filter.x[0:2].flatten()

    def predict_point(self, measurement):

        self.filter.predict()
        self.filter.update(measurement)
        return self.get_point()


class ExtendedKalmanFilter:
    def __init__(self,sigma,u,dt):
        self.x = np.array([0.,  # x
                           0.,  # y
                           0.,  # orientation
                           0.]).reshape(-1, 1)  # linear velocity

        self.u = u
        self.dt = dt
        self.p = np.diag([999. for i in range(4)])
        self.q = np.diag([0.1 for i in range(4)])
        self.r = np.diag([sigma, sigma])
        self.jf = np.diag([1. for i in range(4)])
        self.jg = np.array([[1., 0., 0., 0.],
                            [0., 1., 0., 0.]])
        self.h = np.array([[1., 0., 0., 0.],
                           [0., 1., 0., 0.]])
        self.z = np.array([0.,
                           0.]).reshape(-1, 1)

    def predict(self):
        x, y, phi, v = self.x
        vt, wt = self.u
        dT = self.dt

        new_x = x + vt * np.cos(phi) * dT
        new_y = y + vt * np.sin(phi) * dT
        new_phi = phi + wt * dT
        new_v = vt

        self.x[0]=new_x
        self.x[1]=new_y
        self.x[2]=new_phi
        self.x[3]=new_v

        self.jf[0][2] = -v * np.sin(phi) * dT
        self.jf[0][3] = v * np.cos(phi) * dT
        self.jf[1][2] = v * np.cos(phi) * dT
        self.jf[1][3] = v * np.sin(phi) * dT

        # p-prediction:
        self.p = self.jf.dot(self.p).dot(self.jf.transpose()) + self.q

    def update(self, measurement):
        z = np.array(measurement).reshape(-1, 1)
        y = z - self.h.dot(self.x)

        s = self.jg.dot(self.p).dot(self.jg.transpose()) + self.r
        k = self.p.dot(self.jg.transpose()).dot(np.linalg.inv(s))

        self.x = self.x + k.dot(y)
        self.p = (np.eye(4) - k.dot(self.h)).dot(self.p)

def action(parfilter, measurements):
    result = []
    for i in range(len(measurements)):
        m = measurements[i]
        pred = parfilter.predict_point(m)
        result.append(pred)

    return result

def create_artificial_circle_data(radius,dt,u,sigma):
    measurements = []
    vt, wt = u
    phi = np.arange(0, 2*math.pi, wt * dt)
    x = radius * np.cos(phi) + sigma * np.random.randn(len(phi))
    y = radius * np.sin(phi) + sigma * np.random.randn(len(phi))

    measurements += [[x, y] for x, y in zip(x, y)]
    return measurements

def plot(measurements, result, deadreckon_results):
    plt.figure("Extended Kalman Filter Visualization")
    plt.scatter([x[0] for x in measurements], [x[1] for x in measurements], c='red', label='GPS Position', alpha=0.3,s=4)
    plt.scatter([x[0] for x in result], [x[1] for x in result], c='blue', label='Kalman filter estimate Position', alpha=0.3, s=4,marker='x')
    plt.scatter([x[0] for x in deadreckon_results], [x[1] for x in deadreckon_results], c='black', label='dead reckoning Position', alpha=0.3, s=4,marker='x')
    plt.legend()
    plt.grid('on')
    plt.title("Circular Motion Robot and Kalman Filter Prediction")
    plt.xlabel("X-axis")
    plt.ylabel("Y-axis")
    plt.xlim(-10, 10)
    plt.ylim(-10, 10)
    plt.tight_layout()
    plt.show()

def deadReckoning(robo, u, dt):
    initial_state = robo.x
    vt, wt = u
    x=vt/wt
    y=0
    phi=math.pi/2
    x_list=[]
    y_list=[]
    phi_list=[]
    result = []
    T = 2 * math.pi / wt
    for i in range(int(T/dt)):
        new_x = x + vt * np.cos(phi) * dt
        new_y = y + vt * np.sin(phi) * dt
        new_phi = phi + wt * dt
        x = new_x
        y = new_y
        phi = new_phi
        # new_v = vt

        x_list.append(new_x)
        y_list.append(new_y)
        phi_list.append(new_phi)
    result += [[p, q] for p, q in zip(x_list, y_list)]

    return result

  • Running the EKF code with sigma=0.5 and dt=0.001
if __name__ == '__main__':
    input_u = [10, 2]
    radiusOfCircle = input_u[0] / input_u[1]
    robot = ExtendedKalmanFilter(sigma=0.5, u=input_u, dt=0.001)
    robotController = Filter(robot)
    deadreckon_results=deadReckoning(robo=robot, u=input_u, dt=0.001)
    measurementGPS = create_artificial_circle_data(radius=radiusOfCircle,u=input_u, dt=0.001, sigma=0.5)
    results = action(robotController, measurementGPS)
    plot(measurementGPS, results, deadreckon_results)
Circular Motion Robot and Extended Kalman Filter Prediction with sigma=0.5 and dt=0.001
  • Running the above code with sigma=0.9 and dt=0.001
Circular Motion Robot and Extended Kalman Filter Prediction with sigma=0.9 and dt=0.001
  • Running the above code with sigma=0.9 and dt=0.01
Circular Motion Robot and Extended Kalman Filter Prediction with sigma=0.9 and dt=0.01
  • Here we measure a circular (polar) movement in a Cartesian plane. The rotation gives an acceleration in X and Y and it is missing in the F matrix. We need to update the F matrix to include the complete physical model including acceleration.
  • As any other KF algorithm, the EKF uses the state-to-measurement transformation to convert the system state estimate from the state space to the measurement space.
  • In general, this test demonstrates the exceptional GPS noise robustness of EKF. It also highlights the importance of Q (model process covariance).

Tracking the Bike’s Path

  • What about utilizing the noisy GPS signals to estimate the bike’s path? Does the implemented algorithm yield accurate knowledge of the bike’s position at any given time, including the associated velocity and acceleration information?
  • Calculating the exact trajectory
import numpy as np

from numpy import pi, cos, sin, sqrt, diag
from numpy.linalg import inv
from numpy.random import randn


t = np.linspace(0, 2*pi, 100)
dt = t[1] - t[0]

print (dt)
0.06346651825433926

# position
x = 2*cos(t)
y = sin(2*t)

# velocity
dxdt = -2*sin(t)
dydt = 2*cos(2*t)

# accel
d2xdt2 = -2*cos(t)
d2ydt2 = -4*sin(2*t)

# jerk
d3xdt3 = 2*sin(t)
d3ydt3 = -8*cos(2*t)
  • Plotting the exact trajectory
plt.plot(x,y)
plt.xlabel('X')
plt.ylabel('Y')
plt.title('TRAJECTORY')
Exact bike's trajectory
  • Plotting the exact velocity
plt.plot(dxdt,dydt)
plt.xlabel('DXDt')
plt.ylabel('DYDt')
plt.title('VELOCITY')
Exact bike's velocity
  • Plotting the exact acceleration
plt.plot(d2xdt2,d2ydt2)
plt.xlabel('DXDt2')
plt.ylabel('DYDt2')
plt.title('ACCELERATION')
Exact bike's acceleration
  • Plotting the exact jerk
plt.plot(d3xdt3,d3ydt3)
plt.title('RATE OF ACCELERATION (JERK)')
plt.xlabel('DXDt3')
plt.ylabel('DYDt3')
Exact bike's jerk
  • Calculating the angular speed and generating synthetic GPS measurements with noise
# angular speed (scalar)
omega = (dxdt*d2ydt2 - dydt*d2xdt2) / (dxdt**2 + dydt**2)

# speed (scalar)
speed = sqrt(dxdt**2 + dydt**2)

# measurement error
gps_sig = 0.1
omega_sig = 0.3
speed_sig = 0.1

# noisy measurements
x_gps = x + gps_sig * randn(*x.shape)
y_gps = y + gps_sig * randn(*y.shape)
omega_sens = omega + omega_sig * randn(*omega.shape)
speed_sens = speed + speed_sig * randn(*speed.shape)
  • Plotting the exact trajectory and the synthetic GPS data
plt.plot(x,y)
plt.plot(x_gps,y_gps,'+')
plt.legend(['True', 'GPS'])
plt.title('TRAJECTORY')
plt.xlabel('X')
plt.ylabel('Y')
The exact trajectory and the synthetic GPS data
  • Plotting the exact velocity and the velocity sensor data
plt.plot(t,speed)
plt.plot(t,speed_sens,'+')
plt.legend(['True', 'Sensor'])
plt.title('SPEED V')
plt.xlabel('VX')
plt.ylabel('VY')
The exact velocity and the velocity sensor data
  • Plotting the exact angular speed and the corresponding sensor measurements
plt.plot(t,omega)
plt.plot(t,omega_sens,'+')
plt.legend(['True', 'GPS'])
plt.title('ANGULAR SPEED W')
plt.xlabel('WX')
plt.ylabel('WY')
The exact angular speed and the corresponding sensor measurements
  • Applying the KF algorithm to the above data
A = np.array([
    [1, dt, (dt**2)/2, 0, 0, 0],
    [0, 1, dt, 0, 0, 0],
    [0, 0, 1, 0, 0, 0],
    [0, 0, 0, 1, dt, (dt**2)/2],
    [0, 0, 0, 0, 1, dt],
    [0, 0, 0, 0, 0, 1],
    ])
Q1 = np.array([(dt**3)/6, (dt**2)/2, dt, 0, 0, 0])
Q1 = np.expand_dims(Q1, 1)
Q2 = np.array([0, 0, 0, (dt**3)/6, (dt**2)/2, dt])
Q2 = np.expand_dims(Q2, 1)

j_var = max(np.var(d3xdt3), np.var(d3ydt3))
Q = j_var * (Q1 @ Q1.T + Q2 @ Q2.T)
H = np.array([
    [1, 0, 0, 0, 0, 0],
    [0, 0, 0, 1, 0, 0],
    ])
R = diag(np.array([gps_sig**2, gps_sig**2]))
x_init = np.array([x[0], dxdt[0], d2xdt2[0], y[0], dydt[0], d2ydt2[0]])
P_init = 0.01 * np.eye(len(x_init))  # small initial prediction error
# create an observation vector of noisy GPS signals
observations = np.array([x_gps, y_gps]).T

# matrix dimensions
nx = Q.shape[0]
ny = R.shape[0]
nt = observations.shape[0]

# allocate identity matrix for re-use
Inx = np.eye(nx)

# allocate result matrices
x_pred = np.zeros((nt, nx))      # prediction of state vector
P_pred = np.zeros((nt, nx, nx))  # prediction error covariance matrix
x_est = np.zeros((nt, nx))       # estimation of state vector
P_est = np.zeros((nt, nx, nx))   # estimation error covariance matrix
K = np.zeros((nt, nx, ny))       # Kalman Gain

# set initial prediction
x_pred[0] = x_init
P_pred[0] = P_init

# for each time-step...
for i in range(nt):

    # prediction stage
    if i > 0:
        x_pred[i] = A @ x_est[i-1]
        P_pred[i] = A @ P_est[i-1] @ A.T + Q

    # estimation stage
    y_obs = observations[i]
    K[i] = P_pred[i] @ H.T @ inv((H @ P_pred[i] @ H.T) + R)
    x_est[i] = x_pred[i] + K[i] @ (y_obs - H @ x_pred[i])
    P_est[i] = (Inx - K[i] @ H) @ P_pred[i]
  • Comparing the exact and the KF estimated trajectories
plt.plot(x,y)
plt.plot(x_est[:, 0],x_est[:, 3])
plt.legend(['True', 'Estimate'])
plt.title('TRAJECTORY')
plt.xlabel('X')
plt.ylabel('Y')
The exact and the KF estimated trajectories
  • By plotting the X and Y position estimations, we can see that the KF performed reasonably well.
  • One problem with the above KF algorithm is that it only works for models with purely linear relationships. It was fine for the GPS-only example above, but as soon as we try to assimilate data from the other two sensors, the method falls apart. This is because the speed and angular speed measurements have non-linear relationships with the bike state vector.
  • Now let’s apply the Extended Kalman Filter (EKF) algorithm to assimilate the GPS, speedometer (velocity), and gyroscope (acceleration) signals with our predictive model
#Example 2: Use the Extended Kalman Filter to Assimilate All Sensors
def eval_h(x_pred):
    
    # expand prediction of state vector
    px, vx, ax, py, vy, ay = x_pred
    
    # compute angular vel
    w = (vx*ay - vy*ax) / (vx**2 + vy**2)
    
    # compute speed
    s = sqrt(vx**2 + vy**2)

    return np.array([px, py, w, s])

def eval_H(x_pred):
    
    # expand prediction of state vector
    px, vx, ax, py, vy, ay = x_pred
    V2 = vx**2 + vy**2
    
    # angular vel partial derivs
    dwdvx = (V2*ay - 2*vx*(vx*ay-vy*ax)) / (V2**2)
    dwdax = -vy / V2
    dwdvy = (-V2*ax - 2*vy*(vx*ay-vy*ax)) / (V2**2)
    dwday = vx / V2

    # speed partial derivs
    dsdvx = vx / sqrt(V2)
    dsdvy = vy / sqrt(V2)

    return np.array([
        [1, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0],
        [0, dwdvx, dwdax, 0, dwdvy, dwday],
        [0, dsdvx, 0, 0, dsdvy, 0],
        ])
# redefine R to include speedometer and gyro variances
R = diag(np.array([gps_sig**2, gps_sig**2, omega_sig**2, speed_sig**2]))
# create an observation vector of all noisy signals
observations = np.array([x_gps, y_gps, omega_sens, speed_sens]).T

# matrix dimensions
nx = Q.shape[0]
ny = R.shape[0]
nt = observations.shape[0]

# allocate identity matrix for re-use
Inx = np.eye(nx)

# allocate result matrices
x_pred = np.zeros((nt, nx))      # prediction of state vector
P_pred = np.zeros((nt, nx, nx))  # prediction error covariance matrix
x_est = np.zeros((nt, nx))       # estimation of state vector
P_est = np.zeros((nt, nx, nx))   # estimation error covariance matrix
K = np.zeros((nt, nx, ny))       # Kalman Gain

# set initial prediction
x_pred[0] = x_init
P_pred[0] = P_init

# for each time-step...
for i in range(nt):

    # prediction stage
    if i > 0:
        x_pred[i] = A @ x_est[i-1]
        P_pred[i] = A @ P_est[i-1] @ A.T + Q

    # estimation stage
    y_obs = observations[i]
    y_pred = eval_h(x_pred[i])
    H_pred = eval_H(x_pred[i])
    K[i] = P_pred[i] @ H_pred.T @ inv((H_pred @ P_pred[i] @ H_pred.T) + R)
    x_est[i] = x_pred[i] + K[i] @ (y_obs - y_pred)
    P_est[i] = (Inx - K[i] @ H_pred) @ P_pred[i]
  • Comparing the exact and the EKF estimated trajectories
plt.plot(x,y)
plt.plot(x_est[:, 0],x_est[:, 3])
plt.legend(['True', 'Estimate'])
plt.title('TRAJECTORY')
plt.xlabel('X')
plt.ylabel('Y')
The exact and the EKF estimated trajectories
  • By plotting the X and Y position estimations, we can see that the EKF performed even better than the KF.
  • Bottom line: it is very instructive working through the process of mocking up the bike scenario and then applying the KF and EKF to the artificial data.

Unscented Kalman Filter (UKF)

1. Prediction Step

  • Calculating Sigma Points
  • Calculating Associated Weights
  • Nonlinear Function Propagation
  • Calculating Weighted Mean and Covariance
  • Augmenting State and Covariance

2. Correction Step

  • Propagating Predicted Sigma Points
  • Cross-Correlation Calculation
  • Kalman Gain Calculation
  • State and Covariance Update
  • Running the corresponding code
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as stats
import math
import sys
def augment_vectors(x, v):
    return np.row_stack((x, v))

def augment_covariances(P, Q):
    prows, pcols = np.shape(P)[0], np.shape(P)[1]
    qrows, qcols = np.shape(Q)[0], np.shape(Q)[1]
    
    nrows = prows + qrows
    ncols = pcols + qcols
    
    Pa = np.zeros((nrows, ncols))
    Pa[0:prows, 0:pcols] = P
    Pa[prows:nrows, pcols:ncols] = Q
    
    return Pa
x = np.array([[1.], [2.]])
v = np.array([[3.], [4.]])

P = np.array([[1., 1.], [1., 1.]])
Q = np.array([[2., 2.], [2., 2.]])

xa = augment_vectors(x, v)
Pa = augment_covariances(P, Q)

print(xa)
print(Pa)
[[1.]
 [2.]
 [3.]
 [4.]]
[[1. 1. 0. 0.]
 [1. 1. 0. 0.]
 [0. 0. 2. 2.]
 [0. 0. 2. 2.]]
#Unscented Kalman Filter (UKF)
class UKF(object):
    def __init__(self, dim_x, dim_z, Q, R, kappa=0.0):
        
        '''
        UKF class constructor
        inputs:
            dim_x : state vector x dimension
            dim_z : measurement vector z dimension
        
        - step 1: setting dimensions
        - step 2: setting number of sigma points to be generated
        - step 3: setting scaling parameters
        - step 4: calculate scaling coefficient for selecting sigma points
        - step 5: calculate weights
        '''
                
        # setting dimensions
        self.dim_x = dim_x         # state dimension
        self.dim_z = dim_z         # measurement dimension
        self.dim_v = np.shape(Q)[0]
        self.dim_n = np.shape(R)[0]
        self.dim_a = self.dim_x + self.dim_v + self.dim_n # assuming noise dimension is same as x dimension
        
        # setting number of sigma points to be generated
        self.n_sigma = (2 * self.dim_a) + 1
        
        # setting scaling parameters
        self.kappa = 3 - self.dim_a #kappa
        self.alpha = 0.001
        self.beta = 2.0

        alpha_2 = self.alpha**2
        self.lambda_ = alpha_2 * (self.dim_a + self.kappa) - self.dim_a
        
        # setting scale coefficient for selecting sigma points
        # self.sigma_scale = np.sqrt(self.dim_a + self.lambda_)
        self.sigma_scale = np.sqrt(self.dim_a + self.kappa)
        
        # calculate unscented weights
        # self.W0m = self.W0c = self.lambda_ / (self.dim_a + self.lambda_)
        # self.W0c = self.W0c + (1.0 - alpha_2 + self.beta)
        # self.Wi = 0.5 / (self.dim_a + self.lambda_)
        
        self.W0 = self.kappa / (self.dim_a + self.kappa)
        self.Wi = 0.5 / (self.dim_a + self.kappa)
        
        # initializing augmented state x_a and augmented covariance P_a
        self.x_a = np.zeros((self.dim_a, ))
        self.P_a = np.zeros((self.dim_a, self.dim_a))
        
        self.idx1, self.idx2 = self.dim_x, self.dim_x + self.dim_v
        
        self.P_a[self.idx1:self.idx2, self.idx1:self.idx2] = Q
        self.P_a[self.idx2:, self.idx2:] = R
        
        print(f'P_a = \n{self.P_a}\n')
            
    def predict(self, f, x, P):       
        self.x_a[:self.dim_x] = x
        self.P_a[:self.dim_x, :self.dim_x] = P
        
        xa_sigmas = self.sigma_points(self.x_a, self.P_a)
        
        xx_sigmas = xa_sigmas[:self.dim_x, :]
        xv_sigmas = xa_sigmas[self.idx1:self.idx2, :]
        
        y_sigmas = np.zeros((self.dim_x, self.n_sigma))              
        for i in range(self.n_sigma):
            y_sigmas[:, i] = f(xx_sigmas[:, i], xv_sigmas[:, i])
        
        y, Pyy = self.calculate_mean_and_covariance(y_sigmas)
        
        self.x_a[:self.dim_x] = y
        self.P_a[:self.dim_x, :self.dim_x] = Pyy
               
        return y, Pyy, xx_sigmas
        
    def correct(self, h, x, P, z):
        self.x_a[:self.dim_x] = x
        self.P_a[:self.dim_x, :self.dim_x] = P
        
        xa_sigmas = self.sigma_points(self.x_a, self.P_a)
        
        xx_sigmas = xa_sigmas[:self.dim_x, :]
        xn_sigmas = xa_sigmas[self.idx2:, :]
        
        y_sigmas = np.zeros((self.dim_z, self.n_sigma))
        for i in range(self.n_sigma):
            y_sigmas[:, i] = h(xx_sigmas[:, i], xn_sigmas[:, i])
            
        y, Pyy = self.calculate_mean_and_covariance(y_sigmas)
                
        Pxy = self.calculate_cross_correlation(x, xx_sigmas, y, y_sigmas)

        K = Pxy @ np.linalg.pinv(Pyy)
        
        x = x + (K @ (z - y))
        P = P - (K @ Pyy @ K.T)
        
        return x, P, xx_sigmas
        
    
    def sigma_points(self, x, P):
        
        '''
        generating sigma points matrix x_sigma given mean 'x' and covariance 'P'
        '''
        
        nx = np.shape(x)[0]
        
        x_sigma = np.zeros((nx, self.n_sigma))       
        x_sigma[:, 0] = x
        
        S = np.linalg.cholesky(P)
        
        for i in range(nx):
            x_sigma[:, i + 1]      = x + (self.sigma_scale * S[:, i])
            x_sigma[:, i + nx + 1] = x - (self.sigma_scale * S[:, i])
            
        return x_sigma
    
    
    def calculate_mean_and_covariance(self, y_sigmas):
        ydim = np.shape(y_sigmas)[0]
        
        # mean calculation
        y = self.W0 * y_sigmas[:, 0]
        for i in range(1, self.n_sigma):
            y += self.Wi * y_sigmas[:, i]
            
        # covariance calculation
        d = (y_sigmas[:, 0] - y).reshape([-1, 1])
        Pyy = self.W0 * (d @ d.T)
        for i in range(1, self.n_sigma):
            d = (y_sigmas[:, i] - y).reshape([-1, 1])
            Pyy += self.Wi * (d @ d.T)
    
        return y, Pyy
    
    def calculate_cross_correlation(self, x, x_sigmas, y, y_sigmas):
        xdim = np.shape(x)[0]
        ydim = np.shape(y)[0]
        
        n_sigmas = np.shape(x_sigmas)[1]
    
        dx = (x_sigmas[:, 0] - x).reshape([-1, 1])
        dy = (y_sigmas[:, 0] - y).reshape([-1, 1])
        Pxy = self.W0 * (dx @ dy.T)
        for i in range(1, n_sigmas):
            dx = (x_sigmas[:, i] - x).reshape([-1, 1])
            dy = (y_sigmas[:, i] - y).reshape([-1, 1])
            Pxy += self.Wi * (dx @ dy.T)
    
        return Pxy
x0 = np.array([1.0, 2.0])
P0 = np.array([[1.0, 0.0], [0.0, 1.0]])
Q = np.array([[0.5, 0.0], [0.0, 0.5]])

z = np.array([1.2, 1.8])
R = np.array([[0.3, 0.0], [0.0, 0.3]])
def f(x, v):
    return (x + v)

def h(x, n):
    return (x + n)

nx = np.shape(x)[0]
nz = np.shape(z)[0]
nv = np.shape(x)[0]
nn = np.shape(z)[0]

ukf = UKF(dim_x=nx, dim_z=nz, Q=Q, R=R, kappa=(3 - nx))

x1, P1, _ = ukf.predict(f, x0, P0)

print(f'x = \n{x1.round(5)}\n')
print(f'P = \n{P1.round(5)}\n')

x2, P2, _ = ukf.correct(h, x1, P1, z)

print(f'x = \n{x2.round(5)}\n')
print(f'P = \n{P2.round(5)}\n')

P_a = 
[[0.  0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0.  0. ]
 [0.  0.  0.5 0.  0.  0. ]
 [0.  0.  0.  0.5 0.  0. ]
 [0.  0.  0.  0.  0.3 0. ]
 [0.  0.  0.  0.  0.  0.3]]

x = 
[1. 2.]

P = 
[[1.5 0. ]
 [0.  1.5]]

x = 
[1.16667 1.83333]

P = 
[[ 0.25 -0.  ]
 [-0.    0.25]]
def KF_predict(F, x, P, Q):
    x = (F @ x)
    P = F @ P @ F.T + Q
    return x, P

def KF_correct(H, z, R, x, P):
    Pxz = P @ H.T   
    S = H @ P @ H.T + R
    
    K = Pxz @ np.linalg.pinv(S)
    
    x = x + K @ (z - H @ x)
    I = np.eye(P.shape[0])
    P = (I - K @ H) @ P
    return x, P

F = np.array([[1.0, 0.0], [0.0, 1.0]])
H = np.array([[1.0, 0.0], [0.0, 1.0]])

x1, P1 = KF_predict(F, x0, P0, Q)

print(f'x = \n{x1.round(5)}\n')
print(f'P = \n{P1.round(5)}\n')

x2, P2 = KF_correct(H, x1, P1, z, R)

print(f'x = \n{x2.round(5)}\n')
print(f'P = \n{P2.round(5)}\n')
x = 
[1. 2.]

P = 
[[1.5 0. ]
 [0.  1.5]]

x = 
[1.16667 1.83333]

P = 
[[0.25 0.  ]
 [0.   0.25]]
def f_2(x, nu):
    xo = np.zeros((np.shape(x)[0],))
    xo[0] = x[0] + x[2] + nu[0]
    xo[1] = x[1] + x[3] + nu[1]
    xo[2] = x[2] + nu[2]
    xo[3] = x[3] + nu[3]
    return xo
class RangeMeasurement:
    def __init__(self, position):
        self.range = np.sqrt(position[0]**2 + position[1]**2)
        self.bearing = np.arctan(position[1] / (position[0] + sys.float_info.epsilon))
        self.position = np.array([position[0],position[1]])

    def actual_position(self):
        return self.position
        
    def asArray(self):
        return np.array([self.range,self.bearing])
        
    def show(self):
        print(f'range: {self.range}')
        print(f'bearing: {self.bearing}')
        
measurement = RangeMeasurement((10.7, 5.6)) # ground-truth
measurement.show()
range: 12.076837334335508
bearing: 0.4821640110688151
def h_2(x, n):
    '''
    nonlinear measurement model for range sensor
    x : input state vector [2 x 1] ([0]: p_x, [1]: p_y)
    z : output measurement vector [2 x 1] ([0]: range, [1]: bearing )
    '''
    z = np.zeros((2,))
    px = x[0] + n[0]
    py = x[1] + n[1]
    
    z[0] = np.sqrt(px**2 + py**2)
    z[1] = np.arctan(py / (px + sys.float_info.epsilon))
    return z
x0 = np.array([2.0, 1.0, 0., 0.])

P0 = np.array([[0.01, 0.0, 0.0, 0.0],
              [0.0, 0.01, 0.0, 0.0],
              [0.0, 0.0, 0.05, 0.0],
              [0.0, 0.0, 0.0, 0.05]])

Q = np.array([[0.05, 0.0, 0.0, 0.0],
              [0.0, 0.05, 0.0, 0.0],
              [0.0, 0.0, 0.1, 0.0],
              [0.0, 0.0, 0.0, 0.1]])

R = np.array([[0.01, 0.0],
              [0.0, 0.01]])

z = np.array([2.5, 0.05])

nx = np.shape(x0)[0]
nz = np.shape(R)[0]
nv = np.shape(x0)[0]
nn = np.shape(R)[0]

ukf = UKF(dim_x=nx, dim_z=nz, Q=Q, R=R, kappa=(3 - nx))

x, P, _ = ukf.predict(f_2, x0, P0)

print(f'x = \n {x.round(3)}')
print(f'P = \n {P.round(3)}')

x, P, _ = ukf.correct(h_2, x, P, z)

print(f'x = \n {x.round(3)}')
print(f'P = \n {P.round(3)}')
P_a = 
[[0.   0.   0.   ... 0.   0.   0.  ]
 [0.   0.   0.   ... 0.   0.   0.  ]
 [0.   0.   0.   ... 0.   0.   0.  ]
 ...
 [0.   0.   0.   ... 0.1  0.   0.  ]
 [0.   0.   0.   ... 0.   0.01 0.  ]
 [0.   0.   0.   ... 0.   0.   0.01]]

x = 
 [2. 1. 0. 0.]
P = 
 [[ 0.11  0.    0.05 -0.  ]
 [ 0.    0.11 -0.    0.05]
 [ 0.05 -0.    0.15  0.  ]
 [-0.    0.05  0.    0.15]]
x = 
 [ 2.554  0.356  0.252 -0.293]
P = 
 [[ 0.01  -0.001  0.005 -0.   ]
 [-0.001  0.01  -0.     0.005]
 [ 0.005 -0.     0.129 -0.   ]
 [-0.     0.005 -0.     0.129]]
# prepare helper functions for visualizing the ellipses
from matplotlib.patches import Ellipse

def create_covariance_ellipse(pos, cov):
    # https://www.visiondummy.com/2014/04/draw-error-ellipse-representing-covariance-matrix
    eig_values, eig_vectors = np.linalg.eig(cov)
    
    scale_95 = np.sqrt(5.991)
    radius_1 = scale_95 * eig_values[0]
    radius_2 = scale_95 * eig_values[1]
    angle = np.arctan2(eig_vectors[1, 1], eig_vectors[0, 1])
    
    return radius_1, radius_2, angle

def draw_ellipse(ax, mu, radius_1, radius_2, angle, color):
    # https://matplotlib.org/stable/gallery/shapes_and_collections/ellipse_demo.html
    ellipse = Ellipse(
        mu,
        width=radius_1 * 2,
        height=radius_2 * 2,
        angle=np.rad2deg(angle) + 90,
        facecolor=color,
        alpha=0.4)
    ax.add_artist(ellipse)
    return ax

def plot_ellipse(ax, x, P, color):
    x = x[0:2].reshape(2,)
    P = P[0:2, 0:2]
    r1, r2, angle = create_covariance_ellipse(x, P)
    draw_ellipse(ax, x, r1, r2, angle, color)
    
def get_correlated_dataset(n, cov, mu, scale):
    # https://carstenschelp.github.io/2018/09/14/Plot_Confidence_Ellipse_001.html
    latent = np.random.randn(n, 2)
    cov = latent.dot(cov)
    scaled = cov * scale
    scaled_with_offset = scaled + mu
    # return x and y of the new, correlated dataset
    return scaled_with_offset[:, 0], scaled_with_offset[:, 1]

def plot_samples(ax, samples_num, x, P, color, markersize, label):
    scale = 1, 1
    x, y = get_correlated_dataset(samples_num, P, x, scale)
    ax.scatter(x, y, s=markersize, marker='x', c=color, label=label)
    
def plot_mean(ax, x, size, color, label):
    ax.scatter(x[0], x[1], s=size, marker='o', c=color, label=label)
    
def plot_state(ax, x, P, samples_num, markersize, color, label):
    x = x[0:2].reshape(2,)
    P = P[0:2, 0:2]
    
    plot_ellipse(ax, x, P, color)
    plot_samples(ax, samples_num, x, P, color, markersize, label+'_possibilities')
    plot_mean(ax, x, 200, color, label+'_mean')
    
def create_viewer(title, xlabel, ylabel, xlim=None, ylim=None):
    fig, viewer = plt.subplots(figsize=(24, 12))
    
    viewer.set_title(title, fontsize=22, color='green', fontweight='bold')
    
    viewer.axvline(c='grey', lw=2)
    viewer.axhline(c='grey', lw=2)

    viewer.set_xlabel(xlabel, fontsize=22, fontweight ='bold')
    viewer.set_ylabel(ylabel, fontsize=22, fontweight ='bold')
    
    if (xlim != None):
        viewer.set_xlim(xlim[0], xlim[1])
    
    if (ylim != None):
        viewer.set_ylim(ylim[0], ylim[1])
    
    return viewer

def visualize_estimate(viewer, label, color, x, P):
    plot_state(viewer, x=x, P=P, samples_num=500, markersize=16, color=color, label=label)
    
def update_plotter():
    plt.grid(visible=True)
    plt.legend(loc='upper left')
    plt.show()
x0 = np.array([10., 5., 0., 0.])

P0 = np.array([[0.3, 0.1, 0.0, 0.0],
              [0.1, 0.3, 0.0, 0.0],
              [0.0, 0.0, 0.1, 0.0],
              [0.0, 0.0, 0.0, 0.1]])

Q = np.array([[0.05, 0.0, 0.0, 0.0],
              [0.0, 0.05, 0.0, 0.0],
              [0.0, 0.0, 0.1, 0.0],
              [0.0, 0.0, 0.0, 0.1]])

z = measurement.asArray()
R = np.array([[0.1, 0.0],[0.0, 0.1]])

nx = np.shape(x0)[0]
nz = np.shape(z)[0]
nv = np.shape(x0)[0]
nn = np.shape(z)[0]

ukf = UKF(dim_x=nx, dim_z=nz, Q=Q, R=R, kappa=(3 - nx))


print('=====Iteration 1====')
x, P, x_sigmas = ukf.correct(h_2, x0, P0, z)
print(f'x = \n{x.round(3)}\n')
print(f'P = \n{P.round(3)}\n')
print('\n\n')
P_a = 
[[0.  0.  0.  ... 0.  0.  0. ]
 [0.  0.  0.  ... 0.  0.  0. ]
 [0.  0.  0.  ... 0.  0.  0. ]
 ...
 [0.  0.  0.  ... 0.1 0.  0. ]
 [0.  0.  0.  ... 0.  0.1 0. ]
 [0.  0.  0.  ... 0.  0.  0.1]]

=====Iteration 1====
x = 
[10.548  5.476  0.     0.   ]

P = 
[[0.073 0.007 0.    0.   ]
 [0.007 0.073 0.    0.   ]
 [0.    0.    0.1   0.   ]
 [0.    0.    0.    0.1  ]]

  • Plotting State Space Uncertainties
viewer = create_viewer('State Space Uncertainties', 'x (m)', 'y (m)', xlim=(8,12), ylim=(4,6))

print(x_sigmas[:2, :5])

sigma_xlist = x_sigmas[0, :]
sigma_ylist = x_sigmas[1, :]

viewer.scatter(sigma_xlist, sigma_ylist, s=180, marker='x', c='blue', label='sigma points Xx from initial state')

visualize_estimate(viewer, 'initial state', 'g', x0, P0)
visualize_estimate(viewer, 'estimated state', 'r', x, P)

update_plotter()

[[10.         10.9486833  10.         10.         10.        ]
 [ 5.          5.31622777  5.89442719  5.          5.        ]]
UKF State Space Uncertainties
  • Tracking Target Trajectory
# generate trajectory

trajectory = [[10.0, 5.0], [11.0, 5.3], [12.0, 5.5], [13.0, 6.0], [14.0, 6.8], [15.0, 6.3], [16.0, 6.5]]
trajectory = np.asarray(trajectory)

traj_xlist = trajectory[:, 0]
traj_ylist = trajectory[:, 1]

print(traj_xlist)
print(traj_ylist)

measurements = []
for pose in trajectory:
    meas = RangeMeasurement(pose)
    measurements.append(meas.asArray())

measurements = np.asarray(measurements)



x0 = np.array([10., 5., 0., 0.])

P0 = np.array([[0.1, 0.0, 0.0, 0.0],
              [0.0, 0.1, 0.0, 0.0],
              [0.0, 0.0, 0.1, 0.0],
              [0.0, 0.0, 0.0, 0.1]])

Q = np.array([[0.05, 0.0, 0.0, 0.0],
              [0.0, 0.05, 0.0, 0.0],
              [0.0, 0.0, 0.1, 0.0],
              [0.0, 0.0, 0.0, 0.1]])

R = np.array([[0.1, 0.0],[0.0, 0.1]])

nx = np.shape(x0)[0]
nz = np.shape(R)[0]
nv = np.shape(x0)[0]
nn = np.shape(R)[0]

ukf = UKF(dim_x=nx, dim_z=nz, Q=Q, R=R, kappa=(3 - nx))

viewer = create_viewer('Tracking Target Trajectory', 'x (m)', 'y (m)', xlim=(8,20), ylim=(4,8))
viewer.scatter(traj_xlist, traj_ylist, s=80, marker='o', c='blue', label='actual target poses')

x, P = x0, P0

estimates = []

for iteration, z in enumerate(measurements):
    x, P, _ = ukf.predict(f_2, x, P)
    x, P, _ = ukf.correct(h_2, x, P, z)
    visualize_estimate(viewer, f'', 'g', x, P)
    
    estimates.append(x)

estimates = np.asarray(estimates).T
print(estimates)

estimates_px = estimates[0, :]
estimates_py = estimates[1, :]

estimates_vx = estimates[2, :]
estimates_vy = estimates[3, :]

viewer.plot(estimates_px, estimates_py)

update_plotter()
[10. 11. 12. ... 14. 15. 16.]
[5.  5.3 5.5 ... 6.8 6.3 6.5]
P_a = 
[[0.  0.  0.  ... 0.  0.  0. ]
 [0.  0.  0.  ... 0.  0.  0. ]
 [0.  0.  0.  ... 0.  0.  0. ]
 ...
 [0.  0.  0.  ... 0.1 0.  0. ]
 [0.  0.  0.  ... 0.  0.1 0. ]
 [0.  0.  0.  ... 0.  0.  0.1]]

[[ 9.99002177 10.75545459 11.81946368 ... 13.97015886 15.00954828
  15.99740642]
 [ 4.9949255   5.23829684  5.48105352 ...  6.68249639  6.47940462
   6.51869704]
 [-0.00399129  0.43554205  0.79264229 ...  1.01577438  1.0288903
   1.00587547]
 [-0.0020298   0.13822184  0.197749   ...  0.5713585   0.13736439
   0.08238877]]
Target tracking using Unscented Kalman filter (UKF)
  • Plotting the estimated x-,y-velocity profile from UKF
viewer = create_viewer('Velocity Profile', 'iteration', 'velocity (m/s)')
plt.rcParams.update({'font.size': 22})
viewer.plot(estimates_vx, color = 'blue', label='x_velocity')
viewer.plot(estimates_vy, color = 'red', label='y_velocity')
update_plotter()
Estimated velocity profile from Unscented Kalman Filter (UKF)

Industry Application in Dynamic Positioning System

  • In this section, we will choose a real life scenario by working onboard a ship with Dynamic Positioning (DP) system. Here, KF is used to in order to estimate a stable DP information from multiple sensors such as GPS, wind, current, IMU sensors, acoustic transponders, etc.
  • In order to simplify the problem, we use only position data (x and y axes) and follow these steps:
    • Defining the time period and the actual position of the ship.
    • Creating imaginary, noisy position measurements.
    • Calculating related KF matrices and saving calculated measurements after KF is applied.
    • Comparing actual data vs estimated positions.
#Kalman Filter in Dynamic Positioning System
import numpy as np
import pandas as pd
import math
import matplotlib.pyplot as plt
from numpy.linalg import inv

n_iters = 50

# Actual position of the ship
actual_x = 0
actual_y = 0

# Process noise covariance which is assumed to be a small value.
Q = np.full((2,2), 0.0001)
print('Q matrix: ', Q.shape, '\n\n', Q)
Q matrix:  (2, 2) 

 [[0.0001 0.0001]
 [0.0001 0.0001]]

# Create 50 random x and y position measurements
x_pos = np.random.normal(0, 0.5, n_iters)
y_pos = np.random.normal(0, 0.5, n_iters)

# Combine the measurements in a matrix
measurements = np.stack((x_pos, y_pos), axis=1).reshape((n_iters,2,1))

print('Measurements: ', measurements.shape, '\n\n', measurements[0:5], '\n', '...')
Measurements:  (50, 2, 1) 

 [[[-0.18828139]
  [ 0.41481874]]

 [[ 0.0424575 ]
  [ 0.32092219]]

 [[-0.26253112]
  [ 0.10642768]]

 [[-0.31451265]
  [-0.4752949 ]]

 [[ 0.81230049]
  [-0.4795513 ]]] 
 ...
# Measurement noise covariance.
R = np.diag([0.25, 0.25])
print('R matrix: ', R.shape, '\n\n', R)
R matrix:  (2, 2) 

 [[0.25 0.  ]
 [0.   0.25]]
# Creating empty matrices
x_hat = np.zeros((n_iters, 2,1))
P = np.zeros((n_iters,2,2))
x_hat_min = np.zeros((n_iters, 2,1))
P_min = np.zeros((n_iters,2,2))
K = np.zeros((n_iters,2,2))
# Initial state
x_hat[0] = [[0],[0]]

# Initial error covariance
P[0] = np.diag([1000.0, 1000.0])

print('x_hat[0]:\n\n', x_hat[0], '\n\n P[0] matrix: ', P[0].shape, '\n\n', P[0])
x_hat[0]:

 [[0.]
 [0.]] 

 P[0] matrix:  (2, 2) 

 [[1000.    0.]
 [   0. 1000.]]
# A matrix
A = np.array([[1.0, 0.0],
               [0.0, 1.0]])

# H matrix
H = np.eye(2)

# Unit matrix for Kalman gain
I = np.eye(2)

print('A matrix \n\n', A, '\n\n H matrix \n\n', H, '\n\n I matrix \n\n', I)
A matrix 

 [[1. 0.]
 [0. 1.]] 

 H matrix 

 [[1. 0.]
 [0. 1.]] 

 I matrix 

 [[1. 0.]
 [0. 1.]]
for k in range(1, n_iters):
    # Calculating a priori estimate and error covariance
    x_hat_min[k] = A.dot(x_hat[k-1])
    P_min[k] = A.dot(P[k-1]).dot(A.T) + Q
    
    # Calculating Kalman gain
    S = H.dot(P_min[k]).dot(H.T) + R
    K[k] = P_min[k].dot(H.T).dot(inv(S))
    
    # Calculating a posteriori estimate and error covariance
    x_hat[k] = x_hat_min[k] + K[k].dot(measurements[k] - H.dot(x_hat_min[k]))
    P[k] = (I - K[k]).dot(P_min[k])
  • Plotting Actual, Measured and Estimated Positions
plt.figure(figsize=(7,7))
for n in range(n_iters):
    plt.scatter(float(measurements[n][0]), float(measurements[n][1]), 
                color='orange', label='measured position', alpha=0.7)
    plt.scatter(float(x_hat[n][0]), float(x_hat[n][1]), 
                color='blue', label='kalman position', alpha=0.3)

plt.scatter(actual_x, actual_y, color='red', s=100, label='actual position')
plt.title('Actual, Measured and Estimated Positions')
plt.legend(['Measured', 'Estimated'], fontsize=14)
plt.xlabel('X axis')
plt.ylabel('Y axis');
Actual, Measured and Estimated Positions
  • Plotting Actual, Measured and Estimated Positions on X axis
plt.figure(figsize=(10,5))
for n in range(n_iters):
    plt.scatter(x = n, y = measurements[n][0], label='measurements', color='orange', alpha=0.7)
    plt.scatter(x = n, y = x_hat[n][0], label='estimate after kalman', color='blue', alpha=0.5)
plt.axhline(actual_x, color='r', label='actual x position')
plt.title('Actual, Measured and Estimated Positions on X axis')
plt.xlabel('Number of Steps')
plt.ylabel('X Position')
  • Plotting Actual, Measured and Estimated Positions on Y axis
plt.figure(figsize=(10,5))
for n in range(n_iters):
    plt.scatter(x = n, y = measurements[n][1], label='measurements', color='orange', alpha=0.7)
    plt.scatter(x = n, y = x_hat[n][1], label='estimate after kalman', color='blue', alpha=0.5)
plt.axhline(actual_x, color='r', label='actual x position')
plt.title('Actual, Measured and Estimated Positions on Y axis')
plt.xlabel('Number of Steps')
plt.ylabel('Y Position');

Actual, Measured and Estimated Positions on X, Y axes
  • Creating the following Plotly animation
# Creating a pandas dataframe for animation
x_position = []
y_position = []
data_type = []
n_steps = []

for n in range(n_iters):
    x_position.append(float(measurements[n][0]))
    y_position.append(float(measurements[n][1]))
    data_type.append('measured')
    n_steps.append(int(n))
    
for n in range(n_iters):
    x_position.append(float(x_hat[n][0]))
    y_position.append(float(x_hat[n][1]))
    data_type.append('estimated')
    n_steps.append(int(n))

df = pd.DataFrame({'x_position':x_position,
                   'y_position':y_position,
                   'data_type':data_type,
                   'n_steps':n_steps})

# Creating the animation
import plotly.express as px

fig = px.scatter(df, 
                 x ="x_position", 
                 y ="x_position", 
                 animation_frame ="n_steps",
                 animation_group ="data_type",
                 color ="data_type",
                 facet_col ="data_type",
                 range_x=[-3, 3], 
                 range_y=[-3, 3])
fig.show()

Animated measured vs estimated x- position with n_steps=5
Animated measured vs estimated x- position with n_steps=10
Animated measured vs estimated x- position with n_steps=15
Animated measured vs estimated x- position with n_steps=20
Animated measured vs estimated x- position with n_steps=25
Animated measured vs estimated x- position with n_steps=30
Animated measured vs estimated x- position with n_steps=35
Animated measured vs estimated x- position with n_steps=40
Animated measured vs estimated x- position with n_steps=45
Animated measured vs estimated x- position with n_steps=49
  • One can see that KF estimates converge near the actual position as the iterations progress.
  • With this basic DP example, we demonstrate how to use KF to process noisy DP data. Similarly, noisy wind, current and acoustic DP data can be filtered before usage.
  • Results can be generalized by defining mathematical model of the ship and adding velocity, acceleration, wind and current data to the related KF matrices.

Smoothed Position and Speed Estimates

  • Let’s invoke the class KalmanFilter to estimate a sinusoidal trajectory and speed
import numpy as np


class KalmanFilter:
    def __init__(self, x_0: np.ndarray, p: np.ndarray, phi: np.ndarray, gamma: np.ndarray, delta: np.ndarray,
                 cv: np.ndarray,
                 cw: np.ndarray):
        """
        Kalman filter
        :param x_0: Initial state vector
        :param p: Initial covariance matrix on x_0
        :param phi: State transition matrix
        :param gamma: Command model matrix
        :param delta: Sensors model matrix
        :param cv: Variance on step
        :param cw: Variance on sensors
        """
        self._x_hat = x_0
        self._num_states, _ = self._x_hat.shape
        self._p = p
        self._phi = phi
        self._gamma = gamma
        self._delta = delta
        self._cv = cv
        self._cw = cw

    def step(self, u: np.ndarray):
        """
        Step the Kalman filter
        :param u: command vector
        :return: None
        """
        self._x_hat = self._phi @ self._x_hat + self._gamma @ u
        self._p = self._phi @ self._p @ self._phi.T + self._cv

    def update(self, measure):
        """
        Update the Kalman filter with a measure
        :param measure: measurement made
        :return: None
        """
        z_hat = self._delta @ self._x_hat
        r = measure - z_hat
        k = self._p @ self._delta.T @ np.linalg.pinv(self._delta @ self._p @ self._delta.T + self._cw)
        self._x_hat = self._x_hat + k @ r
        self._p = (np.identity(self._num_states) - k @ self._delta) @ self._p

    @property
    def x_hat(self):
        return self._x_hat


if __name__ == '__main__':
    import matplotlib.pyplot as plt

    dt = 1
    x_0 = np.array([[0], [0]])
    p = np.array([[1, 0], [0, 1]]) * 1
    phi = np.array([[1, dt], [0, 1]])
    gamma = np.array([[0.5 * dt ** 2], [dt]])
    delta = np.array([[1, 0]])
    cv = np.array([[0, 0], [0, 0.1]])
    cw = np.array([[35 ** 2]])
    kf = KalmanFilter(x_0, p, phi, gamma, delta, cv, cw)

    true_position = 0
    true_speed = 0
    nb_steps = 200
    time = 0

    true_positions = []
    true_speeds = []
    measurements = []
    estimates = []

    for i in range(nb_steps):
        time += dt
        command = np.array([[0.5 * np.cos(time / (nb_steps * dt / 2) * 2 * np.pi)]])

        # update true localization
        noised_command = command + np.random.randn() * 0.2 * np.abs(np.linalg.norm(command))
        true_position = true_position + true_speed * dt + 0.5 * noised_command * dt ** 2
        true_speed = true_speed + noised_command * dt

        measurement = true_position + np.random.randn() * 25
        kf.step(command)
        kf.update(measurement)

        true_positions.append(true_position)
        true_speeds.append(true_speed)
        measurements.append(measurement)
        estimates.append(kf.x_hat)

    true_positions = np.array(true_positions).squeeze()
    true_speeds = np.array(true_speeds).squeeze()
    measurements = np.array(measurements).squeeze()
    estimates = np.array(estimates).squeeze()


    def moving_average(a, n=3):
        ret = np.cumsum(a, dtype=float)
        ret[n:] = ret[n:] - ret[:-n]
        return ret[n - 1:] / n


    def exponential_average(a, w):
        out = np.zeros_like(a)
        out[0] = a[0]
        for i in range(1, len(a)):
            out[i] = (1 - w) * out[i - 1] + w * a[i]
        return out


    m_average = moving_average(measurements, n=5)
    exp_average = exponential_average(measurements, 0.3)

  • Comparison with the true position, measurements vs estimated data by KF and moving/exponential average
   plt.plot(range(nb_steps), true_positions, label='True position')
    plt.scatter(range(nb_steps), measurements, marker='x', label='Measurements')
    plt.plot(range(nb_steps), estimates[:, 0], label='Kalman filter')
    plt.plot(range(len(m_average)), m_average, label='Moving average')
    plt.plot(range(len(exp_average)), exp_average, label='Exponential average')
    plt.title('Position estimate')
    plt.xlabel('Time')
    plt.ylabel('Position')
    plt.legend()
    plt.show()

    plt.plot(range(nb_steps), true_speeds, label='True speed')
    plt.plot(range(nb_steps), estimates[:, 1], label='Kalman filter')
    plt.title('Speed estimate')
    plt.xlabel('Time')
    plt.ylabel('Speed')
    plt.legend()
    plt.show()
Comparison with the true sinusoidal trajectory, measurements vs estimated data by Kalman Filter and moving/exponential average

Radar EKF Trajectory

  • Let’s apply the EKF approach to the noisy radar measurements
import numpy as np
from numpy.linalg import inv
import matplotlib.pyplot as plt

A, Q, R = None, None, None
x, P = None, None
firstRun = True

def Hjacob(xp):
    H = np.zeros([1,3])
    x1 = xp[0]
    x3 = xp[2]
    H[:,0] = x1 / np.sqrt(x1**2 + x3**2)
    H[:,1] = 0
    H[:,2] = x3 / np.sqrt(x1**2 + x3**2)
    return H

def hx(xhat):
    x1 = xhat[0]
    x3 = xhat[2]

    zp = np.sqrt(x1**2 + x3**2)
    return zp


z = radar distance
  = np.sqrt(pos**2 + alt**2) + v
def RadarEKF(z, dt):
    global firstRun
    global A, Q, R, x, P
    if firstRun:
        A = np.eye(3) + dt * np.array([[0,1,0],[0,0,0],[0,0,0]])
        Q = np.array([[0,0,0],[0,0.001,0],[0,0,0.001]])
        R = 10
        x = np.array([0,90,1100]).transpose()
        P = 10 * np.eye(3)
        firstRun = False
    else:
        H = Hjacob(x)
        X_pred = A @ x                          # X_pred : State Variable Prediction
        P_pred = A @ P @ A.T + Q                # Error Covariance Prediction

        K = (P_pred @ H.T) @ inv(H@P_pred@H.T + R) # K : Kalman Gain

        x = X_pred + K@(np.array([z - hx(X_pred)])) # Update State Variable Estimation
        P = P_pred - K@H@P_pred # Update Error Covariance Estimation

    pos = x[0]
    vel = x[1]
    alt = x[2]
    return pos, vel, alt

# ------ Test program

dt = 0.05
t = np.arange(0, 20, dt)
Nsamples = len(t)
Xsaved = np.zeros([Nsamples,3])
Zsaved = np.zeros([Nsamples,1])
Estimated = np.zeros([Nsamples,1])


pos_pred = None
def GetRadar(dt):
    global pos_pred
    if pos_pred == None:
        pos_pred = 0

    vel = 100 + 5*np.random.randn()
    pos = pos_pred + vel*dt

    alt = 1000 + 10*np.random.randn()
    v = 0 + pos*0.05*np.random.randn()
    r = np.sqrt(pos**2 + alt**2) + v

    pos_pred = pos
    return r


for k in range(Nsamples):
    r = GetRadar(dt)
    print(r)

    pos, vel, alt = RadarEKF(r, dt)

    Xsaved[k] = [pos, vel, alt]
    Zsaved[k] = r
    Estimated[k] = hx([pos, vel, alt])

PosSaved = Xsaved[:,0]
VelSaved = Xsaved[:,1]
AltSaved = Xsaved[:,2]

t = np.arange(0, Nsamples * dt ,dt)

fig = plt.figure()
plt.subplot(313)
plt.plot(t, PosSaved)
plt.xlabel('Time [Sec]')
plt.ylabel('Position [m]')

plt.subplot(311)
plt.plot(t, VelSaved)
plt.xlabel('Time [Sec]')
plt.ylabel('Speed [m/s]')

plt.subplot(312)
plt.plot(t, AltSaved)
plt.xlabel('Time [Sec]')
plt.ylabel('Altitude [m]')

plt.figure()
plt.plot(t, Zsaved, 'r--', label='Measured')
plt.plot(t, Estimated, 'b-', label='Estimated')
plt.xlabel('Time [Sec]')
plt.ylabel('Radar distance [m]')
plt.legend(loc='upper left')

plt.show()
EKF approach applied to noisy radar measurements: speed, altitude, position, and radar distance vs time.
  • The EKF is a special KF used when working with nonlinear systems. Since the regular KF can not be applied to nonlinear systems, the EKF was created to solve that problem.
  • This example confirms that the EKF does represent the true non-linear system state in the presence of strong measurement noise.

Conclusions

  • In this study, the path tracking based on KF and its extensions (like EKF and UKF) is analyzed in terms of the tracking accuracy and precision of hidden variables in the presence of uncertainty.
  • This article also presents the comprehensive error analysis of KF-based tracking models for both linear and non-linear noisy measurements.
  • As we saw in our scenario testing, KF can be used whenever we need to predict the next state of a system based on some noisy measurements.
  • The true and filtered responses of the proposed KF algorithms for object tracking are observed and verified.
  • Our simulations demonstrate the excellent robustness and computational efficiency of KF algorithms, making them suitable for real-time deployment.
  • Numerical results showed the efficacy of KF. Therefore, this study serves as a hands-on tutorial that helps engineers and data scientists who need a simple, fast, accurate and an easy-to-use implementation of KF for various industry applications.

Explore More

References


Go back

Your message has been sent

Warning

One-Time
Monthly
Yearly

Make a one-time donation

Make a monthly donation

Make a yearly donation

Choose an amount

€5.00
€15.00
€100.00
€5.00
€15.00
€100.00
€5.00
€15.00
€100.00

Or enter a custom amount


Your contribution is appreciated.

Your contribution is appreciated.

Your contribution is appreciated.

DonateDonate monthlyDonate yearly

Discover more from Our Blogs

Subscribe to get the latest posts sent to your email.

Leave a comment

Discover more from Our Blogs

Subscribe now to keep reading and get access to the full archive.

Continue reading