1import numpy as np
2import argparse
3from SignalHub import Engine, Module, ConfigParser
4from generator import MeasurementGenerator
5from visualization import Visualization
6from terminateafter import TerminateAfter
7
8class KalmanFilter(Module):
9 def __init__(self):
10 super().__init__(
11 inputSignals=["measurements", "dt"],
12 outputSchema={
13 "type": "object",
14 "properties": {
15 "prior": {
16 "type": "object",
17 "properties": {"state": {}, "covariance": {}},
18 "additionalProperties": False,
19 },
20 "posterior": {
21 "type": "object",
22 "properties": {"state": {}, "covariance": {}},
23 "additionalProperties": False,
24 },
25 },
26 },
27 )
28
29 self.X, self.P = None, None
30 self.prediction, self.priorCov = None, None
31
32 def predict(self, dt):
33 """
34 Predicts the next state and covariance of the system using the Kalman filter prediction step.
35 This method applies the state transition model to estimate the next state vector (`self.X`)
36 and updates the state covariance matrix (`self.P`) by incorporating process noise.
37 The prediction is based on the following:
38 - State transition matrix (F) models the system dynamics.
39 - Process noise covariance matrix (Q) accounts for uncertainty in the process.
40
41 For a 2D position and velocity model, the state vector is represented as:
42 [x_position, y_position, x_velocity, y_velocity]. The state transition matrix F
43 is designed to update the position based on the current velocity, assuming a constant
44 velocity model. The process noise covariance matrix Q is set to a small value to reflect
45 the uncertainty in the process.
46
47 Assuming constant velocity, the state transition matrix F is defined as:
48
49 F = [[1, 0, dt, 0],
50 [0, 1, 0, dt],
51 [0, 0, 1, 0],
52 [0, 0, 0, 1]]
53
54 where dt is the time step between predictions.
55
56 The process noise covariance matrix Q is defined as:
57
58 Q = [[q_x, 0, 0, 0],
59 [0, q_y, 0, 0],
60 [0, 0, q_vx, 0],
61 [0, 0, 0, q_vy]]
62
63 where q_x, q_y, q_vx, and q_vy are small values representing the process noise for position.
64
65 Use np.diag to create a diagonal matrix for Q, where the diagonal elements represent the process noise variances for each state variable.^
66 Use small noise for position and larger noise for velocity to reflect the uncertainty in the process.
67 The prediction step updates the state vector and covariance matrix as follows:
68
69 - `X = F @ X`: This updates the state vector by applying the state transition matrix F.
70 - `P = F @ P @ F.T + Q`: This updates the state covariance matrix by applying the state transition matrix F, its transpose, and adding the process noise covariance matrix Q.
71
72 Updates:
73 self.X: (4,1) Vector with the predicted state vector after applying the state transition.
74 self.P: (4,4) Matrix with the predicted state covariance matrix after accounting for process noise.
75 """
76
77 # TODO: Define the state transition matrix F
78 F = np.array(
79 [
80 [1.0, 0.0, dt, 0.0],
81 [0.0, 1.0, 0.0, dt],
82 [0.0, 0.0, 1.0, 0.0],
83 [0.0, 0.0, 0.0, 1.0],
84 ]
85 )
86
87 # TODO: Define the process noise covariance matrix Q
88 Q = np.diag([0.001, 0.001, 0.1, 0.1])
89
90 # TODO: Predict next state using the state transition matrix
91 self.X = F @ self.X
92
93 # TODO: Update the state covariance matrix using the state transition matrix and process noise
94 self.P = F @ self.P @ F.T + Q
95
96 def init_filter(self, z, R):
97 """
98 Initializes the Kalman filter with thje first measurement and covariance.
99
100 Parameters:
101 - z: (2,1) Vector with the first measurement (Position in X and Y).
102 - R: (2,2) Covariance matrix associated with the measurement, which represents the uncertainty in the measurement.
103
104 This method sets the initial state and covariance for the Kalman filter, which will be used
105 in subsequent prediction and update steps.
106
107 The state vector `self.X` is initialized with the first measurement, assuming a 2D position and velocity model.
108 NOTE: The state vector is a 4D vector with the format [x_position, y_position, x_velocity, y_velocity].
109
110 The covariance matrix `self.P` is initialized with the measurement noise covariance `R` for position,
111 and larger values for velocity to reflect the uncertainty in the initial state.
112 NOTE: The covariance matrix is a 4x4 matrix, where the diagonal elements represent the uncertainty in the position and velocity.
113
114 Updates:
115 self.X: (4,1) Vector with the predicted state vector after applying the state transition.
116 self.P: (4,4) Matrix with the state covariance matrix
117 """
118 # TODO: Initialize the state vector with the initial measurement, assuming a 2D position and velocity model.
119 # Use the X and Y component of the measurement z, and set the velocity components to 0.0.
120 # HINT: The resulting state vector should be a 4D vector with the format [x_position, y_position, x_velocity, y_velocity] with shape (4, 1).
121 self.X = np.array([[z[0][0], z[1][0], 0.0, 0.0]]).T
122
123 # TODO: Initialize the covariance matrix with the measurement noise covariance R for position,
124 # and larger values for velocity to reflect the uncertainty in the initial state.
125 # HINT: The resulting covariance matrix should be a 4x4 matrix. You can use np.diag to create a diagonal matrix.
126 self.P = np.diag([R[0][0], R[1][1], 100.0, 100.0])
127
128
129 def update(self, z, R):
130 """
131 Performs the Kalman filter update step with the given measurement.
132 Args:
133 z (np.ndarray): The measurement vector. Shape is (2,1) for 2D position (no velocity estimates from the measurement).
134 R (np.ndarray): The measurement noise covariance matrix. Shape is (2,2) for 2D position measurements.
135 Updates:
136 self.X (np.ndarray): The state estimate after incorporating the measurement (4,1) Vector.
137 self.P (np.ndarray): The state covariance matrix after the update. (4,4) Matrix.
138
139 If the filter is not initialized (self.X is None), initializes the filter with the measurement.
140 Otherwise, computes the Kalman gain, updates the state estimate and covariance matrix.
141
142 First, one needs to calculate the innovation (measurement residual) `y` and the innovation covariance `S`:
143 :math:`\begin{array}{ccc}
144 y &=& z - H\cdot X\\
145 S &=& H \cdot P \cdot H^T + R
146 \end{array}`
147
148 where `z` is the measurement, `H` is the measurement matrix, `X` is the current state estimate, and `P` is the current covariance matrix.
149
150 Then, compute the Kalman gain `K`:
151 :math:`K = P \cdot H^T \cdot S^{-1}`
152
153 Finally, update the state estimate and covariance matrix:
154
155 :math:`X = X + K \cdot y`
156 :math:`P = (I - K \cdot H) \cdot P`
157
158 where `H` is the measurement matrix and `I` is the identity matrix
159 """
160 # TODO: If the filter is not initialized (self.X is None), initialize it with the first measurement and return
161 if self.X is None:
162 self.init_filter(z, R)
163 return
164
165 ### TODO: Define the measurement matrix H
166 H = np.array([[1.0, 0.0, 0.0, 0.0],
167 [0.0, 1.0, 0.0, 0.0]])
168
169 ## TODO: Calculate the innovation (measurement residual) y and the innovation covariance S
170 y = z - H @ self.X
171 S = H @ self.P @ H.T + R
172
173 ## TODO: Compute the Kalman gain K
174 K = self.P @ H.T @ np.linalg.inv(S)
175
176 # TODO: Update the state estimate with the measurement
177 self.X = self.X + K @ y
178 self.P = (np.eye(4) - K @ H) @ self.P
179
180 def step(self, data):
181 # If there is a state already, predict the next state and covariance
182 # Also, make a copy of the current state and covariance for visualization of the prior
183 if self.X is not None:
184 self.predict(data["dt"])
185 self.prediction = self.X.copy()
186 self.priorCov = self.P.copy()
187
188 # We iterate over all measurements in the data and update the filter with each measurement.
189 for _, measurement in data["measurements"].items():
190 # Extract the measurement state and covariance
191 z, R = measurement["state"], measurement["covariance"]
192
193 # If there is an actual measurenent, update the filter with the measurement
194 # Measurements may be missing, in which case z and R are None.
195 if z is not None and R is not None:
196 self.update(z, R)
197
198 # If the filter has not been initialized yet, we set the prediction and priorCov to the current state and covariance
199 if self.prediction is None and self.priorCov is None:
200 self.prediction = self.X.copy()
201 self.priorCov = self.P.copy()
202
203 # Return the prior and posterior state and covariance
204 return {
205 "prior": {"state": self.prediction, "covariance": self.priorCov},
206 "posterior": {"state": self.X, "covariance": self.P},
207 }
208
209
210#################################################
211# Main program to run the Kalman filter example #
212# Donot modify the code below this line #
213#################################################
214
215parser = argparse.ArgumentParser("Example Program")
216parser.add_argument("--mode", action="store", default="none")
217parser.add_argument("--recorder.file", action="store")
218parser.add_argument("--engine.singlestep", action="store_true", default=False)
219parser.add_argument("--webcam.width", required=False)
220modules = [
221 ConfigParser(parser),
222 MeasurementGenerator(),
223 KalmanFilter(),
224 Visualization(),
225 TerminateAfter(),
226]
227
228engine = Engine(modules=modules, signals={})
229signals = engine.run({})