Skip to content

Commit

Permalink
Verify that timeshift trajectories are given correctly. Precalculate …
Browse files Browse the repository at this point in the history
…input trajectory derivatives. Add partials w.r.t timeshifts to instance constraints jacobian values func.
  • Loading branch information
chris-konrad committed Jan 3, 2025
1 parent 11ab07d commit 6f79482
Showing 1 changed file with 118 additions and 27 deletions.
145 changes: 118 additions & 27 deletions opty/direct_collocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,8 @@ def __init__(self, equations_of_motion, state_symbols,
self.tmp_dir = tmp_dir
self.parallel = parallel
self.show_compile_output = show_compile_output



self._sort_parameters()
self._check_known_trajectories()
Expand All @@ -734,11 +736,12 @@ def __init__(self, equations_of_motion, state_symbols,
self.num_collocation_nodes +
self.num_unknown_parameters +
int(self._variable_duration))

self.integration_method = integration_method

if instance_constraints is not None or self.num_timeshift_traj_substitutes > 0:
self._generate_timeshift_constraints()
self._precalc_timshift_input_derivatives()
self.num_instance_constraints = len(self.instance_constraints)
self._identify_functions_in_instance_constraints()
self._find_closest_free_index()
Expand Down Expand Up @@ -847,33 +850,56 @@ def _check_known_trajectories(self):


def _substitute_timeshift_trajectories(self):
"""Identify and substitute time-shifted trajectories in the eom. Creates a dictionary
linking the substitutes with the original trajectories and the timeshift parameters."""
"""Identify and substitute time-shifted trajectories in the eom. verifies that timeshift
trajectories are of type u(t+/-tau). Creates a dictionary linking the substitutes with the
original trajectories and the timeshift parameters.
"""

t_set = {self.time_symbol}

trajs = self.eom.atoms(sm.core.backend.AppliedUndef, sm.core.backend.Derivative)
trajs = self.eom.atoms(sm.Function)
self.timeshift_traj_substitutes = {}


for traj in trajs:
if not len(traj.free_symbols.intersection(t_set)) == 1:
continue
if not len(traj.free_symbols) == 2:
continue

traj_subs = sm.symbols(traj.name+"_shift", cls=sm.Function)(self.time_symbol)
timeshift_parameter = list(traj.free_symbols - t_set)[0]

self.timeshift_traj_substitutes[traj_subs] = (traj, timeshift_parameter)

self.eom = self.eom.subs(traj, traj_subs)
if not len(traj.free_symbols.intersection(t_set)) == 1:
raise ValueError((f"All functions must be a function of time {self.time_symbol}: "
f"{traj}"))

if traj.subs(timeshift_parameter, 0) not in self.known_trajectory_map:
raise ValueError((f'The unshifted original of an input trajectory with unknown'
f' timeshift has to be provided in the known trajectory map.'
f' The following trajectory is missing in known_trajectory_map:'
f' {traj.name}({self.time_symbol})'))
if len(traj.args) > 1:
raise ValueError(f"More then one argument per function is not supportet: {traj}")

if len(traj.free_symbols) == 2:

if not isinstance(traj.args[0], sm.core.add.Add):
raise ValueError(f"Function arguments must be the time variable or an addition"
f" of the time variable with a single constant offset: {traj}")

#find timeshift parameter
timeshift_param = list(traj.free_symbols - t_set)[0]

#check that timeshift original is known and calc its derivative
traj_general = traj.subs(traj.args[0], self.time_symbol)
if traj_general not in self.known_trajectory_map:
raise ValueError((f'The unshifted original of an input trajectory with unknown'
f' timeshift has to be provided in the known trajectory map.'
f' The following trajectory is missing in known_trajectory_'
f'map: {traj.name}({self.time_symbol})'))


#substitute timeshift trajectory
traj_subs = sm.symbols(traj.name+"_shift", cls=sm.Function)(self.time_symbol)
self.eom = self.eom.subs(traj, traj_subs)

#pack info into dict
self.timeshift_traj_substitutes[traj_subs] = [traj, timeshift_param]

else:
if len(traj.free_symbols) > 2:
raise ValueError(f"Only one single constant offset symbol per function is "
f"allowed: {traj}")

self.num_timeshift_traj_substitutes = len(self.timeshift_traj_substitutes)


Expand Down Expand Up @@ -1018,6 +1044,30 @@ def _generate_timeshift_constraints(self):

self.instance_constraints = tuple(list(self.instance_constraints) + timeshift_constraints)


def _precalc_timshift_input_derivatives(self):
"""Precalculates the derivatives of timeshifted input for the partials of the constraint
jacobian."""

def _diff(trajectory):
"""Differentiates a trajectory using midpoint or backward euler and edge value padding.
"""
if self.integration_method == 'midpoint':
padded = np.pad(trajectory, (1,1), mode='edge')
return (padded[2:] - padded[:2])/(2*self.node_time_interval)
elif self.integration_method == 'backward euler':
padded = np.pad(trajectory, (1,0), mode='edge')
return (padded[1:] - padded[:1])/(self.node_time_interval)

self.timeshift_traj_derivative_map = {}

for traj_subs in self.timeshift_traj_substitutes:
traj = self.timeshift_traj_substitutes[traj_subs][0]
traj_general = traj.subs(traj.args[0], self.time_symbol)
diff_vals = _diff(self.known_trajectory_map[traj_general])
self.timeshift_traj_derivative_map[traj_general] = diff_vals


def _identify_functions_in_instance_constraints(self):
"""Instantiates a set containing all of the instance functions, i.e.
x(1.0) in the instance constraints."""
Expand Down Expand Up @@ -1130,11 +1180,11 @@ def _instance_constraints_func(self):
#substitute known trajectory values
for func in subbed_constraints[i].atoms(sm.Function):
if func.name in unshifted_trajs:
arg = func.args[0]/self.node_time_interval
arg = (func.args[0]/self.node_time_interval)
for a in sm.preorder_traversal(arg):
if isinstance(a, sm.Float):
arg = arg.subs(a, a.round())
func_subs = unshifted_trajs[func.name][arg,0]
func_subs = unshifted_trajs[func.name][arg//1,0] #division by 1 to covert to int
subbed_constraints[i] = subbed_constraints[i].subs(func, func_subs)

#lambdify
Expand Down Expand Up @@ -1168,17 +1218,58 @@ def _instance_constraints_jacobian_indices(self):
def _instance_constraints_jacobian_values_func(self):
"""Returns the non-zero values of the constraint Jacobian associated
with the instance constraints."""

#create matrix symbols for the free vector and all timshift input trajectories
free = sm.MatrixSymbol('FREE', self.num_free, 1)

unshifted_trajs = {}
unshifted_traj_vals = []
for val in self.timeshift_traj_substitutes.values():
size = self.known_trajectory_map[val[0].subs(val[1],0)].size
unshifted_trajs[val[0].name] = sm.MatrixSymbol(val[0].name.upper(), size, 1)
unshifted_traj_vals.append(self.known_trajectory_map[val[0].subs(val[1],0)])

# make map from unknown parameters to their value in FREE
unknown_param_map = {}
n_traj_vals = (self.num_states + self.num_unknown_input_trajectories) \
* self.num_collocation_nodes
for v in self.unknown_parameters:
unknown_param_map[v] = free[n_traj_vals + self.unknown_parameters.index(v),0]

known_traj_set = [traj.name for traj in self.known_trajectory_map.keys()]
timeshift_param_set = set([v[1] for v in self.timeshift_traj_substitutes.values()])

funcs = []
num_vals_per_func = []
for con in self.instance_constraints:
partials = list(con.atoms(sm.Function))
num_vals_per_func.append(len(partials))
jac = sm.Matrix([con]).jacobian(partials)

#identify partials w.r.t unknown trajectories and timeshift parameters
traj_partials = []
timeshift_jac = []

for traj in list(con.atoms(sm.Function)):
if not traj.name in known_traj_set:
traj_partials.append(traj)
else:
time_idx = me.msubs(traj.args[0], unknown_param_map) // self.node_time_interval
timeshift_jac.append(
unshifted_trajs[traj.name][time_idx,0]/self.node_time_interval)

#calculate jacobian entries w.r.t unknown input trajectories.
traj_jac = sm.Matrix([con]).jacobian(traj_partials)

#concat jacobian
if len(timeshift_jac) > 0:
jac = traj_jac.row_join(sm.Matrix(timeshift_jac))
else:
jac = traj_jac


num_vals_per_func.append(len(jac))
jac = me.msubs(jac, self.instance_constraints_free_map)
funcs.append(sm.lambdify(([free] +
list(self.known_parameter_map.keys())),
funcs.append(sm.lambdify(([free] \
+ list(unshifted_trajs.values()) \
+ list(self.known_parameter_map.keys())),
jac, modules=[{'ImmutableMatrix':
np.array}, "numpy"]))
length = np.sum(num_vals_per_func)
Expand All @@ -1187,7 +1278,7 @@ def wrapped(free):
arr = np.zeros(length)
j = 0
for i, (f, num) in enumerate(zip(funcs, num_vals_per_func)):
arr[j:j + num] = f(free, *self.known_parameter_map.values())
arr[j:j + num] = f(free, *unshifted_traj_vals, *self.known_parameter_map.values())
j += num
return arr

Expand Down

0 comments on commit 6f79482

Please sign in to comment.