Skip to content

Commit

Permalink
feat: Add default ability to optimize the timeseries
Browse files Browse the repository at this point in the history
  • Loading branch information
flxdot committed May 8, 2024
1 parent 6d7b1b4 commit f0b7776
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 1 deletion.
20 changes: 19 additions & 1 deletion services/api/carlos/api/routes/data_routes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
__all__ = ["data_router"]


from carlos.database.context import RequestContext
from carlos.database.data.timeseries import (
MAX_QUERY_RANGE,
Expand All @@ -12,6 +13,7 @@

from carlos.api.depends.context import request_context
from carlos.api.params.query import datetime_range
from carlos.api.utils.data_reduction import optimize_timeseries

data_router = APIRouter()

Expand All @@ -33,11 +35,27 @@ async def get_timeseries_route(
alias="timeseriesId",
description="One or more timeseries identifiers to get data for.",
),
reduce_samples: bool = Query(
True,
alias="reduceSamples",
description="Activated by default. This function will try to reduce the "
"number of samples returned by removing consecutive samples that change less "
"than 0.5% as they are not visible in the UI anyway.",
),
dt_range: DatetimeRange = Depends(datetime_range),
context: RequestContext = Depends(request_context),
):
"""Returns the timeseries data for the given timeseries identifiers."""

return await get_timeseries(
timeseries = await get_timeseries(
context=context, timeseries_ids=timeseries_id, datetime_range=dt_range
)

sample_reduce_threshold = 0.005 if reduce_samples else 0.0

return [
optimize_timeseries(
timeseries=ts, sample_reduce_threshold=sample_reduce_threshold
)
for ts in timeseries
]
Empty file.
80 changes: 80 additions & 0 deletions services/api/carlos/api/utils/data_reduction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
__all__ = ["optimize_timeseries"]
from datetime import timedelta

from carlos.database.data.timeseries import TimeseriesData

DEFAULT_SPLIT_THRESHOLD = timedelta(minutes=15)


def optimize_timeseries(
    timeseries: TimeseriesData,
    sample_reduce_threshold: float = 0.01,
    split_threshold: timedelta = DEFAULT_SPLIT_THRESHOLD,
) -> TimeseriesData:
    """Optimizes a timeseries for display in the frontend.

    Two optimizations are applied:

    1. A ``None`` value is injected into the middle of any gap between
       consecutive samples larger than ``split_threshold``, so the frontend
       renders a visible break instead of interpolating across the gap.
    2. Consecutive samples whose absolute relative change to the last kept
       sample is below ``sample_reduce_threshold`` are dropped, as such small
       changes are not visible in the UI anyway.

    :param timeseries: The timeseries to optimize.
    :param sample_reduce_threshold: Relative change (e.g. ``0.01`` = 1 %)
        below which consecutive samples are discarded. ``0.0`` keeps every
        sample.
    :param split_threshold: Gaps larger than this duration are marked with an
        injected ``None`` sample.
    :return: A new ``TimeseriesData`` with potentially fewer samples. The
        first and last samples are always preserved.
    """

    if len(timeseries.values) < 2:
        return timeseries

    # Both thresholds are magnitudes; guard against negative inputs.
    sample_reduce_threshold = abs(sample_reduce_threshold)
    split_threshold = abs(split_threshold)

    # we always need the first value
    ts_len = len(timeseries.values)
    timestamps = [timeseries.timestamps[0]]
    values = [timeseries.values[0]]
    prev_ts = timeseries.timestamps[0]
    for idx, (ts, val) in enumerate(
        zip(timeseries.timestamps[1:], timeseries.values[1:])
    ):
        delta = ts - prev_ts
        if delta > split_threshold:
            # Place the break marker in the middle of the gap.
            timestamps.append(prev_ts + delta / 2)
            values.append(None)

        # The last sample is always kept so the series keeps its full extent.
        # Note: ``idx`` enumerates the slice starting at the SECOND sample,
        # so the final sample has ``idx == ts_len - 2`` (``idx + 1 == ts_len``
        # could never be true and would silently drop an unchanged last
        # sample).
        if (
            is_value_changed(
                value=val, previous_value=values[-1], threshold=sample_reduce_threshold
            )
            or idx + 2 == ts_len
        ):
            timestamps.append(ts)
            values.append(val)

        prev_ts = ts

    return TimeseriesData(
        timeseries_id=timeseries.timeseries_id, timestamps=timestamps, values=values
    )


def is_value_changed(
value: float | None, previous_value: float | None, threshold: float
) -> bool:
"""Returns true if the value changed the datatype or the value changed more
than the threshold (in percent).
"""

# A 0 threshold means that we need to consider the value always to be changed
if threshold <= 0.0:
return True

# if the data type changed, the value changed for sure
if not isinstance(value, type(previous_value)):
return True

# in case any of the type are none, we just compare the values
if value is None or previous_value is None:
return value != previous_value

if value == previous_value:
return False

if value == 0.0 or previous_value == 0.0:
return True

return abs(previous_value - value) > threshold * previous_value
115 changes: 115 additions & 0 deletions services/api/carlos/api/utils/data_reduction_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from datetime import UTC, datetime, timedelta

import pytest
from carlos.database.data.timeseries import TimeseriesData

from carlos.api.utils.data_reduction import (
DEFAULT_SPLIT_THRESHOLD,
is_value_changed,
optimize_timeseries,
)


@pytest.mark.parametrize(
    "value, previous_value, threshold, expected",
    [
        pytest.param(None, None, 0.5, False, id="both None"),
        pytest.param(None, 1.0, 0.5, True, id="value None"),
        pytest.param(1.0, None, 0.5, True, id="previous None"),
        pytest.param(1.0, 1.0, 0.0, True, id="no threshold means always changed"),
        pytest.param(1.02, 1.0, 0.01, True, id="value >1% change"),
        pytest.param(1.0, 1.02, 0.01, True, id="previous >1% change"),
        pytest.param(0.02, 0.0, 0.01, True, id="zero - value >1% change"),
        pytest.param(0.0, 0.02, 0.01, True, id="zero - previous >1% change"),
        pytest.param(1.0, 1.0, 0.01, False, id="values equal"),
        pytest.param(0.0, 0.0, 0.01, False, id="zero - value equal"),
        pytest.param(1.009, 1.0, 0.01, False, id="value <1% change"),
        pytest.param(1.0, 1.009, 0.01, False, id="previous <1% change"),
        pytest.param(0.009, 0.0, 0.01, True, id="zero - value true"),
        pytest.param(0.0, 0.009, 0.01, True, id="zero - previous true"),
    ],
)
def test_is_value_changed(
    value: float | None, previous_value: float | None, threshold: float, expected: bool
):
    """Ensures all edge cases of the change detection are covered, including
    None handling, zero handling, and the relative threshold comparison."""

    changed = is_value_changed(
        value=value, previous_value=previous_value, threshold=threshold
    )
    assert changed is expected


def ts(offset: int) -> datetime:
"""Little helper to reduce the boilerplate to generate test datetimes."""
return datetime(2024, 1, 1, tzinfo=UTC) + timedelta(seconds=offset)


@pytest.mark.parametrize(
    "ts_in, split_threshold, ts_expected",
    [
        pytest.param(
            TimeseriesData(
                timeseries_id=42,
                timestamps=[],
                values=[],
            ),
            DEFAULT_SPLIT_THRESHOLD,
            TimeseriesData(
                timeseries_id=42,
                timestamps=[],
                values=[],
            ),
            id="empty timeseries",
        ),
        pytest.param(
            TimeseriesData(
                timeseries_id=42,
                timestamps=[ts(0), ts(60), ts(120)],
                values=[1, 1, 2],
            ),
            DEFAULT_SPLIT_THRESHOLD,
            TimeseriesData(
                timeseries_id=42,
                timestamps=[ts(0), ts(120)],
                values=[1, 2],
            ),
            id="duplicate data at beginning",
        ),
        pytest.param(
            TimeseriesData(
                timeseries_id=42,
                timestamps=[ts(i) for i in range(10)],
                values=[1, 2, 2, 2, 2, 2, 2, 3, 3, 4],
            ),
            DEFAULT_SPLIT_THRESHOLD,
            TimeseriesData(
                timeseries_id=42,
                timestamps=[ts(0), ts(1), ts(7), ts(9)],
                values=[1, 2, 3, 4],
            ),
            id="duplicate in the middle beginning",
        ),
        pytest.param(
            TimeseriesData(
                timeseries_id=42,
                timestamps=[ts(0), ts(120)],
                values=[1, 2],
            ),
            timedelta(seconds=30),
            TimeseriesData(
                timeseries_id=42,
                timestamps=[ts(0), ts(60), ts(120)],
                values=[1, None, 2],
            ),
            id="None inserted in middle of gap",
        ),
    ],
)
def test_optimize_timeseries(
    ts_in: TimeseriesData, ts_expected: TimeseriesData, split_threshold: timedelta
):
    """Ensures the optimizer drops insignificant samples and injects None
    markers into gaps larger than the split threshold."""

    optimized = optimize_timeseries(ts_in, split_threshold=split_threshold)
    assert optimized == ts_expected

0 comments on commit f0b7776

Please sign in to comment.