From 1d018a57c2c056789441d1e7bed5300d2c46138a Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Wed, 22 Mar 2023 14:53:32 -0700 Subject: [PATCH 1/3] improve readability --- src/phoenix/metrics/timeseries.py | 153 ++++++++++++------ .../server/api/types/EmbeddingDimension.py | 6 +- 2 files changed, 105 insertions(+), 54 deletions(-) diff --git a/src/phoenix/metrics/timeseries.py b/src/phoenix/metrics/timeseries.py index db6d177068..98a34412ea 100644 --- a/src/phoenix/metrics/timeseries.py +++ b/src/phoenix/metrics/timeseries.py @@ -1,7 +1,7 @@ from datetime import datetime, timedelta from functools import partial -from itertools import accumulate, chain, repeat, takewhile -from typing import Any, Callable, Iterable, Iterator, List, Tuple, cast +from itertools import accumulate, repeat +from typing import Any, Callable, Iterable, Iterator, Tuple, cast import pandas as pd from typing_extensions import TypeAlias @@ -33,7 +33,10 @@ def timeseries( ) -def _calculate(df: pd.DataFrame, calcs: Iterable[Metric]) -> "pd.Series[Any]": +def _calculate( + df: pd.DataFrame, + calcs: Iterable[Metric], +) -> "pd.Series[Any]": """ Calculates each metric on the dataframe. """ @@ -45,12 +48,17 @@ def _calculate(df: pd.DataFrame, calcs: Iterable[Metric]) -> "pd.Series[Any]": def row_interval_from_sorted_time_index( - idx: pd.Index, start: datetime, end: datetime + time_index: pd.Index, + start_time: datetime, + end_time: datetime, ) -> Tuple[StartIndex, StopIndex]: """ Returns end exclusive time slice from sorted index. """ - return cast(Tuple[StartIndex, StopIndex], idx.searchsorted((start, end))) + return cast( + Tuple[StartIndex, StopIndex], + time_index.searchsorted((start_time, end_time)), + ) def _aggregator( @@ -65,66 +73,59 @@ def _aggregator( """ Calls groupby on the dataframe and apply metric calculations on each group. """ - calcs: Tuple[Metric, ...] 
= tuple(metrics) - input_column_indices: List[int] = sorted( - { - dataframe.columns.get_loc(column_name) - for calc in calcs - for column_name in calc.input_column_names() - } - ) + calcs = tuple(metrics) + unique_input_column_indices = set() + for calc in calcs: + for column_name in calc.input_column_names(): + column_index = dataframe.columns.get_loc(column_name) + unique_input_column_indices.add(column_index) + input_column_indices = sorted(unique_input_column_indices) + # need at least one column in the dataframe, so take the first one + # if input_column_indices is empty + if len(input_column_indices) == 0: + input_column_indices = [0] + dataframe = dataframe.iloc[:, input_column_indices] return pd.concat( - chain( - (pd.DataFrame(),), - ( - dataframe.iloc[ - slice(*row_interval_from_sorted_time_index(dataframe.index, start, end)), - input_column_indices or [0], # need at least one, so take the first one - ] - .groupby(group, group_keys=True) - .apply(partial(_calculate, calcs=calcs)) - .loc[start_time:end_time, :] # type: ignore # slice has no overload for datetime - for start, end, group in _groupers( - start_time=start_time, - end_time=end_time, - evaluation_window=evaluation_window, - sampling_interval=sampling_interval, - ) - ), + _results( + calcs=calcs, + dataframe=dataframe, + start_time=start_time, + end_time=end_time, + evaluation_window=evaluation_window, + sampling_interval=sampling_interval, ), verify_integrity=True, ) +StartTime: TypeAlias = datetime +EndTime: TypeAlias = datetime + + def _groupers( start_time: datetime, end_time: datetime, evaluation_window: timedelta, sampling_interval: timedelta, -) -> Iterator[Tuple[datetime, datetime, pd.Grouper]]: +) -> Iterator[Tuple[StartTime, EndTime, pd.Grouper]]: """ Yields pandas.Groupers from time series parameters. """ if not sampling_interval: return + total_time_span = end_time - start_time divisible = evaluation_window % sampling_interval == timedelta() - max_offset = end_time - start_time - if divisible and evaluation_window < max_offset: + if divisible and evaluation_window < total_time_span: max_offset = evaluation_window - yield from ( - ( - (start_time if divisible else end_time - offset) - evaluation_window, - end_time - offset, - pd.Grouper( # type: ignore # mypy finds the wrong Grouper - freq=evaluation_window, - origin=end_time, - offset=-offset, - # Each point in timeseries will be labeled by the end instant of - # its evaluation window. - label="right", - sort=False, - ), - ) + else: + max_offset = total_time_span + offsets = accumulate( + repeat(sampling_interval), + initial=timedelta(), + ) + for offset in offsets: + if offset >= max_offset: + return # Each Grouper is like a row in a brick wall, where each brick is an # evaluation window. By shifting each row of bricks by the sampling # interval, we can get all the brick's right edges to line up with the @@ -140,8 +141,58 @@ def _groupers( # ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐ combine into # └─┴─┴─┴─┴─┴─┴─┴─┴─┴2┴1┘0 final time series # - for offset in takewhile( - lambda offset: offset < max_offset, - accumulate(repeat(sampling_interval), initial=timedelta()), + grouper = pd.Grouper( # type: ignore # mypy finds the wrong Grouper + freq=evaluation_window, + origin=end_time, + offset=-offset, + # Each point in timeseries will be labeled by the end instant of + # its evaluation window. 
+ label="right", + sort=False, ) - ) + time_filter_end = end_time - offset + if divisible: + time_filter_start = start_time - evaluation_window + else: + time_filter_start = time_filter_end - evaluation_window + yield ( + time_filter_start, + time_filter_end, + grouper, + ) + + +def _results( + calcs: Iterable[Metric], + dataframe: pd.DataFrame, + start_time: datetime, + end_time: datetime, + evaluation_window: timedelta, + sampling_interval: timedelta, +) -> Iterator[pd.DataFrame]: + yield pd.DataFrame() + calculate_metrics = partial(_calculate, calcs=calcs) + result_slice = slice(start_time, end_time) + for ( + time_filter_start, + time_filter_end, + group, + ) in _groupers( + start_time=start_time, + end_time=end_time, + evaluation_window=evaluation_window, + sampling_interval=sampling_interval, + ): + row_start, row_end = row_interval_from_sorted_time_index( + time_index=dataframe.index, + start_time=time_filter_start, + end_time=time_filter_end, + ) + row_slice = slice(row_start, row_end) + filtered = dataframe.iloc[row_slice, :] + yield filtered.groupby( + group, + group_keys=True, + ).apply( + calculate_metrics, + ).loc[result_slice, :] diff --git a/src/phoenix/server/api/types/EmbeddingDimension.py b/src/phoenix/server/api/types/EmbeddingDimension.py index 29114c5290..92fd1010b9 100644 --- a/src/phoenix/server/api/types/EmbeddingDimension.py +++ b/src/phoenix/server/api/types/EmbeddingDimension.py @@ -191,9 +191,9 @@ def UMAPPoints( row_id_start, row_id_stop = 0, len(dataframe) if dataset_id == DatasetType.PRIMARY: row_id_start, row_id_stop = row_interval_from_sorted_time_index( - dataframe.index, - start=time_range.start, - end=time_range.end, + time_index=dataframe.index, + start_time=time_range.start, + end_time=time_range.end, ) vector_column = dataset.get_embedding_vector_column(self.name) for row_id in range(row_id_start, row_id_stop)[:n_samples]: From 5ae3a84a12fadbfae9f4eace062b672d428e4b77 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Wed, 22 Mar 2023 15:00:48 -0700 Subject: [PATCH 2/3] add docstring --- src/phoenix/metrics/timeseries.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/phoenix/metrics/timeseries.py b/src/phoenix/metrics/timeseries.py index 98a34412ea..2cf3561793 100644 --- a/src/phoenix/metrics/timeseries.py +++ b/src/phoenix/metrics/timeseries.py @@ -170,6 +170,9 @@ def _results( evaluation_window: timedelta, sampling_interval: timedelta, ) -> Iterator[pd.DataFrame]: + """ + Yields metric results for each data point in time series. + """ yield pd.DataFrame() calculate_metrics = partial(_calculate, calcs=calcs) result_slice = slice(start_time, end_time) From fb2d3cbe92cf37289b4bb34ef828d3408316c316 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Wed, 22 Mar 2023 21:48:07 -0700 Subject: [PATCH 3/3] clean up --- src/phoenix/metrics/timeseries.py | 30 ++++++++++--------- .../server/api/types/EmbeddingDimension.py | 4 +-- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/phoenix/metrics/timeseries.py b/src/phoenix/metrics/timeseries.py index 2cf3561793..75e67b8333 100644 --- a/src/phoenix/metrics/timeseries.py +++ b/src/phoenix/metrics/timeseries.py @@ -49,15 +49,15 @@ def _calculate( def row_interval_from_sorted_time_index( time_index: pd.Index, - start_time: datetime, - end_time: datetime, + time_start: datetime, + time_stop: datetime, ) -> Tuple[StartIndex, StopIndex]: """ Returns end exclusive time slice from sorted index. 
""" return cast( Tuple[StartIndex, StopIndex], - time_index.searchsorted((start_time, end_time)), + time_index.searchsorted((time_start, time_stop)), ) @@ -150,14 +150,14 @@ def _groupers( label="right", sort=False, ) - time_filter_end = end_time - offset + time_stop = end_time - offset if divisible: - time_filter_start = start_time - evaluation_window + time_start = start_time - evaluation_window else: - time_filter_start = time_filter_end - evaluation_window + time_start = time_stop - evaluation_window yield ( - time_filter_start, - time_filter_end, + time_start, + time_stop, grouper, ) @@ -175,10 +175,11 @@ def _results( """ yield pd.DataFrame() calculate_metrics = partial(_calculate, calcs=calcs) + # pandas time indexing is end-inclusive result_slice = slice(start_time, end_time) for ( - time_filter_start, - time_filter_end, + time_start, # inclusive + time_stop, # exclusive group, ) in _groupers( start_time=start_time, @@ -186,12 +187,13 @@ def _results( evaluation_window=evaluation_window, sampling_interval=sampling_interval, ): - row_start, row_end = row_interval_from_sorted_time_index( + row_start, row_stop = row_interval_from_sorted_time_index( time_index=dataframe.index, - start_time=time_filter_start, - end_time=time_filter_end, + time_start=time_start, # inclusive + time_stop=time_stop, # exclusive ) - row_slice = slice(row_start, row_end) + # pandas row indexing is stop-exclusive + row_slice = slice(row_start, row_stop) filtered = dataframe.iloc[row_slice, :] yield filtered.groupby( group, diff --git a/src/phoenix/server/api/types/EmbeddingDimension.py b/src/phoenix/server/api/types/EmbeddingDimension.py index 92fd1010b9..75a7c5abe0 100644 --- a/src/phoenix/server/api/types/EmbeddingDimension.py +++ b/src/phoenix/server/api/types/EmbeddingDimension.py @@ -192,8 +192,8 @@ def UMAPPoints( if dataset_id == DatasetType.PRIMARY: row_id_start, row_id_stop = row_interval_from_sorted_time_index( time_index=dataframe.index, - start_time=time_range.start, - end_time=time_range.end, + time_start=time_range.start, + time_stop=time_range.end, ) vector_column = dataset.get_embedding_vector_column(self.name) for row_id in range(row_id_start, row_id_stop)[:n_samples]: