Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

346 TimeSeries interp_time #353

Merged
merged 20 commits into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
- `get_yaml_header` received a new option parse, which optionally returns the parsed YAML header
as `asdf.tagged.TaggedDict`. [[#338]](https://github.com/BAMWelDX/weldx/pull/338)
- refactor `asdf_json_repr` into `view_tree` [[#339]](https://github.com/BAMWelDX/weldx/pull/339)
- `TimeSeries.interp_time` [[#353]](https://github.com/BAMWelDX/weldx/pull/353)
- now returns a new `TimeSeries` instead of a `xarray.DataArray`
- if the data has already been interpolated before, a warning is emitted

### ASDF

Expand Down
123 changes: 80 additions & 43 deletions weldx/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Union
from warnings import warn

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -276,6 +277,7 @@ def __init__(
self._time_var_name = None
self._shape = None
self._units = None
self._interp_counter = 0

if isinstance(data, pint.Quantity):
if not np.iterable(data): # expand dim for scalar input
Expand Down Expand Up @@ -385,6 +387,68 @@ def __repr__(self):
)
return representation + f"Units:\n\t{self.units}\n"

def _interp_time_discrete(
self, time: Union[pd.TimedeltaIndex, pint.Quantity]
) -> xr.DataArray:
"""Interpolate the time series if its data is composed of discrete values.

See `interp_time` for interface description.

"""
if isinstance(self._data, xr.DataArray):
if isinstance(time, pint.Quantity):
time = ut.to_pandas_time_index(time)
if not isinstance(time, pd.TimedeltaIndex):
raise ValueError(
'"time" must be a time quantity or a "pandas.TimedeltaIndex".'
)
# constant values are also treated by this branch
if self._data.attrs["interpolation"] == "linear" or self.shape[0] == 1:
return ut.xr_interp_like(
self._data,
{"time": time},
assume_sorted=False,
broadcast_missing=False,
)

dax = self._data.reindex({"time": time}, method="ffill")
return dax.fillna(self._data[0])
CagtayFabry marked this conversation as resolved.
Show resolved Hide resolved

def _interp_time_expression(
self, time: Union[pd.TimedeltaIndex, pint.Quantity], time_unit: str
) -> xr.DataArray:
"""Interpolate the time series if its data is a mathematical expression.

See `interp_time` for interface description.

"""
# Transform time to both formats
if isinstance(time, pint.Quantity) and time.check(UREG.get_dimensionality("s")):
time_q = time
time_pd = ut.to_pandas_time_index(time)
elif isinstance(time, pd.TimedeltaIndex):
time_q = ut.pandas_time_delta_to_quantity(time, time_unit)
time_pd = time
else:
raise ValueError(
'"time" must be a time quantity or a "pandas.TimedeltaIndex".'
)

if len(self.shape) > 1 and np.iterable(time_q):
while len(time_q.shape) < len(self.shape):
time_q = time_q[:, np.newaxis]

# evaluate expression
data = self._data.evaluate(**{self._time_var_name: time_q})
data = data.astype(float).to_reduced_units() # float conversion before reduce!

# create data array
if not np.iterable(data): # make sure quantity is not scalar value
data = np.expand_dims(data, 0)

dax = xr.DataArray(data=data) # don't know exact dimensions so far
return dax.rename({"dim_0": "time"}).assign_coords({"time": time_pd})

@property
def data(self) -> Union[pint.Quantity, MathematicalExpression]:
"""Return the data of the TimeSeries.
Expand Down Expand Up @@ -449,7 +513,7 @@ def time(self) -> Union[None, pd.TimedeltaIndex]:

def interp_time(
self, time: Union[pd.TimedeltaIndex, pint.Quantity], time_unit: str = "s"
) -> xr.DataArray:
) -> "TimeSeries":
"""Interpolate the TimeSeries in time.

If the internal data consists of discrete values, an interpolation with the
Expand All @@ -470,55 +534,28 @@ def interp_time(

Returns
-------
xarray.DataArray:
TimeSeries :
A data array containing the interpolated data.
vhirtham marked this conversation as resolved.
Show resolved Hide resolved

"""
if isinstance(self._data, xr.DataArray):
if isinstance(time, pint.Quantity):
time = ut.to_pandas_time_index(time)
if not isinstance(time, pd.TimedeltaIndex):
raise ValueError(
'"time" must be a time quantity or a "pandas.TimedeltaIndex".'
)
# constant values are also treated by this branch
if self._data.attrs["interpolation"] == "linear" or self.shape[0] == 1:
return ut.xr_interp_like(
self._data,
{"time": time},
assume_sorted=False,
broadcast_missing=False,
)

dax = self._data.reindex({"time": time}, method="ffill")
return dax.fillna(self._data[0])

# Transform time to both formats
if isinstance(time, pint.Quantity) and time.check(UREG.get_dimensionality("s")):
time_q = time
time_pd = ut.to_pandas_time_index(time)
elif isinstance(time, pd.TimedeltaIndex):
time_q = ut.pandas_time_delta_to_quantity(time, time_unit)
time_pd = time
else:
raise ValueError(
'"time" must be a time quantity or a "pandas.TimedeltaIndex".'
if self._interp_counter > 0:
warn(
"The data of the time series has already been interpolated "
f"{self._interp_counter} time(s)."
)

if len(self.shape) > 1 and np.iterable(time_q):
while len(time_q.shape) < len(self.shape):
time_q = time_q[:, np.newaxis]

# evaluate expression
data = self._data.evaluate(**{self._time_var_name: time_q})
data = data.astype(float).to_reduced_units() # float conversion before reduce!
if isinstance(self._data, xr.DataArray):
dax = self._interp_time_discrete(time)
else:
dax = self._interp_time_expression(time, time_unit)

# create data array
if not np.iterable(data): # make sure quantity is not scalar value
data = np.expand_dims(data, 0)
interpolation = self.interpolation
if interpolation is None:
interpolation = "linear"
CagtayFabry marked this conversation as resolved.
Show resolved Hide resolved

dax = xr.DataArray(data=data) # don't know exact dimensions so far
return dax.rename({"dim_0": "time"}).assign_coords({"time": time_pd})
ts = TimeSeries(data=dax.data, time=time, interpolation=interpolation)
ts._interp_counter = self._interp_counter + 1
return ts

@property
def shape(self) -> Tuple:
Expand Down
1 change: 1 addition & 0 deletions weldx/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""pytest configuration."""
import pytest

from weldx.asdf.cli.welding_schema import single_pass_weld_example


Expand Down
21 changes: 18 additions & 3 deletions weldx/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,27 @@ def test_interp_time(ts, time, magnitude_exp, unit_exp):
result = ts.interp_time(time)

assert np.all(np.isclose(result.data.magnitude, magnitude_exp))
assert Q_(1, str(result.data.units)) == Q_(1, unit_exp)
assert Q_(1, str(result.units)) == Q_(1, unit_exp)

print(result.time)

if isinstance(time, pint.Quantity):
assert np.all(result.time == ut.to_pandas_time_index(time))
assert np.all(result.data_array.time == ut.to_pandas_time_index(time))
else:
assert np.all(result.time == time)
assert np.all(result.data_array.time == time)

# test_interp_time_warning ---------------------------------------------------------

@staticmethod
def test_interp_time_warning():
"""Test if a warning is emitted when interpolating already interpolated data."""
ts = TimeSeries(data=Q_([1, 2, 3], "m"), time=Q_([0, 1, 2], "s"))
with pytest.warns(None) as recorded_warnings:
ts_interp = ts.interp_time(Q_([0.25, 0.5, 0.75, 1], "s"))
assert len(recorded_warnings) == 0

with pytest.warns(UserWarning):
ts_interp.interp_time(Q_([0.4, 0.6], "s"))

# test_interp_time_exceptions ------------------------------------------------------

Expand Down