Skip to content

First draft next_state rewrite [Closes #528] #529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 27 additions & 33 deletions src/prog_models/prognostics_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from prog_models.utils import ProgressBar
from prog_models.utils import calc_error
from prog_models.utils.containers import DictLikeMatrixWrapper, InputContainer, OutputContainer
from prog_models.utils.next_state import euler_next_state, rk4_next_state, euler_next_state_wrapper, rk4_next_state_wrapper
from prog_models.utils.next_state import next_state_functions
from prog_models.utils.parameters import PrognosticsModelParameters
from prog_models.utils.serialization import CustomEncoder, custom_decoder
from prog_models.utils.size import getsizeof
Expand Down Expand Up @@ -44,6 +44,8 @@ class PrognosticsModel(ABC):
output (e.g., {'z1': 0.2, 'z2': 0.3}), or a function (z) -> z
measurement_noise_dist : Optional, str
distribution for :term:`measurement noise` (e.g., normal, uniform, triangular)
integration_method: Optional, str
Integration method used by next state in continuous models, e.g. 'rk4' or 'euler' (default: 'euler'). If the model is discrete, this parameter will return an error.

Additional parameters specific to the model

Expand Down Expand Up @@ -362,12 +364,7 @@ def next_state(self, x, u, dt: float):
"""
# Note: Default is to use the dx method (continuous model) - overwrite next_state for continuous
dx = self.dx(x, u)
if isinstance(x, DictLikeMatrixWrapper) and isinstance(dx, DictLikeMatrixWrapper):
return self.StateContainer(x.matrix + dx.matrix * dt)
elif isinstance(dx, dict) or isinstance(x, dict):
return self.StateContainer({key: x[key] + dx[key]*dt for key in dx.keys()})
else:
raise ValueError(f"ValueError: dx return must be of type StateContainer, was {type(dx)}")
return self.StateContainer({key: x[key] + dx[key]*dt for key in dx.keys()})

@property
def is_continuous(self):
Expand Down Expand Up @@ -584,7 +581,9 @@ def is_state_transition_model(self) -> bool:
Returns:
bool: if the model is a state transition model
"""
has_overridden_transition = type(self).next_state != PrognosticsModel.next_state or type(self).dx != PrognosticsModel.dx
has_default_next_state = type(self).next_state == PrognosticsModel.next_state
has_integrator_next_state = type(self).next_state in next_state_functions.values()
has_overridden_transition = not (has_default_next_state or has_integrator_next_state) or type(self).dx != PrognosticsModel.dx
return has_overridden_transition and len(self.states) > 0

@property
Expand Down Expand Up @@ -836,7 +835,6 @@ def simulate_to_threshold(self, future_loading_eqn: Callable = None, first_outpu
config = { # Defaults
't0': 0.0,
'dt': ('auto', 1.0),
'integration_method': 'euler',
'save_pts': [],
'save_freq': 10.0,
'horizon': 1e100, # Default horizon (in s), essentially inf
Expand Down Expand Up @@ -887,6 +885,9 @@ def simulate_to_threshold(self, future_loading_eqn: Callable = None, first_outpu
threshold_met_eqn = self.threshold_met
event_state = self.event_state
load_eqn = future_loading_eqn
next_state = self.next_state
apply_noise = self.apply_process_noise
apply_limits = self.apply_limits
progress = config['progress']

# Threshold Met Equations
Expand Down Expand Up @@ -986,6 +987,12 @@ def load_eqn(t, x):
u = future_loading_eqn(t, x)
return self.InputContainer(u)

if not isinstance(next_state(x.copy(), u, dt0), DictLikeMatrixWrapper):
# Wrapper around the next state equation
def next_state(x, u, dt):
x = self.next_state(x, u, dt)
return self.StateContainer(x)

if not isinstance(self.output(x), DictLikeMatrixWrapper):
# Wrapper around the output equation
def output(x):
Expand All @@ -1002,29 +1009,10 @@ def output(x):
simulate_progress = ProgressBar(100, "Progress")
last_percentage = 0

if config['integration_method'].lower() == 'rk4':
# Using RK4 Method
dx = self.dx

try:
dx(x, load_eqn(t, x))
except ProgModelException:
raise ProgModelException("dx(x, u) must be defined to use RK4 method")

apply_limits = self.apply_limits
apply_process_noise = self.apply_process_noise
StateContainer = self.StateContainer
if not isinstance(self.dx(x.copy(), u), DictLikeMatrixWrapper):
next_state = rk4_next_state
else:
next_state = rk4_next_state_wrapper
elif config['integration_method'].lower() == 'euler':
if not isinstance(self.next_state(x.copy(), u, dt0), DictLikeMatrixWrapper):
next_state = euler_next_state_wrapper
else:
next_state = euler_next_state
else:
raise ProgModelInputException(f"'integration_method' mode {config['integration_method']} not supported. Must be 'euler' or 'rk4'")
if 'integration_method' in config:
# Update integration method for the duration of the simulation
old_integration_method = self.parameters.get('integration_method', 'euler')
self.parameters['integration_method'] = config['integration_method']

while t < horizon:
dt = next_time(t, x)
Expand All @@ -1033,7 +1021,9 @@ def output(x):
# This is sometimes referred to as 'leapfrog integration'
u = load_eqn(t, x)
t = t + dt/2
x = next_state(self, x, u, dt)
x = next_state(x, u, dt)
x = apply_noise(x, dt)
x = apply_limits(x)

# Save if at appropriate time
if (t >= next_save):
Expand Down Expand Up @@ -1073,6 +1063,10 @@ def output(x):
else:
saved_outputs = SimResult(saved_times, saved_outputs, _copy=False)
saved_event_states = SimResult(saved_times, saved_event_states, _copy=False)

if 'integration_method' in config:
# Reset integration method
self.parameters['integration_method'] = old_integration_method

return self.SimulationResults(
saved_times,
Expand Down
96 changes: 15 additions & 81 deletions src/prog_models/utils/next_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,44 +27,10 @@ def euler_next_state(model, x, u, dt: float):
--------
PrognosticsModel.next_state
"""
# Calculate next state and add process noise
next_state = model.apply_process_noise(model.next_state(x, u, dt), dt)
dx = model.dx(x, u)
return model.StateContainer(
{key: x[key] + dx[key]*dt for key in dx.keys()})

# Apply Limits
return model.apply_limits(next_state)

def euler_next_state_wrapper(model, x, u, dt: float):
"""
State transition equation using simple euler integration: Calls next_state(), calculating the next state, and then adds noise and applies limits

Parameters
----------
x : StateContainer
state, with keys defined by model.states \n
e.g., x = m.StateContainer({'abc': 332.1, 'def': 221.003}) given states = ['abc', 'def']
u : InputContainer
Inputs, with keys defined by model.inputs \n
e.g., u = m.InputContainer({'i':3.2}) given inputs = ['i']
dt : float
Timestep size in seconds (≥ 0) \n
e.g., dt = 0.1

Returns
-------
x : StateContainer
Next state, with keys defined by model.states
e.g., x = m.StateContainer({'abc': 332.1, 'def': 221.003}) given states = ['abc', 'def']

See Also
--------
PrognosticsModel.next_state
"""
# Calculate next state and add process noise
next_state = model.StateContainer(model.next_state(x, u, dt))
next_state = model.apply_process_noise(next_state, dt)

# Apply Limits
return model.apply_limits(next_state)

def rk4_next_state(model, x, u, dt: float):
"""
Expand Down Expand Up @@ -93,54 +59,22 @@ def rk4_next_state(model, x, u, dt: float):
PrognosticsModel.next_state
"""
dx1 = model.StateContainer(model.dx(x, u))
x2 = x.matrix + dx1.matrix*dt/2
x2 = model.StateContainer(x.matrix + dx1.matrix*dt/2)
dx2 = model.dx(x2, u)

x3 = model.StateContainer({key: x[key] + dt*dx_i/2 for key, dx_i in dx2.items()})
x3 = model.StateContainer(
{key: x[key] + dt*dx_i/2 for key, dx_i in dx2.items()})
dx3 = model.dx(x3, u)

x4 = model.StateContainer({key: x[key] + dt*dx_i for key, dx_i in dx3.items()})
x4 = model.StateContainer(
{key: x[key] + dt*dx_i for key, dx_i in dx3.items()})
dx4 = model.dx(x4, u)

x = model.StateContainer({key: x[key] + dt/3*(dx1[key]/2 + dx2[key] + dx3[key] + dx4[key]/2) for key in dx1.keys()})
return model.apply_limits(model.apply_process_noise(x))

def rk4_next_state_wrapper(model, x, u, dt: float):
"""
State transition equation using rungekutta4 integration: Calls next_state(), calculating the next state, and then adds noise and applies limits

Parameters
----------
x : StateContainer
state, with keys defined by model.states \n
e.g., x = m.StateContainer({'abc': 332.1, 'def': 221.003}) given states = ['abc', 'def']
u : InputContainer
Inputs, with keys defined by model.inputs \n
e.g., u = m.InputContainer({'i':3.2}) given inputs = ['i']
dt : float
Timestep size in seconds (≥ 0) \n
e.g., dt = 0.1

Returns
-------
x : StateContainer
Next state, with keys defined by model.states
e.g., x = m.StateContainer({'abc': 332.1, 'def': 221.003}) given states = ['abc', 'def']

See Also
--------
PrognosticsModel.next_state
"""
dx1 = model.StateContainer(model.dx(x, u))

x2 = model.StateContainer({key: x[key] + dt*dx_i/2 for key, dx_i in dx1.items()})
dx2 = model.dx(x2, u)

x3 = model.StateContainer({key: x[key] + dt*dx_i/2 for key, dx_i in dx2.items()})
dx3 = model.dx(x3, u)

x4 = model.StateContainer({key: x[key] + dt*dx_i for key, dx_i in dx3.items()})
dx4 = model.dx(x4, u)
x = model.StateContainer(
{key: x[key] + dt/3*(dx1[key]/2 + dx2[key] + dx3[key] + dx4[key]/2) for key in dx1.keys()})
return x

x = model.StateContainer({key: x[key] + dt/3*(dx1[key]/2 + dx2[key] + dx3[key] + dx4[key]/2) for key in dx1.keys()})
return model.apply_limits(model.apply_process_noise(x))
next_state_functions = {
'euler': euler_next_state,
'rk4': rk4_next_state
}
13 changes: 13 additions & 0 deletions src/prog_models/utils/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import types
from typing import Callable

from prog_models.utils.next_state import next_state_functions
from prog_models.utils.noise_functions import measurement_noise_functions, process_noise_functions
from prog_models.utils.serialization import *
from prog_models.utils.size import getsizeof
Expand Down Expand Up @@ -91,6 +92,18 @@ def __setitem__(self, key: str, value: float, _copy: bool = False) -> None:
for callback in self.callbacks[key]:
changes = callback(self)
self.update(changes) # Merge in changes

# Handle setting integration_method. This will override the next_state method
if key == 'integration_method':
if self._m.is_discrete and self._m.is_state_transition_model:
raise ProgModelTypeError(
"Cannot set integration method for discrete model (where next_state is overridden)")
if value.lower() not in next_state_functions.keys():
raise ProgModelTypeError(
f"Unsupported integration method {value.lower()}")
self._m.next_state = types.MethodType(
next_state_functions[value.lower()],
self._m)

if key == 'process_noise' or key == 'process_noise_dist':
if callable(self['process_noise']): # Provided a function
Expand Down
51 changes: 51 additions & 0 deletions tests/test_base_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,57 @@ def next_state(self, x, u, dt):
**config)
self.assertAlmostEqual(times[-1], 5.0, 5)

def test_integration_type(self):
m_default_integration = LinearThrownObject(process_noise=0, measurement_noise=0)
m_rk4_integration = LinearThrownObject(integration_method='rk4', process_noise=0, measurement_noise=0)

# compare two models with different integration types
x_default = m_default_integration.initialize()
u_default = m_default_integration.InputContainer({})
x_default = m_default_integration.next_state(x_default, u_default, 0.1)

x_rk4 = m_rk4_integration.initialize()
u_rk4 = m_rk4_integration.InputContainer({})
x_rk4 = m_rk4_integration.next_state(x_rk4, u_rk4, 0.1)

# The two models should have close but different values for x
self.assertAlmostEqual(x_default['x'], x_rk4['x'], delta=0.1)
self.assertNotEqual(x_default['x'], x_rk4['x'])

# Velocity should be exactly the same (because it's linear)
self.assertEqual(x_default['v'], x_rk4['v'])

# Now, if we change the integrator for the default model, it should be the same as the rk4 model
m_default_integration.parameters['integration_method'] = 'rk4'
x_default = m_default_integration.initialize()
u_default = m_default_integration.InputContainer({})
x_default = m_default_integration.next_state(x_default, u_default, 0.1)

# Both velocity (v) and position (x) should be equal
# This is because we changed the integration method to rk4 for the default model
self.assertEqual(x_default['v'], x_rk4['v'])
self.assertEqual(x_default['x'], x_rk4['x'])

def test_integration_type_error(self):
with self.assertRaises(ProgModelTypeError):
# unsupported integration type
m = LinearThrownObject(integration_method='invalid')

with self.assertRaises(ProgModelTypeError):
# change integration type for a discrete model
m = ThrownObject(integration_method='rk4')

# Repeat with setting in params
m = LinearThrownObject()
with self.assertRaises(ProgModelTypeError):
# unsupported integration type
m.parameters['integration_method'] = 'invalid'

m = ThrownObject()
with self.assertRaises(ProgModelTypeError):
# change integration type for a discrete model
m.parameters['integration_method'] = 'rk4'

def test_size(self):
m = MockProgModel()
size = sys.getsizeof(m)
Expand Down