Das Kalman-Filter - Musterlösung

  1import numpy as np
  2import argparse
  3from SignalHub import Engine, Module, ConfigParser
  4from generator import MeasurementGenerator
  5from visualization import Visualization
  6from terminateafter import TerminateAfter
  7
  8class KalmanFilter(Module):
  9    def __init__(self):
 10        super().__init__(
 11            inputSignals=["measurements", "dt"],
 12            outputSchema={
 13                "type": "object",
 14                "properties": {
 15                    "prior": {
 16                        "type": "object",
 17                        "properties": {"state": {}, "covariance": {}},
 18                        "additionalProperties": False,
 19                    },
 20                    "posterior": {
 21                        "type": "object",
 22                        "properties": {"state": {}, "covariance": {}},
 23                        "additionalProperties": False,
 24                    },
 25                },
 26            },
 27        )
 28
 29        self.X, self.P = None, None
 30        self.prediction, self.priorCov = None, None
 31
 32    def predict(self, dt):
 33        """
 34        Predicts the next state and covariance of the system using the Kalman filter prediction step.
 35        This method applies the state transition model to estimate the next state vector (`self.X`)
 36        and updates the state covariance matrix (`self.P`) by incorporating process noise.
 37        The prediction is based on the following:
 38          - State transition matrix (F) models the system dynamics.
 39          - Process noise covariance matrix (Q) accounts for uncertainty in the process.
 40
 41        For a 2D position and velocity model, the state vector is represented as:
 42        [x_position, y_position, x_velocity, y_velocity]. The state transition matrix F 
 43        is designed to update the position based on the current velocity, assuming a constant
 44        velocity model. The process noise covariance matrix Q is set to a small value to reflect
 45        the uncertainty in the process.  
 46
 47        Assuming constant velocity, the state transition matrix F is defined as:
 48
 49            F = [[1, 0, dt, 0],
 50                [0, 1, 0, dt],
 51                [0, 0, 1, 0],
 52                [0, 0, 0, 1]]
 53
 54        where dt is the time step between predictions.
 55
 56        The process noise covariance matrix Q is defined as:  
 57
 58            Q = [[q_x, 0, 0, 0],
 59                [0, q_y, 0, 0],
 60                [0, 0, q_vx, 0],
 61                [0, 0, 0, q_vy]]
 62
 63        where q_x, q_y, q_vx, and q_vy are small values representing the process noise for position. 
 64
 65        Use np.diag to create a diagonal matrix for Q, where the diagonal elements represent the process noise variances for each state variable.^
 66        Use small noise for position and larger noise for velocity to reflect the uncertainty in the process.
 67        The prediction step updates the state vector and covariance matrix as follows:  
 68        
 69        - `X = F @ X`: This updates the state vector by applying the state transition matrix F.
 70        - `P = F @ P @ F.T + Q`: This updates the state covariance matrix by applying the state transition matrix F, its transpose, and adding the process noise covariance matrix Q.
 71        
 72        Updates:
 73          self.X: (4,1) Vector with the predicted state vector after applying the state transition.
 74          self.P: (4,4) Matrix with the predicted state covariance matrix after accounting for process noise.
 75        """
 76        
 77        # TODO: Define the state transition matrix F
 78        F = np.array(
 79            [
 80                [1.0, 0.0, dt, 0.0],
 81                [0.0, 1.0, 0.0, dt],
 82                [0.0, 0.0, 1.0, 0.0],
 83                [0.0, 0.0, 0.0, 1.0],
 84            ]
 85        )
 86
 87        # TODO: Define the process noise covariance matrix Q
 88        Q = np.diag([0.001, 0.001, 0.1, 0.1])
 89
 90        # TODO: Predict next state using the state transition matrix
 91        self.X = F @ self.X
 92
 93        # TODO: Update the state covariance matrix using the state transition matrix and process noise
 94        self.P = F @ self.P @ F.T + Q
 95
 96    def init_filter(self, z, R):
 97        """
 98        Initializes the Kalman filter with thje first measurement and covariance.
 99        
100        Parameters:
101        - z: (2,1) Vector with the first measurement (Position in X and Y).
102        - R: (2,2) Covariance matrix associated with the measurement, which represents the uncertainty in the measurement.
103        
104        This method sets the initial state and covariance for the Kalman filter, which will be used
105        in subsequent prediction and update steps.
106
107        The state vector `self.X` is initialized with the first measurement, assuming a 2D position and velocity model.
108        NOTE: The state vector is a 4D vector with the format [x_position, y_position, x_velocity, y_velocity].
109
110        The covariance matrix `self.P` is initialized with the measurement noise covariance `R` for position, 
111        and larger values for velocity to reflect the uncertainty in the initial state.
112        NOTE: The covariance matrix is a 4x4 matrix, where the diagonal elements represent the uncertainty in the position and velocity.
113        
114        Updates:
115          self.X: (4,1) Vector with the predicted state vector after applying the state transition.
116          self.P: (4,4) Matrix with the state covariance matrix
117        """
118        # TODO: Initialize the state vector with the initial measurement, assuming a 2D position and velocity model. 
119        # Use the X and Y component of the measurement z, and set the velocity components to 0.0.
120        # HINT: The resulting state vector should be a 4D vector with the format [x_position, y_position, x_velocity, y_velocity] with shape (4, 1).
121        self.X = np.array([[z[0][0], z[1][0], 0.0, 0.0]]).T
122
123        # TODO: Initialize the covariance matrix with the measurement noise covariance R for position,
124        # and larger values for velocity to reflect the uncertainty in the initial state. 
125        # HINT: The resulting covariance matrix should be a 4x4 matrix. You can use np.diag to create a diagonal matrix.        
126        self.P = np.diag([R[0][0], R[1][1], 100.0, 100.0])
127
128
129    def update(self, z, R):
130        """
131        Performs the Kalman filter update step with the given measurement.
132        Args:
133          z (np.ndarray): The measurement vector. Shape is (2,1) for 2D position (no velocity estimates from the measurement).
134          R (np.ndarray): The measurement noise covariance matrix. Shape is (2,2) for 2D position measurements.
135        Updates:
136          self.X (np.ndarray): The state estimate after incorporating the measurement (4,1) Vector.
137          self.P (np.ndarray): The state covariance matrix after the update. (4,4) Matrix.
138
139        If the filter is not initialized (self.X is None), initializes the filter with the measurement.
140        Otherwise, computes the Kalman gain, updates the state estimate and covariance matrix.
141
142        First, one needs to calculate the innovation (measurement residual) `y` and the innovation covariance `S`:
143        :math:`\begin{array}{ccc}
144                y &=& z - H\cdot X\\
145                S &=& H \cdot P \cdot H^T + R
146               \end{array}`
147        
148        where `z` is the measurement, `H` is the measurement matrix, `X` is the current state estimate, and `P` is the current covariance matrix.
149
150        Then, compute the Kalman gain `K`:
151        :math:`K = P \cdot H^T \cdot S^{-1}`
152
153        Finally, update the state estimate and covariance matrix:
154
155        :math:`X = X + K  \cdot y`
156        :math:`P = (I - K \cdot H) \cdot P`
157
158        where `H` is the measurement matrix and `I` is the identity matrix
159        """
160        # TODO: If the filter is not initialized (self.X is None), initialize it with the first measurement and return
161        if self.X is None:
162            self.init_filter(z, R)
163            return
164
165        ### TODO: Define the measurement matrix H
166        H = np.array([[1.0, 0.0, 0.0, 0.0], 
167                      [0.0, 1.0, 0.0, 0.0]])
168
169        ## TODO: Calculate the innovation (measurement residual) y and the innovation covariance S
170        y = z - H @ self.X
171        S = H @ self.P @ H.T + R
172
173        ## TODO: Compute the Kalman gain K
174        K = self.P @ H.T @ np.linalg.inv(S)
175
176        # TODO: Update the state estimate with the measurement
177        self.X = self.X + K @ y
178        self.P = (np.eye(4) - K @ H) @ self.P
179
180    def step(self, data):
181        # If there is a state already, predict the next state and covariance
182        # Also, make a copy of the current state and covariance for visualization of the prior 
183        if self.X is not None:
184            self.predict(data["dt"])
185            self.prediction = self.X.copy()
186            self.priorCov = self.P.copy()
187
188        # We iterate over all measurements in the data and update the filter with each measurement.
189        for _, measurement in data["measurements"].items():
190            # Extract the measurement state and covariance
191            z, R = measurement["state"], measurement["covariance"]
192
193            # If there is an actual measurenent, update the filter with the measurement
194            # Measurements may be missing, in which case z and R are None.
195            if z is not None and R is not None: 
196              self.update(z, R)
197
198            # If the filter has not been initialized yet, we set the prediction and priorCov to the current state and covariance
199            if self.prediction is None and self.priorCov is None:
200                self.prediction = self.X.copy()
201                self.priorCov = self.P.copy()
202
203        # Return the prior and posterior state and covariance
204        return {
205            "prior": {"state": self.prediction, "covariance": self.priorCov},
206            "posterior": {"state": self.X, "covariance": self.P},
207        }
208
209
210################################################# 
211# Main program to run the Kalman filter example #
212# Donot modify the code below this line        #
213#################################################
214
215parser = argparse.ArgumentParser("Example Program")
216parser.add_argument("--mode", action="store", default="none")
217parser.add_argument("--recorder.file", action="store")
218parser.add_argument("--engine.singlestep", action="store_true", default=False)
219parser.add_argument("--webcam.width", required=False)
220modules = [
221    ConfigParser(parser),
222    MeasurementGenerator(),
223    KalmanFilter(),
224    Visualization(),
225    TerminateAfter(),
226]
227
228engine = Engine(modules=modules, signals={})
229signals = engine.run({})