
re-implement kalman filter with single sliding-window prediction #146

Merged Oct 7, 2024 (10 commits)
1 change: 1 addition & 0 deletions environment.yml
@@ -12,3 +12,4 @@ dependencies:
- pip
- pip:
- redis
- simdkalman
1 change: 1 addition & 0 deletions environment_dev.yml
@@ -14,3 +14,4 @@ dependencies:
- pip
- pip:
- redis
- simdkalman
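
Both environment files pick up the same new dependency. As a point of reference, here is a minimal sketch of how simdkalman is typically driven; the toy series and tuning values are illustrative only, not taken from this PR:

import numpy as np
import simdkalman

# Constant-velocity model: hidden state is [value, trend].
kf = simdkalman.KalmanFilter(
    state_transition=[[1, 1], [0, 1]],
    process_noise=np.diag([0.1, 0.1]),
    observation_model=np.array([[1, 0]]),
    observation_noise=1.0,
)

data = np.array([[1.0, 2.0, 3.0, 4.0, 5.0]])  # one series of five observations
smoothed = kf.smooth(data)                    # smoothed estimates over the window
predicted = kf.predict(data, 1)               # one-step-ahead prediction
print(predicted.observations.mean)            # expected next observation, one step ahead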
271 changes: 170 additions & 101 deletions plugins/kalman/kalman_plugin.py
@@ -6,131 +6,200 @@
#
# Licensed under the NASA Open Source Agreement version 1.3
# See "NOSA GSC-19165-1 OnAIR.pdf"
"""
This module contains the Plugin class for detecting faults using Kalman filtering.

The Plugin class is derived from AIPlugin and uses a Kalman filter to predict
future values based on a sliding window of observations. It calculates residuals
between predicted and actual values to identify potential faults in the system.

The module is part of NASA's On-Board Artificial Intelligence Research (OnAIR) Platform.
"""
import simdkalman
import numpy as np
from onair.data_handling.parser_util import floatify_input
from onair.src.util.print_io import print_msg
from onair.src.ai_components.ai_plugin_abstract.ai_plugin import AIPlugin


class Plugin(AIPlugin):
def __init__(self, name, headers, window_size=3):
"""
A plugin for detecting faults using Kalman filtering.

This plugin uses a Kalman filter to predict future values based on a sliding window
of observations. It calculates residuals between predicted and actual values to
identify potential faults in the system.

The plugin maintains a separate Kalman filter for each data attribute and uses
a specified threshold to determine if a residual indicates a fault.

Attributes:
component_name (str): The name of the plugin component.
headers (list): List of descriptive header names for low_level_data.
window_size (int): Size of the sliding window for observations.
residual_threshold (float): Threshold above which a residual is considered a fault.
frames (list): List of lists containing the sliding window data for each attribute.
kf (simdkalman.KalmanFilter): The Kalman filter used for predictions.
"""

def __init__(self, name, headers, window_size=15, residual_threshold=1.5):
"""
:param headers: (int) length of time agent examines
:param window_size: (int) size of time window to examine
Initialize the Plugin class.

Parameters
----------
name : str
The name of the plugin.
headers : list
List of header names for the data attributes.
window_size : int, optional
Size of the time window to examine (default is 15).
residual_threshold : float, optional
Residual threshold above which a data point is considered a fault (default is 1.5).

Returns
-------
None
"""
if window_size < 3:
raise RuntimeError(
f"Kalman plugin unable to operate with window size < 3: given {window_size}"
)

super().__init__(name, headers)
self.frames = []
self.component_name = name
self.headers = headers
self.window_size = window_size
self.residual_threshold = residual_threshold
self.frames = [[] for _ in range(len(headers))]

self.kf = simdkalman.KalmanFilter(
state_transition=[[1, 1], [0, 1]], # matrix A
process_noise=np.diag([0.1, 0.01]), # Q
observation_model=np.array([[1, 0]]), # H
# matrix A
state_transition=[[1, 1], [0, 1]],
# Q
process_noise=np.diag([0.1, 0.1]),
# H
observation_model=np.array([[1, 0]]),
# R
observation_noise=1.0,
) # R
)
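
As a quick sanity check on the model configured above (an editorial aside, not part of the diff): the state is [value, trend], matrix A advances the value by the trend each step, and H exposes only the value as the observation.

import numpy as np

A = np.array([[1, 1], [0, 1]])   # state transition: value += trend, trend unchanged
H = np.array([[1, 0]])           # observation model: measure the value only
state = np.array([10.0, 0.5])    # current estimate: value 10.0, trend +0.5 per step

state = A @ state                # propagate one step -> [10.5, 0.5]
print(H @ state)                 # predicted next observation: [10.5]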

#### START: Classes mandated by plugin architecture
def update(self, frame):
def update(self, low_level_data=None, _high_level_data=None):
"""
:param frame: (list of floats) input sequence of len (input_dim)
:return: None
Update the frames with new low-level data.

This method converts the input data to float type, appends it to the
corresponding frames, and maintains the window size by removing older
data points if necessary.

Parameters
----------
low_level_data : list
Input sequence of data points; its length must equal the number of headers.

Returns
-------
None
"""
for data_point_index in range(len(frame)):
if len(self.frames) < len(
frame
): # If the frames variable is empty, append each data point in frame to it, each point wrapped as a list
# This is done so the data can have each attribute grouped in one list before being passed to kalman
# Ex: [[1:00, 1:01, 1:02, 1:03, 1:04, 1:05], [1, 2, 3, 4, 5]]
self.frames.append([frame[data_point_index]])
else:
self.frames[data_point_index].append(frame[data_point_index])
if (
len(self.frames[data_point_index]) > self.window_size
): # If after adding a point to the frame, that attribute is larger than the window_size, take out the first element
self.frames[data_point_index].pop(0)
if low_level_data is None:
print_msg(
"Kalman plugin requires low_level_data but received None.", ["FAIL"]
)
else:
frame = floatify_input(low_level_data)

for i, value in enumerate(frame):
self.frames[i].append(value)
if len(self.frames[i]) > self.window_size:
self.frames[i].pop(0)
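
The bookkeeping above is a plain sliding window per attribute. A standalone illustration with hypothetical values (window_size of 3, two attributes):

frames = [[], []]          # one list per attribute, e.g. TIME and VOLTAGE
window_size = 3

for frame in ([0.0, 1.0], [1.0, 1.1], [2.0, 1.2], [3.0, 5.0]):
    for i, value in enumerate(frame):
        frames[i].append(value)
        if len(frames[i]) > window_size:
            frames[i].pop(0)   # drop the oldest sample once the window is full

print(frames)  # [[1.0, 2.0, 3.0], [1.1, 1.2, 5.0]]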

def render_reasoning(self):
"""
System should return its diagnosis
Generate a list of attributes that show fault-like behavior based on residual analysis.

This method calculates residuals using the Kalman filter predictions and compares
them against a threshold. Attributes with residuals exceeding the threshold are
considered to show fault-like behavior, except for the 'TIME' attribute.

Returns
-------
list of str
A list of attribute names that show fault-like behavior (i.e., have residuals
above the threshold). The 'TIME' attribute is excluded from this list even
if its residual is above the threshold.

Notes
-----
The residual threshold is defined by the `residual_threshold` attribute of the class.
"""
broken_attributes = self.frame_diagnosis(self.frames, self.headers)
residuals = self._generate_residuals()
residuals_above_thresh = residuals > self.residual_threshold
broken_attributes = []
for attribute_index in range(len(self.frames)):
if (
residuals_above_thresh[attribute_index]
and not self.headers[attribute_index].upper() == "TIME"
):
broken_attributes.append(self.headers[attribute_index])
return broken_attributes
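
Stripped of the plugin scaffolding, the thresholding above amounts to the following (hypothetical headers and residual values; TIME is always excluded from the report):

import numpy as np

headers = ["TIME", "VOLTAGE", "TEMPERATURE"]   # hypothetical attribute names
residuals = np.array([0.2, 2.7, 0.4])          # hypothetical residuals
residual_threshold = 1.5

broken = [h for h, r in zip(headers, residuals)
          if r > residual_threshold and h.upper() != "TIME"]
print(broken)  # ['VOLTAGE']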

#### END: Classes mandated by plugin architecture

# Gets mean of values
def mean(self, values):
return sum(values) / len(values)

# Gets absolute value residual from actual and predicted value
def residual(self, predicted, actual):
return abs(float(actual) - float(predicted))

# Gets standard deviation of data
def std_dev(self, data):
return np.std(data)

# Takes in the kf being used, the data, how many prediction "steps" it will make, and an optional initial value
# Gives a prediction values based on given parameters
def predict(self, data, forward_steps, inital_val=None):
for i in range(len(data)):
data[i] = float(
data[i]
) # Makes sure all the data being worked with is a float
if inital_val != None:
smoothed = self.kf.smooth(
data, initial_value=[float(inital_val), 0]
) # If there's an initial value, smooth it along that value
def _predict(self, subframe, forward_steps, initial_val):
"""
Provide predicted future values using Kalman filter.

Parameters
----------
subframe : list of list of float
Data for Kalman filter prediction.
forward_steps : int
Number of forward predictions to make.
initial_val : list of float
Initial value for Kalman filter.

Returns
-------
predictions : object
Predicted values from the Kalman filter with fields for states and observations.
"""
self.kf.smooth(subframe, initial_value=initial_val)
predictions = self.kf.predict(subframe, forward_steps)
return predictions
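
One detail worth calling out: the initial_val passed into _predict is built (in _generate_residuals below) as one [value, trend] column vector per attribute, giving an array of shape (number_of_attributes, 2, 1). A small shape check with hypothetical first readings:

import numpy as np

first_samples = [0.0, 10.0, 98.6]        # hypothetical first reading of each attribute
initial_val = np.zeros((len(first_samples), 2, 1))
for i, first in enumerate(first_samples):
    initial_val[i] = np.array([[first, 0]]).transpose()

print(initial_val.shape)  # (3, 2, 1)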

def _generate_residuals(self):
"""
Predict the last observation in each frame based on all previous observations in that frame.

This method uses a Kalman filter to predict the last observation in each frame
based on all previous observations. It then calculates the residuals as the
absolute difference between the predicted and actual last observations.

Returns
-------
residuals : numpy.ndarray
Residuals based on the difference between the last observation and
the Kalman filter-smoothed prediction. If the frame length is 2 or less,
returns an array of zeros.

Notes
-----
The current size of each frame must be greater than 2 for valid initial and last values,
and for the Kalman filter to have sufficient data for smoothing.
"""
if len(self.frames[0]) > 2:
# generate initial values for frame, use first value for each attribute
initial_val = np.zeros((len(self.frames), 2, 1))
for i, frame in enumerate(self.frames):
initial_val[i] = np.array([[frame[0], 0]]).transpose()
predictions = self._predict(
[data[1:-1] for data in self.frames], 1, initial_val
)
actual_next_obs = [data[-1] for data in self.frames]
pred_mean = [
pred for attr in predictions.observations.mean for pred in attr
]
residuals = np.abs(np.subtract(pred_mean, actual_next_obs))
else:
smoothed = self.kf.smooth(data) # If not, smooth it however you like
predicted = self.kf.predict(
data, forward_steps
) # Make a prediction on the smoothed data
return predicted

def predictions_for_given_data(self, data):
returned_data = []
initial_val = data[0]
for item in range(len(data) - 1):
predicted = self.predict(data[0 : item + 1], 1, initial_val)
actual_next_state = data[item + 1]
pred_mean = predicted.observations.mean
returned_data.append(pred_mean)
if len(returned_data) == 0: # If there's not enough data just set it to 0
returned_data.append(0)
return returned_data

# Get data, make predictions, and then find the errors for these predictions
def generate_residuals_for_given_data(self, data):
residuals = []
initial_val = data[0]
for item in range(len(data) - 1):
predicted = self.predict(data[0 : item + 1], 1, initial_val)
actual_next_state = data[item + 1]
pred_mean = predicted.observations.mean
residual_error = float(self.residual(pred_mean, actual_next_state))
residuals.append(residual_error)
if (
len(residuals) == 0
): # If there are no residuals because the data is of length 1, then just say residuals is equal to [0]
residuals.append(0)
# return residual of 0 for frames size less than or equal to 2
residuals = np.zeros((len(self.frames),))
return residuals
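
Putting the two helpers together: the plugin predicts the newest sample in each window from the earlier samples and flags attributes whose prediction error is large. A condensed, standalone sketch of that idea with toy numbers (slightly simplified; the plugin also seeds the filter with each window's first value):

import numpy as np
import simdkalman

kf = simdkalman.KalmanFilter(
    state_transition=[[1, 1], [0, 1]],
    process_noise=np.diag([0.1, 0.1]),
    observation_model=np.array([[1, 0]]),
    observation_noise=1.0,
)

# Two attributes: a steady clock and a sensor that jumps on its newest sample.
frames = [[0.0, 1.0, 2.0, 3.0], [10.0, 10.1, 10.2, 50.0]]

subframes = [f[:-1] for f in frames]           # everything except the newest sample
predicted = kf.predict(subframes, 1)           # one-step-ahead prediction per attribute
pred_mean = [p for attr in predicted.observations.mean for p in attr]
actual = [f[-1] for f in frames]

residuals = np.abs(np.subtract(pred_mean, actual))
print(residuals)  # the jumping sensor's residual dwarfs the clock's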

# Info: takes a chunk of data of n size. Walks through it and gets residual errors.
# Takes the mean of the errors and determines if they're too large overall in order to determine whether or not there's a chunk in said error.
def current_attribute_chunk_get_error(self, data):
residuals = self.generate_residuals_for_given_data(data)
mean_residuals = abs(self.mean(residuals))
if abs(mean_residuals) < 1.5:
return False
return True

def frame_diagnosis(self, frame, headers):
kal_broken_attributes = []
for attribute_index in range(len(frame)):
error = self.current_attribute_chunk_get_error(frame[attribute_index])
if error and not headers[attribute_index].upper() == "TIME":
kal_broken_attributes.append(headers[attribute_index])
return kal_broken_attributes
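
Finally, a hypothetical end-to-end use of the new Plugin outside OnAIR's normal plugin loader, just to show the update/render_reasoning flow (names and values are made up; in practice OnAIR constructs the plugin and feeds it frames):

plugin = Plugin("kalman", ["TIME", "VOLTAGE"], window_size=5, residual_threshold=1.5)

for frame in ([0.0, 1.00], [1.0, 1.05], [2.0, 1.10], [3.0, 9.90]):
    plugin.update(frame)           # low_level_data for one time step

print(plugin.render_reasoning())   # expected: ['VOLTAGE'] after the jump to 9.90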