Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csm time interpolation range #476

Merged
merged 15 commits into from
Aug 18, 2021
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
- move `sine` utility function to `weldx.welding.util` [[#439]](https://github.com/BAMWelDX/weldx/pull/439)
- `LocalCoordinateSystem` and `CoordinateSystemManager` function parameters related to time now support all types that
are also supported by the new `Time` class [[#448]](https://github.com/BAMWelDX/weldx/pull/448)
- `LocalCoordinateSystem.interp_time` returns static systems if only a single time value is passed or if there is no
overlap between the interpolation time range and the coordinate systems time range. This also affects the results of
some `CoordinateSystemManager` methods (``get_cs``
, ``interp_time``) [[#476]](https://github.com/BAMWelDX/weldx/pull/476)

### fixes

Expand Down
169 changes: 155 additions & 14 deletions weldx/tests/test_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import math
import random
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Union
from typing import Any, Dict, List, Union

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -851,17 +851,17 @@ def test_reset_reference_time_exceptions(
[
( # broadcast left
TS("2020-02-10"),
TDI([1, 2], "D"),
TDI([1, 2, 14], "D"),
TS("2020-02-10"),
r_mat_z([0, 0]),
np.array([[2, 8, 7], [2, 8, 7]]),
r_mat_z([0, 0, 0.5]),
np.array([[2, 8, 7], [2, 8, 7], [4, 9, 2]]),
),
( # broadcast right
TS("2020-02-10"),
TDI([29, 30], "D"),
TDI([14, 29, 30], "D"),
TS("2020-02-10"),
r_mat_z([0.5, 0.5]),
np.array([[3, 1, 2], [3, 1, 2]]),
r_mat_z([0.5, 0.5, 0.5]),
np.array([[4, 9, 2], [3, 1, 2], [3, 1, 2]]),
),
( # pure interpolation
TS("2020-02-10"),
Expand Down Expand Up @@ -934,6 +934,93 @@ def test_interp_time_discrete(
lcs_interp_like, orientation_exp, coordinates_exp, True, time, time_ref
)

# test_interp_time_discrete_outside_value_range ------------------------------------

@staticmethod
@pytest.mark.parametrize("time_dep_coords", [True, False])
@pytest.mark.parametrize("time_dep_orient", [True, False])
@pytest.mark.parametrize("all_less", [True, False])
def test_issue_289_interp_outside_time_range(
time_dep_orient: bool, time_dep_coords: bool, all_less: bool
):
"""Test if ``interp_time`` if all interp. values are outside the value range.

In this case it should always return a static system.

Parameters
----------
time_dep_orient :
If `True`, the orientation is time dependent
time_dep_coords :
If `True`, the coordinates are time dependent
all_less :
If `True`, all interpolation values are less than the time values of the
LCS. Otherwise, all values are greater.

"""
angles = [45, 135] if time_dep_orient else 135
orientation = WXRotation.from_euler("x", angles, degrees=True).as_matrix()
coordinates = [[0, 0, 0], [1, 1, 1]] if time_dep_coords else [1, 1, 1]
if time_dep_coords or time_dep_orient:
time = ["5s", "6s"] if all_less else ["0s", "1s"]
else:
time = None

lcs = LCS(orientation, coordinates, time)
lcs_interp = lcs.interp_time(["2s", "3s", "4s"])

exp_angle = 45 if time_dep_orient and all_less else 135
exp_orient = WXRotation.from_euler("x", exp_angle, degrees=True).as_matrix()
exp_coords = [0, 0, 0] if time_dep_coords and all_less else [1, 1, 1]

assert lcs_interp.time is None
assert lcs_interp.coordinates.values.shape == (3,)
assert lcs_interp.orientation.values.shape == (3, 3)
assert np.all(lcs_interp.coordinates.data == exp_coords)
assert np.all(lcs_interp.orientation.data == exp_orient)

# test_interp_time_discrete_single_time --------------------------------------------

@staticmethod
def test_interp_time_discrete_single_time():
"""Test that single value interpolation results in a static system."""
orientation = WXRotation.from_euler("x", [45, 135], degrees=True).as_matrix()
coordinates = [[0, 0, 0], [2, 2, 2]]
time = ["1s", "3s"]
lcs = LCS(orientation, coordinates, time)

exp_coords = [1, 1, 1]
exp_orient = WXRotation.from_euler("x", 90, degrees=True).as_matrix()

lcs_interp = lcs.interp_time("2s")
assert lcs_interp.time is None
assert lcs_interp.coordinates.values.shape == (3,)
assert lcs_interp.orientation.values.shape == (3, 3)
assert np.all(lcs_interp.coordinates.data == exp_coords)
assert np.allclose(lcs_interp.orientation.data, exp_orient)

# test_interp_time_discrete_outside_value_range_both_sides -------------------------

@staticmethod
def test_interp_time_discrete_outside_value_range_both_sides():
"""Test the interpolation is all values are outside of the LCS time range.

In this special case there is an overlap of the time ranges and we need to
ensure that the algorithm does not create a static system as it should if there
is no overlap.

"""
orientation = WXRotation.from_euler("x", [45, 135], degrees=True).as_matrix()
coordinates = [[0, 0, 0], [2, 2, 2]]
time = ["2s", "3s"]
lcs = LCS(orientation, coordinates, time)

lcs_interp = lcs.interp_time(["1s", "4s"])

assert np.all(lcs_interp.time == ["1s", "4s"])
assert np.all(lcs_interp.coordinates.data == lcs.coordinates.data)
assert np.allclose(lcs_interp.orientation.data, lcs.orientation.data)

# test_interp_time_timeseries_as_coords --------------------------------------------

@staticmethod
Expand Down Expand Up @@ -2994,7 +3081,7 @@ def test_get_local_coordinate_system_no_time_dep(
["2000-03-08", "2000-03-04", "2000-03-10", "2000-03-16"],
r_mat_x([0]),
[[1, 0, 0]],
([20], "2000-03-08"),
(None, None),
False,
),
# get transformed cs at specific times using a DatetimeIndex - all systems,
Expand Down Expand Up @@ -3047,7 +3134,7 @@ def test_get_local_coordinate_system_no_time_dep(
["2000-03-08", "2000-03-04", "2000-03-10", "2000-03-16"],
r_mat_x([0]),
[[1, 0, 0]],
([-4], "2000-03-08"),
(None, None),
False,
),
# get transformed cs at specific times using a DatetimeIndex - all systems
Expand Down Expand Up @@ -4069,15 +4156,17 @@ def _orientation_from_value(val, clip_min=None, clip_max=None):
angles = np.clip(val, clip_min, clip_max)
else:
angles = val
if len(angles) == 1:
angles = angles[0]
return WXRotation.from_euler("z", angles, degrees=True).as_matrix()

@staticmethod
def _coordinates_from_value(val, clip_min=None, clip_max=None):
if clip_min is not None and clip_max is not None:
val = np.clip(val, clip_min, clip_max)
if not isinstance(val, Iterable):
val = [val]
return [[v, 2 * v, -v] for v in val]
if len(val) > 1:
return [[v, 2 * v, -v] for v in val]
return [val[0], 2 * val[0], -val[0]]

@pytest.mark.parametrize(
"time, time_ref, systems, csm_has_time_ref, num_abs_systems",
Expand Down Expand Up @@ -4159,6 +4248,7 @@ def test_interp_time(
csm_interp = csm.interp_time(time, time_ref, systems)

# evaluate results
time_exp = time_class if len(time_class) > 1 else None
time_ref_exp = time_class.reference_time
for k, v in lcs_data.items():
# create expected lcs
Expand All @@ -4174,7 +4264,7 @@ def test_interp_time(
lcs_exp = tf.LocalCoordinateSystem(
self._orientation_from_value(days_interp + diff, v[1][0], v[1][-1]),
self._coordinates_from_value(days_interp + diff, v[1][0], v[1][-1]),
time_class,
time_exp,
csm.reference_time if csm.has_reference_time else time_ref_exp,
)
else:
Expand All @@ -4189,7 +4279,58 @@ def test_interp_time(

# check time union
if systems is None or len(systems) == 3:
assert np.all(csm_interp.time_union() == time_class.as_pandas())
assert np.all(csm_interp.time_union() == time_exp)

# issue 289 ------------------------------------------------------------------------

@staticmethod
@pytest.mark.parametrize("time_dep_coords", [True, False])
@pytest.mark.parametrize("time_dep_orient", [True, False])
@pytest.mark.parametrize("all_less", [True, False])
def test_issue_289_interp_outside_time_range(
time_dep_orient: bool, time_dep_coords: bool, all_less: bool
):
"""Test if ``get_cs`` behaves as described in pull request #289.

The requirement is that a static system is returned when all time values of the
interpolation are outside of the value range of the involved coordinate systems.

Parameters
----------
time_dep_orient :
If `True`, the orientation is time dependent
time_dep_coords :
If `True`, the coordinates are time dependent
all_less :
If `True`, all interpolation values are less than the time values of the
LCS. Otherwise, all values are greater.

"""
angles = [45, 135] if time_dep_orient else 135
orientation = WXRotation.from_euler("x", angles, degrees=True).as_matrix()
coordinates = [[0, 0, 0], [1, 1, 1]] if time_dep_coords else [1, 1, 1]
if time_dep_coords or time_dep_orient:
time = ["5s", "6s"] if all_less else ["0s", "1s"]
else:
time = None

csm = CSM("R")
# add A as time dependent in base
csm.create_cs("A", "R", orientation, coordinates, time)
# add B as static in A
csm.create_cs("B", "A")

cs_br = csm.get_cs("B", "R", time=["2s", "3s", "4s"])

exp_angle = 45 if time_dep_orient and all_less else 135
exp_orient = WXRotation.from_euler("x", exp_angle, degrees=True).as_matrix()
exp_coords = [0, 0, 0] if time_dep_coords and all_less else [1, 1, 1]

assert cs_br.time is None
assert cs_br.coordinates.values.shape == (3,)
assert cs_br.orientation.values.shape == (3, 3)
assert np.all(cs_br.coordinates.data == exp_coords)
assert np.all(cs_br.orientation.data == exp_orient)


def test_relabel():
Expand Down
5 changes: 4 additions & 1 deletion weldx/transformations/cs_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,10 @@ def get_cs(
reference.

If any coordinate system that is involved in the coordinate transformation has
a time dependency, the returned coordinate system will also be time dependent.
a time dependency, the returned coordinate system will most likely be also time
dependent. This won't be the case if only a single time value is passed or if
the passed time values do not overlap with any of the time dependent coordinate
systems' time ranges.

The timestamps of the returned system depend on the functions time parameter.
By default, the time union of all involved coordinate systems is taken.
Expand Down
68 changes: 56 additions & 12 deletions weldx/transformations/local_cs.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,11 @@ def is_time_dependent(self) -> bool:
"""
return self.time is not None or self._coord_ts is not None

@property
def has_timeseries(self) -> bool:
"""Return `True` if the coordinate system has a `TimeSeries` component."""
return self._coord_ts is not None
CagtayFabry marked this conversation as resolved.
Show resolved Hide resolved

@property
def has_reference_time(self) -> bool:
"""Return `True` if the coordinate system has a reference time.
Expand Down Expand Up @@ -652,18 +657,62 @@ def as_rotation(self) -> Rot: # pragma: no cover
"""
return Rot.from_matrix(self.orientation.values)

def _interp_time_orientation(self, time: Time) -> Union[xr.DataArray, np.ndarray]:
"""Interpolate the orientation in time."""
if self.orientation.ndim == 2: # don't interpolate static
orientation = self.orientation
elif time.max() <= self.time.min(): # only use edge timestamp
orientation = self.orientation.values[0]
elif time.min() >= self.time.max(): # only use edge timestamp
orientation = self.orientation.values[-1]
else: # full interpolation with overlapping times
orientation = ut.xr_interp_orientation_in_time(self.orientation, time)

if len(orientation) == 1: # remove "time dimension" for single value cases
return orientation[0].values
return orientation

def _interp_time_coordinates(self, time: Time) -> Union[xr.DataArray, np.ndarray]:
"""Interpolate the coordinates in time."""
if isinstance(self.coordinates, TimeSeries):
time_interp = Time(time, self.reference_time)
coordinates = self._coords_from_discrete_time_series(
self.coordinates.interp_time(time_interp)
)
if self.has_reference_time:
coordinates.weldx.time_ref = self.reference_time
elif self.coordinates.ndim == 1: # don't interpolate static
coordinates = self.coordinates
elif time.max() <= self.time.min(): # only use edge timestamp
coordinates = self.coordinates.values[0]
elif time.min() >= self.time.max(): # only use edge timestamp
coordinates = self.coordinates.values[-1]
else: # full interpolation with overlapping times
coordinates = ut.xr_interp_coordinates_in_time(self.coordinates, time)

if len(coordinates) == 1: # remove "time dimension" for single value cases
return coordinates[0].values
return coordinates

def interp_time(
self,
time: types_time_like,
time_ref: types_timestamp_like = None,
) -> LocalCoordinateSystem:
"""Interpolates the data in time.

Note that the returned system won't be time dependent anymore if only a single
time value was passed. The resulting system is constant and the passed time
value will be stripped from the result.
Additionally, if the passed time range does not overlap with the time range of
the coordinate system, the resulting system won't be time dependent neither
because the values outside of the coordinate systems time range are considered
as being constant.

Parameters
----------
time :
Series of times.
If passing "None" no interpolation will be performed.
Target time values. If `None` is passed, no interpolation will be performed.
time_ref:
The reference timestamp

Expand All @@ -687,17 +736,12 @@ def interp_time(
"allowed. Also check that the reference time has the correct type."
)

orientation = ut.xr_interp_orientation_in_time(self.orientation, time)
orientation = self._interp_time_orientation(time)
coordinates = self._interp_time_coordinates(time)

if isinstance(self.coordinates, TimeSeries):
time_interp = Time(time, self.reference_time)
coordinates = self._coords_from_discrete_time_series(
self.coordinates.interp_time(time_interp)
)
if self.has_reference_time:
coordinates.weldx.time_ref = self.reference_time
else:
coordinates = ut.xr_interp_coordinates_in_time(self.coordinates, time)
# remove time if orientations and coordinates are single values (static)
if orientation.ndim == 2 and coordinates.ndim == 1:
time = None

return LocalCoordinateSystem(orientation, coordinates, time)

Expand Down