diff --git a/src/prog_models/prognostics_model.py b/src/prog_models/prognostics_model.py index 086edd75d..021166c68 100644 --- a/src/prog_models/prognostics_model.py +++ b/src/prog_models/prognostics_model.py @@ -16,7 +16,7 @@ from prog_models.utils import ProgressBar from prog_models.utils import calc_error from prog_models.utils.containers import DictLikeMatrixWrapper, InputContainer, OutputContainer -from prog_models.utils.next_state import euler_next_state, rk4_next_state, euler_next_state_wrapper, rk4_next_state_wrapper +from prog_models.utils.next_state import next_state_functions from prog_models.utils.parameters import PrognosticsModelParameters from prog_models.utils.serialization import CustomEncoder, custom_decoder from prog_models.utils.size import getsizeof @@ -44,6 +44,8 @@ class PrognosticsModel(ABC): output (e.g., {'z1': 0.2, 'z2': 0.3}), or a function (z) -> z measurement_noise_dist : Optional, str distribution for :term:`measurement noise` (e.g., normal, uniform, triangular) + integration_method: Optional, str + Integration method used by next state in continuous models, e.g. 'rk4' or 'euler' (default: 'euler'). If the model is discrete, this parameter will return an error. Additional parameters specific to the model @@ -362,12 +364,7 @@ def next_state(self, x, u, dt: float): """ # Note: Default is to use the dx method (continuous model) - overwrite next_state for continuous dx = self.dx(x, u) - if isinstance(x, DictLikeMatrixWrapper) and isinstance(dx, DictLikeMatrixWrapper): - return self.StateContainer(x.matrix + dx.matrix * dt) - elif isinstance(dx, dict) or isinstance(x, dict): - return self.StateContainer({key: x[key] + dx[key]*dt for key in dx.keys()}) - else: - raise ValueError(f"ValueError: dx return must be of type StateContainer, was {type(dx)}") + return self.StateContainer({key: x[key] + dx[key]*dt for key in dx.keys()}) @property def is_continuous(self): @@ -584,7 +581,9 @@ def is_state_transition_model(self) -> bool: Returns: bool: if the model is a state transition model """ - has_overridden_transition = type(self).next_state != PrognosticsModel.next_state or type(self).dx != PrognosticsModel.dx + has_default_next_state = type(self).next_state == PrognosticsModel.next_state + has_integrator_next_state = type(self).next_state in next_state_functions.values() + has_overridden_transition = not (has_default_next_state or has_integrator_next_state) or type(self).dx != PrognosticsModel.dx return has_overridden_transition and len(self.states) > 0 @property @@ -836,7 +835,6 @@ def simulate_to_threshold(self, future_loading_eqn: Callable = None, first_outpu config = { # Defaults 't0': 0.0, 'dt': ('auto', 1.0), - 'integration_method': 'euler', 'save_pts': [], 'save_freq': 10.0, 'horizon': 1e100, # Default horizon (in s), essentially inf @@ -887,6 +885,9 @@ def simulate_to_threshold(self, future_loading_eqn: Callable = None, first_outpu threshold_met_eqn = self.threshold_met event_state = self.event_state load_eqn = future_loading_eqn + next_state = self.next_state + apply_noise = self.apply_process_noise + apply_limits = self.apply_limits progress = config['progress'] # Threshold Met Equations @@ -986,6 +987,12 @@ def load_eqn(t, x): u = future_loading_eqn(t, x) return self.InputContainer(u) + if not isinstance(next_state(x.copy(), u, dt0), DictLikeMatrixWrapper): + # Wrapper around the next state equation + def next_state(x, u, dt): + x = self.next_state(x, u, dt) + return self.StateContainer(x) + if not isinstance(self.output(x), DictLikeMatrixWrapper): # Wrapper around the output equation def output(x): @@ -1002,29 +1009,10 @@ def output(x): simulate_progress = ProgressBar(100, "Progress") last_percentage = 0 - if config['integration_method'].lower() == 'rk4': - # Using RK4 Method - dx = self.dx - - try: - dx(x, load_eqn(t, x)) - except ProgModelException: - raise ProgModelException("dx(x, u) must be defined to use RK4 method") - - apply_limits = self.apply_limits - apply_process_noise = self.apply_process_noise - StateContainer = self.StateContainer - if not isinstance(self.dx(x.copy(), u), DictLikeMatrixWrapper): - next_state = rk4_next_state - else: - next_state = rk4_next_state_wrapper - elif config['integration_method'].lower() == 'euler': - if not isinstance(self.next_state(x.copy(), u, dt0), DictLikeMatrixWrapper): - next_state = euler_next_state_wrapper - else: - next_state = euler_next_state - else: - raise ProgModelInputException(f"'integration_method' mode {config['integration_method']} not supported. Must be 'euler' or 'rk4'") + if 'integration_method' in config: + # Update integration method for the duration of the simulation + old_integration_method = self.parameters.get('integration_method', 'euler') + self.parameters['integration_method'] = config['integration_method'] while t < horizon: dt = next_time(t, x) @@ -1033,7 +1021,9 @@ def output(x): # This is sometimes referred to as 'leapfrog integration' u = load_eqn(t, x) t = t + dt/2 - x = next_state(self, x, u, dt) + x = next_state(x, u, dt) + x = apply_noise(x, dt) + x = apply_limits(x) # Save if at appropriate time if (t >= next_save): @@ -1073,6 +1063,10 @@ def output(x): else: saved_outputs = SimResult(saved_times, saved_outputs, _copy=False) saved_event_states = SimResult(saved_times, saved_event_states, _copy=False) + + if 'integration_method' in config: + # Reset integration method + self.parameters['integration_method'] = old_integration_method return self.SimulationResults( saved_times, diff --git a/src/prog_models/utils/next_state.py b/src/prog_models/utils/next_state.py index baba77801..8affcb4f8 100644 --- a/src/prog_models/utils/next_state.py +++ b/src/prog_models/utils/next_state.py @@ -27,44 +27,10 @@ def euler_next_state(model, x, u, dt: float): -------- PrognosticsModel.next_state """ - # Calculate next state and add process noise - next_state = model.apply_process_noise(model.next_state(x, u, dt), dt) + dx = model.dx(x, u) + return model.StateContainer( + {key: x[key] + dx[key]*dt for key in dx.keys()}) - # Apply Limits - return model.apply_limits(next_state) - -def euler_next_state_wrapper(model, x, u, dt: float): - """ - State transition equation using simple euler integration: Calls next_state(), calculating the next state, and then adds noise and applies limits - - Parameters - ---------- - x : StateContainer - state, with keys defined by model.states \n - e.g., x = m.StateContainer({'abc': 332.1, 'def': 221.003}) given states = ['abc', 'def'] - u : InputContainer - Inputs, with keys defined by model.inputs \n - e.g., u = m.InputContainer({'i':3.2}) given inputs = ['i'] - dt : float - Timestep size in seconds (≥ 0) \n - e.g., dt = 0.1 - - Returns - ------- - x : StateContainer - Next state, with keys defined by model.states - e.g., x = m.StateContainer({'abc': 332.1, 'def': 221.003}) given states = ['abc', 'def'] - - See Also - -------- - PrognosticsModel.next_state - """ - # Calculate next state and add process noise - next_state = model.StateContainer(model.next_state(x, u, dt)) - next_state = model.apply_process_noise(next_state, dt) - - # Apply Limits - return model.apply_limits(next_state) def rk4_next_state(model, x, u, dt: float): """ @@ -93,54 +59,22 @@ def rk4_next_state(model, x, u, dt: float): PrognosticsModel.next_state """ dx1 = model.StateContainer(model.dx(x, u)) - x2 = x.matrix + dx1.matrix*dt/2 + x2 = model.StateContainer(x.matrix + dx1.matrix*dt/2) dx2 = model.dx(x2, u) - x3 = model.StateContainer({key: x[key] + dt*dx_i/2 for key, dx_i in dx2.items()}) + x3 = model.StateContainer( + {key: x[key] + dt*dx_i/2 for key, dx_i in dx2.items()}) dx3 = model.dx(x3, u) - x4 = model.StateContainer({key: x[key] + dt*dx_i for key, dx_i in dx3.items()}) + x4 = model.StateContainer( + {key: x[key] + dt*dx_i for key, dx_i in dx3.items()}) dx4 = model.dx(x4, u) - x = model.StateContainer({key: x[key] + dt/3*(dx1[key]/2 + dx2[key] + dx3[key] + dx4[key]/2) for key in dx1.keys()}) - return model.apply_limits(model.apply_process_noise(x)) - -def rk4_next_state_wrapper(model, x, u, dt: float): - """ - State transition equation using rungekutta4 integration: Calls next_state(), calculating the next state, and then adds noise and applies limits - - Parameters - ---------- - x : StateContainer - state, with keys defined by model.states \n - e.g., x = m.StateContainer({'abc': 332.1, 'def': 221.003}) given states = ['abc', 'def'] - u : InputContainer - Inputs, with keys defined by model.inputs \n - e.g., u = m.InputContainer({'i':3.2}) given inputs = ['i'] - dt : float - Timestep size in seconds (≥ 0) \n - e.g., dt = 0.1 - - Returns - ------- - x : StateContainer - Next state, with keys defined by model.states - e.g., x = m.StateContainer({'abc': 332.1, 'def': 221.003}) given states = ['abc', 'def'] - - See Also - -------- - PrognosticsModel.next_state - """ - dx1 = model.StateContainer(model.dx(x, u)) - - x2 = model.StateContainer({key: x[key] + dt*dx_i/2 for key, dx_i in dx1.items()}) - dx2 = model.dx(x2, u) - - x3 = model.StateContainer({key: x[key] + dt*dx_i/2 for key, dx_i in dx2.items()}) - dx3 = model.dx(x3, u) - - x4 = model.StateContainer({key: x[key] + dt*dx_i for key, dx_i in dx3.items()}) - dx4 = model.dx(x4, u) + x = model.StateContainer( + {key: x[key] + dt/3*(dx1[key]/2 + dx2[key] + dx3[key] + dx4[key]/2) for key in dx1.keys()}) + return x - x = model.StateContainer({key: x[key] + dt/3*(dx1[key]/2 + dx2[key] + dx3[key] + dx4[key]/2) for key in dx1.keys()}) - return model.apply_limits(model.apply_process_noise(x)) +next_state_functions = { + 'euler': euler_next_state, + 'rk4': rk4_next_state +} diff --git a/src/prog_models/utils/parameters.py b/src/prog_models/utils/parameters.py index 262543398..236e4b313 100644 --- a/src/prog_models/utils/parameters.py +++ b/src/prog_models/utils/parameters.py @@ -9,6 +9,7 @@ import types from typing import Callable +from prog_models.utils.next_state import next_state_functions from prog_models.utils.noise_functions import measurement_noise_functions, process_noise_functions from prog_models.utils.serialization import * from prog_models.utils.size import getsizeof @@ -91,6 +92,18 @@ def __setitem__(self, key: str, value: float, _copy: bool = False) -> None: for callback in self.callbacks[key]: changes = callback(self) self.update(changes) # Merge in changes + + # Handle setting integration_method. This will override the next_state method + if key == 'integration_method': + if self._m.is_discrete and self._m.is_state_transition_model: + raise ProgModelTypeError( + "Cannot set integration method for discrete model (where next_state is overridden)") + if value.lower() not in next_state_functions.keys(): + raise ProgModelTypeError( + f"Unsupported integration method {value.lower()}") + self._m.next_state = types.MethodType( + next_state_functions[value.lower()], + self._m) if key == 'process_noise' or key == 'process_noise_dist': if callable(self['process_noise']): # Provided a function diff --git a/tests/test_base_models.py b/tests/test_base_models.py index 62ac82fdf..76404a9bb 100644 --- a/tests/test_base_models.py +++ b/tests/test_base_models.py @@ -128,6 +128,57 @@ def next_state(self, x, u, dt): **config) self.assertAlmostEqual(times[-1], 5.0, 5) + def test_integration_type(self): + m_default_integration = LinearThrownObject(process_noise=0, measurement_noise=0) + m_rk4_integration = LinearThrownObject(integration_method='rk4', process_noise=0, measurement_noise=0) + + # compare two models with different integration types + x_default = m_default_integration.initialize() + u_default = m_default_integration.InputContainer({}) + x_default = m_default_integration.next_state(x_default, u_default, 0.1) + + x_rk4 = m_rk4_integration.initialize() + u_rk4 = m_rk4_integration.InputContainer({}) + x_rk4 = m_rk4_integration.next_state(x_rk4, u_rk4, 0.1) + + # The two models should have close but different values for x + self.assertAlmostEqual(x_default['x'], x_rk4['x'], delta=0.1) + self.assertNotEqual(x_default['x'], x_rk4['x']) + + # Velocity should be exactly the same (because it's linear) + self.assertEqual(x_default['v'], x_rk4['v']) + + # Now, if we change the integrator for the default model, it should be the same as the rk4 model + m_default_integration.parameters['integration_method'] = 'rk4' + x_default = m_default_integration.initialize() + u_default = m_default_integration.InputContainer({}) + x_default = m_default_integration.next_state(x_default, u_default, 0.1) + + # Both velocity (v) and position (x) should be equal + # This is because we changed the integration method to rk4 for the default model + self.assertEqual(x_default['v'], x_rk4['v']) + self.assertEqual(x_default['x'], x_rk4['x']) + + def test_integration_type_error(self): + with self.assertRaises(ProgModelTypeError): + # unsupported integration type + m = LinearThrownObject(integration_method='invalid') + + with self.assertRaises(ProgModelTypeError): + # change integration type for a discrete model + m = ThrownObject(integration_method='rk4') + + # Repeat with setting in params + m = LinearThrownObject() + with self.assertRaises(ProgModelTypeError): + # unsupported integration type + m.parameters['integration_method'] = 'invalid' + + m = ThrownObject() + with self.assertRaises(ProgModelTypeError): + # change integration type for a discrete model + m.parameters['integration_method'] = 'rk4' + def test_size(self): m = MockProgModel() size = sys.getsizeof(m)