From 3229168852a6320d898ae1dea572f155b16fa2cf Mon Sep 17 00:00:00 2001
From: Roger Yang
Date: Wed, 1 Mar 2023 19:56:04 -0800
Subject: [PATCH 1/2] fix time range for umap

---
 src/phoenix/metrics/timeseries.py          | 13 ++++--
 src/phoenix/pointcloud/pointcloud.py       |  2 +
 .../server/api/types/EmbeddingDimension.py | 46 +++++++++++--------
 3 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/src/phoenix/metrics/timeseries.py b/src/phoenix/metrics/timeseries.py
index 988a86403e..a82c8524af 100644
--- a/src/phoenix/metrics/timeseries.py
+++ b/src/phoenix/metrics/timeseries.py
@@ -4,6 +4,7 @@
 from typing import Any, Callable, Generator, Iterable, List, Tuple, Union, cast
 
 import pandas as pd
+from typing_extensions import TypeAlias
 
 from . import Metric
 
@@ -39,11 +40,17 @@ def _calculate(df: pd.DataFrame, calcs: Iterable[Metric]) -> "pd.Series[Any]":
     return pd.Series(dict(calc(df) for calc in calcs))
 
 
-def _time_slice_from_sorted_index(idx: pd.Index, start: datetime, end: datetime) -> slice:
+Start: TypeAlias = int
+Stop: TypeAlias = int
+
+
+def row_interval_from_sorted_time_index(
+    idx: pd.Index, start: datetime, end: datetime
+) -> Tuple[Start, Stop]:
     """
     Returns end exclusive time slice from sorted index.
     """
-    return slice(*cast(List[int], idx.searchsorted((start, end))))
+    return cast(Tuple[Start, Stop], idx.searchsorted((start, end)))
 
 
 def _aggregator(
@@ -71,7 +78,7 @@ def _aggregator(
             (pd.DataFrame(),),
             (
                 dataframe.iloc[
-                    _time_slice_from_sorted_index(dataframe.index, start, end),
+                    slice(*row_interval_from_sorted_time_index(dataframe.index, start, end)),
                     columns,
                 ]
                 .groupby(group, group_keys=True)
diff --git a/src/phoenix/pointcloud/pointcloud.py b/src/phoenix/pointcloud/pointcloud.py
index 982f4af2d9..ea60662b62 100644
--- a/src/phoenix/pointcloud/pointcloud.py
+++ b/src/phoenix/pointcloud/pointcloud.py
@@ -57,6 +57,8 @@ def generate(
         some vectors may not belong to any cluster and are excluded here.
 
         """
+        if not data:
+            return {}, {}
         identifiers, vectors = zip(*data.items())
         projections = self.dimensionalityReducer.project(
             np.stack(vectors), n_components=n_components
diff --git a/src/phoenix/server/api/types/EmbeddingDimension.py b/src/phoenix/server/api/types/EmbeddingDimension.py
index c4c85eea47..e68e3925bf 100644
--- a/src/phoenix/server/api/types/EmbeddingDimension.py
+++ b/src/phoenix/server/api/types/EmbeddingDimension.py
@@ -1,6 +1,6 @@
 from collections import defaultdict
 from datetime import datetime, timedelta
-from itertools import chain, repeat, starmap
+from itertools import chain
 from typing import Any, List, Mapping, Optional
 
 import numpy as np
@@ -17,6 +17,7 @@
 from phoenix.datasets.errors import SchemaError
 from phoenix.datasets.event import EventId
 from phoenix.metrics.embeddings import euclidean_distance
+from phoenix.metrics.timeseries import row_interval_from_sorted_time_index
 from phoenix.pointcloud.clustering import Hdbscan
 from phoenix.pointcloud.pointcloud import PointCloud
 from phoenix.pointcloud.projectors import Umap
@@ -194,23 +195,32 @@ def UMAPPoints(
     ) -> UMAPPoints:
         n_samples = n_samples or DEFAULT_N_SAMPLES
 
-        # TODO validate time_range.
-
-        primary_dataset = info.context.model.primary_dataset
-        reference_dataset = info.context.model.reference_dataset
-
-        primary_data = zip(
-            starmap(EventId, zip(range(n_samples), repeat(DatasetType.PRIMARY))),
-            primary_dataset.get_embedding_vector_column(self.name).to_numpy()[:n_samples],
-        )
-        if reference_dataset:
-            reference_data = zip(
-                starmap(EventId, zip(range(n_samples), repeat(DatasetType.REFERENCE))),
-                reference_dataset.get_embedding_vector_column(self.name).to_numpy()[:n_samples],
+        datasets = {
+            DatasetType.PRIMARY: info.context.model.primary_dataset,
+            DatasetType.REFERENCE: info.context.model.reference_dataset,
+        }
+
+        # TODO: actual random sampling with seed
+        data = dict(
+            chain.from_iterable(
+                (
+                    ()
+                    if dataset is None
+                    else (
+                        (
+                            EventId(row_id, dataset_id),
+                            dataset.get_embedding_vector_column(self.name).iloc[row_id],
+                        )
+                        for row_id in range(
+                            *row_interval_from_sorted_time_index(
+                                dataset.dataframe.index, start=time_range.start, end=time_range.end
+                            )
+                        )[:n_samples]
+                    )
+                )
+                for dataset_id, dataset in datasets.items()
             )
-            data = dict(chain(primary_data, reference_data))
-        else:
-            data = dict(primary_data)
+        )
 
         # validate n_components to be 2 or 3
         n_components = DEFAULT_N_COMPONENTS if n_components is None else n_components
@@ -225,8 +235,6 @@ def UMAPPoints(
             clustersFinder=Hdbscan(),
         ).generate(data, n_components=n_components)
 
-        datasets = {DatasetType.PRIMARY: primary_dataset, DatasetType.REFERENCE: reference_dataset}
-
         points = defaultdict(list)
 
         for event_id, vector in vectors.items():
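
The first commit leans on two behaviors that are easy to miss. pandas' `Index.searchsorted` with its default `side="left"` returns, for each bound, the first position whose value is not less than that bound, so on a sorted time index the resulting pair is an end-exclusive row interval. And Python `range` objects support slicing, so `range(start, stop)[:n_samples]` caps the sample count without materializing a list. A minimal sketch of both behaviors, using a made-up hourly index rather than anything from the codebase:

from datetime import datetime

import pandas as pd

# Toy sorted time index with four hourly rows (illustrative data only).
idx = pd.DatetimeIndex(
    ["2023-03-01 00:00", "2023-03-01 01:00", "2023-03-01 02:00", "2023-03-01 03:00"]
)

# side="left" (the default) finds the first position >= each bound,
# so the interval is end-exclusive: a row at exactly `end` is skipped.
start, stop = idx.searchsorted((datetime(2023, 3, 1, 1), datetime(2023, 3, 1, 3)))
print(start, stop)  # 1 3 -> rows at 01:00 and 02:00; the 03:00 row is excluded

# range objects are sliceable, so capping to n_samples stays lazy.
print(list(range(start, stop)[:1]))  # [1]

The `if not data: return {}, {}` guard added to `PointCloud.generate` covers the adjacent failure mode: when the time range selects no rows, `zip(*data.items())` unpacks nothing and the assignment to `identifiers, vectors` raises `ValueError`.
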
From b6e16ae9b17db32124cbda5aac881e56fdc3b156 Mon Sep 17 00:00:00 2001
From: Roger Yang
Date: Wed, 1 Mar 2023 20:59:14 -0800
Subject: [PATCH 2/2] clean up

---
 src/phoenix/metrics/timeseries.py                  | 10 +++++-----
 src/phoenix/server/api/types/EmbeddingDimension.py |  1 -
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/src/phoenix/metrics/timeseries.py b/src/phoenix/metrics/timeseries.py
index a82c8524af..6a4c0b8516 100644
--- a/src/phoenix/metrics/timeseries.py
+++ b/src/phoenix/metrics/timeseries.py
@@ -40,17 +40,17 @@ def _calculate(df: pd.DataFrame, calcs: Iterable[Metric]) -> "pd.Series[Any]":
     return pd.Series(dict(calc(df) for calc in calcs))
 
 
-Start: TypeAlias = int
-Stop: TypeAlias = int
+StartIndex: TypeAlias = int
+StopIndex: TypeAlias = int
 
 
 def row_interval_from_sorted_time_index(
     idx: pd.Index, start: datetime, end: datetime
-) -> Tuple[Start, Stop]:
+) -> Tuple[StartIndex, StopIndex]:
     """
-    Returns end exclusive time slice from sorted index.
+    Returns the end-exclusive (start, stop) row interval from a sorted time index.
     """
-    return cast(Tuple[Start, Stop], idx.searchsorted((start, end)))
+    return cast(Tuple[StartIndex, StopIndex], idx.searchsorted((start, end)))
 
 
 def _aggregator(
diff --git a/src/phoenix/server/api/types/EmbeddingDimension.py b/src/phoenix/server/api/types/EmbeddingDimension.py
index e68e3925bf..1ca4c5c3df 100644
--- a/src/phoenix/server/api/types/EmbeddingDimension.py
+++ b/src/phoenix/server/api/types/EmbeddingDimension.py
@@ -200,7 +200,6 @@ def UMAPPoints(
             DatasetType.REFERENCE: info.context.model.reference_dataset,
         }
 
-        # TODO: actual random sampling with seed
         data = dict(
             chain.from_iterable(
                 (
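
Taken together, the two commits sample at most `n_samples` events per dataset from inside the requested time range, skipping a missing reference dataset. A simplified, self-contained sketch of that shape, in which plain tuples stand in for `EventId` and the dataset types, and the toy frames and inlined helper are illustrative assumptions rather than the project's API:

from datetime import datetime

import pandas as pd

def row_interval_from_sorted_time_index(idx, start, end):
    # End-exclusive (start, stop) positions, as in the patched helper.
    return tuple(idx.searchsorted((start, end)))

n_samples = 2
datasets = {
    "primary": pd.DataFrame(
        {"embedding": [[0.0, 1.0], [1.0, 0.0], [0.5, 0.5]]},
        index=pd.DatetimeIndex(["2023-03-01", "2023-03-02", "2023-03-03"]),
    ),
    "reference": None,  # a missing reference dataset contributes no events
}

data = {
    (row_id, dataset_id): dataset["embedding"].iloc[row_id]
    for dataset_id, dataset in datasets.items()
    if dataset is not None
    for row_id in range(
        *row_interval_from_sorted_time_index(
            dataset.index, datetime(2023, 3, 1), datetime(2023, 3, 3)
        )
    )[:n_samples]
}
print(data)  # {(0, 'primary'): [0.0, 1.0], (1, 'primary'): [1.0, 0.0]}

Keying the dict by event id is what lets `PointCloud.generate` return projections and clusters as mappings over the same identifiers, whichever dataset each point came from.
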