From b9ecbbc86ba0b71c22d51a0e1cb563c1e3809842 Mon Sep 17 00:00:00 2001
From: nate-mar <67926244+nate-mar@users.noreply.github.com>
Date: Thu, 12 Jan 2023 17:23:42 -0800
Subject: [PATCH 01/27] wip

---
 src/phoenix/datasets/dataset.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py
index 9eed1149e1..e99db94303 100644
--- a/src/phoenix/datasets/dataset.py
+++ b/src/phoenix/datasets/dataset.py
@@ -3,8 +3,9 @@
 import sys
 import uuid
 from typing import Any, Literal, Optional, Union
+import dataclasses

-from pandas import DataFrame, Series, read_parquet
+from pandas import DataFrame, Series, read_parquet, Timestamp

 from phoenix.config import dataset_dir
 from phoenix.utils import FilePath
@@ -190,8 +191,19 @@ def from_name(cls, name: str) -> "Dataset":

     @staticmethod
     def _parse_dataframe(dataframe: DataFrame, schema: Schema) -> DataFrame:
+        cols_to_add = dict()
+        if schema.timestamp_column_name is None:
+            schema = dataclasses.replace(schema, timestamp_column_name="timestamp")
+            cols_to_add['timestamp'] = lambda x: Timestamp.now()
+        if schema.prediction_id_column_name is None:
+            schema = dataclasses.replace(schema, prediction_id_column_name="prediction_id")
+            cols_to_add['prediction_id'] = range(len(dataframe))
+
+        dataframe = dataframe.assign(**cols_to_add)
+
         schema_cols = [
             schema.timestamp_column_name,
+            schema.prediction_id_column_name,
             schema.prediction_label_column_name,
             schema.prediction_score_column_name,
             schema.actual_label_column_name,
@@ -212,6 +224,7 @@ def _parse_dataframe(dataframe: DataFrame, schema: Schema) -> DataFrame:
         drop_cols = [col for col in dataframe.columns if col not in schema_cols]
         return dataframe.drop(columns=drop_cols)

+
     def to_disc(self) -> None:
         """writes the data and schema to disc"""

From 4cce6c2660eb9398e01f1e454abb6e940174a9f5 Mon Sep 17 00:00:00 2001
From: nate-mar <67926244+nate-mar@users.noreply.github.com>
Date: Thu, 12 Jan 2023 17:30:13 -0800
Subject: [PATCH 02/27] Update dataset.py

---
 src/phoenix/datasets/dataset.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py
index e99db94303..ee41b9f74a 100644
--- a/src/phoenix/datasets/dataset.py
+++ b/src/phoenix/datasets/dataset.py
@@ -1,11 +1,11 @@
+import dataclasses
 import logging
 import os
 import sys
 import uuid
 from typing import Any, Literal, Optional, Union
-import dataclasses

-from pandas import DataFrame, Series, read_parquet, Timestamp
+from pandas import DataFrame, Series, Timestamp, read_parquet

 from phoenix.config import dataset_dir
 from phoenix.utils import FilePath
@@ -194,10 +194,10 @@ def _parse_dataframe(dataframe: DataFrame, schema: Schema) -> DataFrame:
         cols_to_add = dict()
         if schema.timestamp_column_name is None:
             schema = dataclasses.replace(schema, timestamp_column_name="timestamp")
-            cols_to_add['timestamp'] = lambda x: Timestamp.now()
+            cols_to_add["timestamp"] = lambda x: Timestamp.now()
         if schema.prediction_id_column_name is None:
             schema = dataclasses.replace(schema, prediction_id_column_name="prediction_id")
-            cols_to_add['prediction_id'] = range(len(dataframe))
+            cols_to_add["prediction_id"] = range(len(dataframe))

         dataframe = dataframe.assign(**cols_to_add)

@@ -224,7 +224,6 @@ def _parse_dataframe(dataframe: DataFrame, schema: Schema) -> DataFrame:
         drop_cols = [col for col in dataframe.columns if col not in schema_cols]
         return dataframe.drop(columns=drop_cols)

-
     def to_disc(self) -> None:
        """writes 
the data and schema to disc""" From 7cab30381f452385d51ded02390890a944ebfaf3 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Thu, 12 Jan 2023 17:50:41 -0800 Subject: [PATCH 03/27] Update dataset.py --- src/phoenix/datasets/dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py index ee41b9f74a..761cbcf7cc 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -194,10 +194,10 @@ def _parse_dataframe(dataframe: DataFrame, schema: Schema) -> DataFrame: cols_to_add = dict() if schema.timestamp_column_name is None: schema = dataclasses.replace(schema, timestamp_column_name="timestamp") - cols_to_add["timestamp"] = lambda x: Timestamp.now() + cols_to_add["timestamp"] = Timestamp.now if schema.prediction_id_column_name is None: schema = dataclasses.replace(schema, prediction_id_column_name="prediction_id") - cols_to_add["prediction_id"] = range(len(dataframe)) + cols_to_add["prediction_id"] = lambda: str(uuid.uuid4()) dataframe = dataframe.assign(**cols_to_add) From 50be45ddf02ffe7becccec651a593dfc25a2cc94 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Fri, 13 Jan 2023 04:31:50 -0800 Subject: [PATCH 04/27] wip --- src/phoenix/datasets/dataset.py | 12 ++++++----- tests/datasets/test_dataset.py | 35 +++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py index 761cbcf7cc..45daaeac2e 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -3,7 +3,7 @@ import os import sys import uuid -from typing import Any, Literal, Optional, Union +from typing import Any, Callable, Dict, Literal, Optional, Union from pandas import DataFrame, Series, Timestamp, read_parquet @@ -191,15 +191,17 @@ def from_name(cls, name: str) -> "Dataset": @staticmethod def _parse_dataframe(dataframe: DataFrame, schema: Schema) -> DataFrame: - cols_to_add = dict() + cols_to_add: Dict[str, Callable[[Any], Union[Timestamp, str]]] = dict() if schema.timestamp_column_name is None: + now = Timestamp.now() schema = dataclasses.replace(schema, timestamp_column_name="timestamp") - cols_to_add["timestamp"] = Timestamp.now + cols_to_add["timestamp"] = lambda _: now if schema.prediction_id_column_name is None: schema = dataclasses.replace(schema, prediction_id_column_name="prediction_id") - cols_to_add["prediction_id"] = lambda: str(uuid.uuid4()) + cols_to_add["prediction_id"] = lambda _: str(uuid.uuid4()) - dataframe = dataframe.assign(**cols_to_add) + if len(cols_to_add) > 0: + dataframe = dataframe.assign(**cols_to_add) schema_cols = [ schema.timestamp_column_name, diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index e8093b36a4..eb9e63307f 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -29,7 +29,11 @@ def include_embeddings(request): def expected_df(include_embeddings, random_seed): num_samples = 9 embedding_dimension = 15 + + ts = pd.Timestamp.now() data = { + "prediction_id": [n for n in range(num_samples)], + "timestamp": [ts for _ in range(num_samples)], "feature0": [random.random() for _ in range(num_samples)], "feature1": [random.random() for _ in range(num_samples)], "predicted_score": [random.random() for _ in range(num_samples)], @@ -60,6 +64,8 @@ def fastparquet_path(include_embeddings, expected_df, tmp_path): 
@pytest.fixture def schema(include_embeddings): kwargs = { + "prediction_id_column_name": "prediction_id", + "timestamp_column_name": "timestamp", "feature_column_names": ["feature0", "feature1"], "prediction_score_column_name": "predicted_score", } @@ -110,6 +116,8 @@ def test_dataset_from_parquet_correctly_load_data_with_and_without_embeddings( ): dataset_name = "dataset-name" dataset = initialization_class_method(filepath=filepath, schema=schema, name=dataset_name) + print(dataset.dataframe) + print(dataset.dataframe.dtypes) assert dataset.name == dataset_name for column_name in expected_df.columns: assert column_name in dataset.dataframe @@ -117,6 +125,8 @@ def test_dataset_from_parquet_correctly_load_data_with_and_without_embeddings( expected_column = expected_df[column_name] if column_name == "embeddings": assert_embedding_columns_almost_equal(actual_column, expected_column) + elif column_name == "timestamp": + pd.testing.assert_series_equal(actual_column, expected_column) else: assert_non_embedding_columns_almost_equal(actual_column, expected_column) @@ -144,3 +154,28 @@ def assert_embedding_columns_almost_equal(actual_embeddings_columns, expected_em actual_embeddings_columns, expected_embeddings_column ): assert_array_almost_equal(actual_embedding, expected_embedding) + + +def test_dataset_column_normalization(): + num_samples = 10 + data = { + "feature0": range(num_samples), + "feature1": range(num_samples), + "predicted_score": range(num_samples), + } + df = pd.DataFrame.from_dict(data) + kwargs = { + "feature_column_names": ["feature0", "feature1"], + "prediction_score_column_name": "predicted_score", + } + schema = Schema(**kwargs) + dataset = Dataset(dataframe=df, schema=schema) + + for column_name in df: + assert column_name in dataset.dataframe + actual_column = dataset.dataframe[column_name] + expected_column = df[column_name] + assert_non_embedding_columns_almost_equal(actual_column, expected_column) + + assert "prediction_id" in dataset.dataframe + assert "timestamp" in dataset.dataframe From efdf6c488a71714c38266ccdb8cb4a69123ab171 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Wed, 18 Jan 2023 05:52:28 -0800 Subject: [PATCH 05/27] updates --- src/phoenix/datasets/dataset.py | 12 +- src/phoenix/datasets/errors.py | 10 ++ src/phoenix/datasets/validation.py | 31 +++++- tests/datasets/test_dataset.py | 171 ++++++++++++++++++++++++----- 4 files changed, 195 insertions(+), 29 deletions(-) diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py index 45daaeac2e..8504e8a23b 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -5,7 +5,8 @@ import uuid from typing import Any, Callable, Dict, Literal, Optional, Union -from pandas import DataFrame, Series, Timestamp, read_parquet +from pandas import DataFrame, Series, Timestamp, read_parquet, to_datetime +from pandas.api.types import is_numeric_dtype from phoenix.config import dataset_dir from phoenix.utils import FilePath @@ -193,12 +194,19 @@ def from_name(cls, name: str) -> "Dataset": def _parse_dataframe(dataframe: DataFrame, schema: Schema) -> DataFrame: cols_to_add: Dict[str, Callable[[Any], Union[Timestamp, str]]] = dict() if schema.timestamp_column_name is None: - now = Timestamp.now() + now = Timestamp.utcnow() schema = dataclasses.replace(schema, timestamp_column_name="timestamp") cols_to_add["timestamp"] = lambda _: now + elif is_numeric_dtype(dataframe.dtypes[schema.timestamp_column_name]): + 
dataframe[schema.timestamp_column_name] = to_datetime( + dataframe[schema.timestamp_column_name], unit="ms" + ) + if schema.prediction_id_column_name is None: schema = dataclasses.replace(schema, prediction_id_column_name="prediction_id") cols_to_add["prediction_id"] = lambda _: str(uuid.uuid4()) + elif is_numeric_dtype(dataframe.dtypes[schema.prediction_id_column_name]): + dataframe["prediction_id"] = dataframe["prediction_id"].apply(str) if len(cols_to_add) > 0: dataframe = dataframe.assign(**cols_to_add) diff --git a/src/phoenix/datasets/errors.py b/src/phoenix/datasets/errors.py index 13c5b17be8..1c37d20390 100644 --- a/src/phoenix/datasets/errors.py +++ b/src/phoenix/datasets/errors.py @@ -49,6 +49,16 @@ def __init__(self, errors: Union[ValidationError, List[ValidationError]]): self.errors = errors +class InvalidColumnType(ValidationError): + """An error raised when the column type is invalid""" + + def __init__(self, error_msgs: Iterable[str]) -> None: + self.error_msgs = error_msgs + + def error_message(self) -> str: + return f"Invalid column types: {self.error_msgs}" + + class MissingField(ValidationError): """An error raised when trying to access a field that is absent from the Schema""" diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py index b8013d9405..15afd4279f 100644 --- a/src/phoenix/datasets/validation.py +++ b/src/phoenix/datasets/validation.py @@ -2,16 +2,45 @@ from typing import List from pandas import DataFrame +from pandas.api.types import is_datetime64_any_dtype as is_datetime +from pandas.api.types import is_numeric_dtype, is_string_dtype from . import errors as err from .schema import Schema def validate_dataset_inputs(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]: - general_checks = chain(check_missing_columns(dataframe, schema)) + general_checks = chain( + check_missing_columns(dataframe, schema), check_column_type(dataframe, schema) + ) return list(general_checks) +def check_column_type(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]: + wrong_type_cols = [] + if schema.timestamp_column_name is not None: + if not ( + is_numeric_dtype(dataframe.dtypes[schema.timestamp_column_name]) + or is_datetime(dataframe.dtypes[schema.timestamp_column_name]) + ): + wrong_type_cols.append( + f"{schema.timestamp_column_name} should be of timestamp or numeric type" + ) + + if schema.prediction_id_column_name is not None: + if not ( + is_numeric_dtype(dataframe.dtypes[schema.prediction_id_column_name]) + or is_string_dtype(dataframe.dtypes[schema.prediction_id_column_name]) + ): + wrong_type_cols.append( + f"{schema.prediction_id_column_name} should be a string or numeric type" + ) + + if len(wrong_type_cols) > 0: + return [err.InvalidColumnType(wrong_type_cols)] + return [] + + def check_missing_columns(dataframe: DataFrame, schema: Schema) -> List[err.MissingColumns]: # converting to a set first makes the checks run a lot faster existing_columns = set(dataframe.columns) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index eb9e63307f..30af77bed3 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -3,6 +3,7 @@ """ import random +import uuid from functools import partial import numpy as np @@ -12,6 +13,9 @@ from pytest_lazyfixture import lazy_fixture from phoenix.datasets.dataset import Dataset, EmbeddingColumnNames, Schema +from phoenix.datasets.errors import DatasetError + +num_samples = 9 @pytest.fixture @@ -27,12 +31,11 @@ def 
include_embeddings(request): @pytest.fixture def expected_df(include_embeddings, random_seed): - num_samples = 9 embedding_dimension = 15 ts = pd.Timestamp.now() data = { - "prediction_id": [n for n in range(num_samples)], + "prediction_id": [str(n) for n in range(num_samples)], "timestamp": [ts for _ in range(num_samples)], "feature0": [random.random() for _ in range(num_samples)], "feature1": [random.random() for _ in range(num_samples)], @@ -123,12 +126,18 @@ def test_dataset_from_parquet_correctly_load_data_with_and_without_embeddings( assert column_name in dataset.dataframe actual_column = dataset.dataframe[column_name] expected_column = expected_df[column_name] - if column_name == "embeddings": - assert_embedding_columns_almost_equal(actual_column, expected_column) - elif column_name == "timestamp": - pd.testing.assert_series_equal(actual_column, expected_column) - else: - assert_non_embedding_columns_almost_equal(actual_column, expected_column) + assert_column(column_name, actual_column, expected_column) + + +def assert_column(column_name, actual_column, expected_column): + if column_name == "embeddings": + assert_embedding_columns_almost_equal(actual_column, expected_column) + elif column_name == "timestamp": + pd.testing.assert_series_equal(actual_column, expected_column) + elif column_name == "prediction_id": + pd.testing.assert_series_equal(actual_column, expected_column) + else: + assert_non_embedding_columns_almost_equal(actual_column, expected_column) def assert_non_embedding_columns_almost_equal(actual_column, expected_column): @@ -156,26 +165,136 @@ def assert_embedding_columns_almost_equal(actual_embeddings_columns, expected_em assert_array_almost_equal(actual_embedding, expected_embedding) -def test_dataset_column_normalization(): - num_samples = 10 +def random_uuids(): + return [str(uuid.uuid4()) for _ in range(num_samples)] + + +@pytest.mark.parametrize( + "input_df, input_schema", + [ + ( + { + "timestamp": np.full( + shape=num_samples, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + ), + "prediction_id": random_uuids(), + }, + {"timestamp_column_name": "timestamp", "prediction_id_column_name": "prediction_id"}, + ), + ( + { + "timestamp": np.full( + shape=num_samples, fill_value=pd.Timestamp.utcnow().timestamp(), dtype=int + ), + "prediction_id": random_uuids(), + }, + {"timestamp_column_name": "timestamp", "prediction_id_column_name": "prediction_id"}, + ), + ( + { + "timestamp": np.full( + shape=num_samples, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + ), + "prediction_id": range(num_samples), + }, + {"timestamp_column_name": "timestamp", "prediction_id_column_name": "prediction_id"}, + ), + ( + { + "prediction_id": random_uuids(), + }, + {"prediction_id_column_name": "prediction_id"}, + ), + ( + { + "timestamp": np.full( + shape=num_samples, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + ), + }, + {"timestamp_column_name": "timestamp"}, + ), + ( + dict(), + dict(), + ), + ], + ids=[ + "test_dataset_normalization_columns_already_normalized", + "test_dataset_normalization_timestamp_integer_to_datetime", + "test_dataset_normalization_prediction_id_integer_to_string", + "test_dataset_normalization_add_missing_timestamp", + "test_dataset_normalization_add_missing_prediction_id", + "test_dataset_normalization_add_missing_timestamp_and_prediction_id", + ], + indirect=True, +) +def test_dataset_normalization(input_df, input_schema) -> None: + dataset = Dataset(dataframe=input_df, schema=input_schema) + + # Ensure existing data + for column_name in 
input_df:
        assert column_name in dataset.dataframe.columns
        actual_column = dataset.dataframe[column_name]
        expected_column = input_df[column_name]
        assert_column(column_name, actual_column, expected_column)

    # Ensure normalized columns exist if they did not exist in the initial normalization_df
    assert "timestamp" in dataset.dataframe
    assert dataset.dataframe.dtypes["timestamp"], "datetime[nz]"
    assert "prediction_id" in dataset.dataframe
    assert dataset.dataframe.dtypes["prediction_id"], "string"


@pytest.mark.parametrize(
    "input_df, input_schema",
    [
        (
            {
                "prediction_id": np.full(
                    shape=num_samples, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp
                ),
            },
            {"prediction_id_column_name": "prediction_id"},
        ),
        (
            {
                "timestamp": random_uuids(),
            },
            {"timestamp_column_name": "timestamp"},
        ),
    ],
    indirect=True,
)
def test_dataset_validation(input_df, input_schema) -> None:
    with pytest.raises(DatasetError):
        Dataset(dataframe=input_df, schema=input_schema)


@pytest.fixture
def input_df(request):
    """
    Provides a dataframe fixture with a base set of columns and an optional configurable set of additional columns
    :param request: params contains the additional columns to add to the dataframe
    :return: pd.DataFrame
    """
    data = {
        "feature": range(num_samples),
        "predicted_score": range(num_samples),
    }
    data.update(request.param)
    return pd.DataFrame.from_dict(data)


@pytest.fixture
def input_schema(request):
    """
    Provides a phoneix Schema fixture with a base set of columns and an optional configurable set of additional columns
    :param request: params contains the additional columns to add to the Schema
    :return: Schema
    """
    schema = {
        "feature_column_names": ["feature"],
        "prediction_score_column_name": "predicted_score",
    }
    schema.update(request.param)
    return Schema(**schema)

From 522c601134b6e90f8e862248beb39d35068d82d8 Mon Sep 17 00:00:00 2001
From: nate-mar <67926244+nate-mar@users.noreply.github.com>
Date: Wed, 18 Jan 2023 05:53:39 -0800
Subject: [PATCH 06/27] Update test_dataset.py

---
 tests/datasets/test_dataset.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py
index 30af77bed3..eae844aea3 100644
--- a/tests/datasets/test_dataset.py
+++ b/tests/datasets/test_dataset.py
@@ -119,8 +119,7 @@ def test_dataset_from_parquet_correctly_load_data_with_and_without_embeddings(
 ):
     dataset_name = "dataset-name"
     dataset = initialization_class_method(filepath=filepath, schema=schema, name=dataset_name)
-    print(dataset.dataframe)
-    print(dataset.dataframe.dtypes)
+
     assert dataset.name == dataset_name
     for column_name in expected_df.columns:
         assert column_name in dataset.dataframe
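
The normalization that PATCH 05 settles on can be exercised end to end. A minimal sketch of the intended behavior, assuming the Dataset and Schema API exactly as used in the tests above (the column names here are illustrative):

    import pandas as pd
    from phoenix.datasets.dataset import Dataset, Schema

    df = pd.DataFrame(
        {
            "feature": [0.1, 0.2, 0.3],
            "predicted_score": [0.9, 0.8, 0.7],
        }
    )
    schema = Schema(
        feature_column_names=["feature"],
        prediction_score_column_name="predicted_score",
    )
    dataset = Dataset(dataframe=df, schema=schema)

    # Neither column is declared in the schema, so _parse_dataframe appends a
    # "timestamp" column (one utcnow() value broadcast to every row) and a string
    # "prediction_id" column, and patches the schema to point at them.
    assert "timestamp" in dataset.dataframe.columns
    assert "prediction_id" in dataset.dataframe.columns
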
From b0c0380ef9bd949a04d85663e7f552cd0aae9c06 Mon Sep 17 00:00:00 2001
From: nate-mar <67926244+nate-mar@users.noreply.github.com>
Date: Wed, 18 Jan 2023 05:57:33 -0800
Subject: [PATCH 07/27] Update test_dataset.py

---
 tests/datasets/test_dataset.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py
index eae844aea3..071c091881 100644
--- a/tests/datasets/test_dataset.py
+++ b/tests/datasets/test_dataset.py
@@ -272,7 +272,8 @@ def input_df(request):
     """
-    Provides a dataframe fixture with a base set of columns and an optional configurable set of additional columns
+    Provides a dataframe fixture with a base set of columns and an optional configurable
+    set of additional columns.
     :param request: params contains the additional columns to add to the dataframe
     :return: pd.DataFrame
     """
@@ -287,7 +288,8 @@ def input_schema(request):
     """
-    Provides a phoneix Schema fixture with a base set of columns and an optional configurable set of additional columns
+    Provides a phoneix Schema fixture with a base set of columns and an optional configurable
+    set of additional columns
     :param request: params contains the additional columns to add to the Schema
     :return: Schema
     """

From 5f59f3f11d3ded8234d1696e9c00734e9da7e9ca Mon Sep 17 00:00:00 2001
From: nate-mar <67926244+nate-mar@users.noreply.github.com>
Date: Thu, 19 Jan 2023 11:54:33 -0800
Subject: [PATCH 08/27] Update src/phoenix/datasets/validation.py

Co-authored-by: Mikyo King
---
 src/phoenix/datasets/validation.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py
index 15afd4279f..f6b97d3a95 100644
--- a/src/phoenix/datasets/validation.py
+++ b/src/phoenix/datasets/validation.py
@@ -16,7 +16,7 @@ def validate_dataset_inputs(dataframe: DataFrame, schema: Schema) -> List[err.Va
     return list(general_checks)


-def check_column_type(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]:
+def check_column_types(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]:
     wrong_type_cols = []
     if schema.timestamp_column_name is not None:
         if not (

From e4a4259abe069102e871658ff24dc17076d90c22 Mon Sep 17 00:00:00 2001
From: nate-mar <67926244+nate-mar@users.noreply.github.com>
Date: Thu, 19 Jan 2023 12:03:41 -0800
Subject: [PATCH 09/27] Update src/phoenix/datasets/validation.py

Co-authored-by: Mikyo King
---
 src/phoenix/datasets/validation.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py
index f6b97d3a95..242db71a2c 100644
--- a/src/phoenix/datasets/validation.py
+++ b/src/phoenix/datasets/validation.py
@@ -11,7 +11,7 @@
 def validate_dataset_inputs(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]:
     general_checks = chain(
-        check_missing_columns(dataframe, schema), check_column_type(dataframe, schema)
+        check_missing_columns(dataframe, schema), check_column_types(dataframe, schema)
     )
     return list(general_checks)

From f74100b8f266fc98fcbd2369dc99854f2ed83df9 Mon Sep 17 00:00:00 2001
From: nate-mar <67926244+nate-mar@users.noreply.github.com>
Date: Thu, 19 Jan 2023 21:40:05 -0800
Subject: [PATCH 10/27] wip -- tests still fail after merge

---
 src/phoenix/datasets/dataset.py |  69 ++---
 tests/datasets/test_dataset.py  | 435 ++++++++++++++++++--------------
 2 files changed, 272 insertions(+), 232 deletions(-)

diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py
index 
45bd2d208a..38f01d7565 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -3,10 +3,9 @@ import os import sys import uuid - from copy import deepcopy from dataclasses import fields, replace -from typing import Any, Dict, List, Optional, Set, Tuple, Union, Callable, Literal +from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union from pandas import DataFrame, Series, Timestamp, read_parquet, to_datetime from pandas.api.types import is_numeric_dtype @@ -185,52 +184,6 @@ def from_name(cls, name: str) -> "Dataset": schema = Schema.from_json(schema_json) return cls(df, schema, name, persist_to_disc=False) - - @staticmethod - def _parse_dataframe(dataframe: DataFrame, schema: Schema) -> DataFrame: - cols_to_add: Dict[str, Callable[[Any], Union[Timestamp, str]]] = dict() - if schema.timestamp_column_name is None: - now = Timestamp.utcnow() - schema = dataclasses.replace(schema, timestamp_column_name="timestamp") - cols_to_add["timestamp"] = lambda _: now - elif is_numeric_dtype(dataframe.dtypes[schema.timestamp_column_name]): - dataframe[schema.timestamp_column_name] = to_datetime( - dataframe[schema.timestamp_column_name], unit="ms" - ) - - if schema.prediction_id_column_name is None: - schema = dataclasses.replace(schema, prediction_id_column_name="prediction_id") - cols_to_add["prediction_id"] = lambda _: str(uuid.uuid4()) - elif is_numeric_dtype(dataframe.dtypes[schema.prediction_id_column_name]): - dataframe["prediction_id"] = dataframe["prediction_id"].apply(str) - - if len(cols_to_add) > 0: - dataframe = dataframe.assign(**cols_to_add) - - schema_cols = [ - schema.timestamp_column_name, - schema.prediction_id_column_name, - schema.prediction_label_column_name, - schema.prediction_score_column_name, - schema.actual_label_column_name, - schema.actual_score_column_name, - ] - # Append the feature column names to the columns if present - if schema.feature_column_names is not None: - schema_cols += schema.feature_column_names - - if schema.embedding_feature_column_names is not None: - for emb_feat_cols in schema.embedding_feature_column_names.values(): - schema_cols.append(emb_feat_cols.vector_column_name) - if emb_feat_cols.raw_data_column_name: - schema_cols.append(emb_feat_cols.raw_data_column_name) - if emb_feat_cols.link_to_data_column_name: - schema_cols.append(emb_feat_cols.link_to_data_column_name) - - drop_cols = [col for col in dataframe.columns if col not in schema_cols] - return dataframe.drop(columns=drop_cols) - - def to_disc(self) -> None: """writes the data and schema to disc""" @@ -463,6 +416,24 @@ def _create_parsed_dataframe_and_schema( for column_name in dataframe.columns: if column_name_to_include.get(str(column_name), False): included_column_names.append(str(column_name)) - parsed_dataframe = dataframe[included_column_names] + parsed_dataframe = dataframe[included_column_names].copy() parsed_schema = replace(schema, excludes=None, **schema_patch) + + if parsed_schema.timestamp_column_name is None: + now = Timestamp.utcnow() + parsed_schema = dataclasses.replace(parsed_schema, timestamp_column_name="timestamp") + parsed_dataframe["timestamp"] = now + elif is_numeric_dtype(dataframe.dtypes[schema.timestamp_column_name]): + parsed_dataframe[schema.timestamp_column_name] = parsed_dataframe[schema.timestamp_column_name].apply( + lambda x: to_datetime(x, unit="ms") + ) + + if parsed_schema.prediction_id_column_name is None: + parsed_schema = dataclasses.replace( + parsed_schema, 
prediction_id_column_name="prediction_id" + ) + parsed_dataframe["prediction_id"] = parsed_dataframe.apply(lambda _: str(uuid.uuid4())) + elif is_numeric_dtype(parsed_dataframe.dtypes[schema.prediction_id_column_name]): + parsed_dataframe[schema.prediction_id_column_name] = parsed_dataframe[schema.prediction_id_column_name].astype(str) + print(parsed_dataframe.to_string()) return parsed_dataframe, parsed_schema diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 4b30c638c3..aec68124e2 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -2,28 +2,24 @@ Test dataset """ +import logging import random import uuid -from functools import partial +from dataclasses import replace import numpy as np import pandas as pd import pytest -from numpy.testing import assert_array_almost_equal -from pytest_lazyfixture import lazy_fixture - -from phoenix.datasets.dataset import Dataset, EmbeddingColumnNames, Schema -from phoenix.datasets.errors import DatasetError - -import logging -from dataclasses import replace - -import numpy as np from pandas import DataFrame from pytest import LogCaptureFixture -from phoenix.datasets import EmbeddingColumnNames, Schema -from phoenix.datasets.dataset import _parse_dataframe_and_schema +from phoenix.datasets.dataset import ( + Dataset, + EmbeddingColumnNames, + Schema, + _parse_dataframe_and_schema, +) +from phoenix.datasets.errors import DatasetError class TestParseDataFrameAndSchema: @@ -37,8 +33,8 @@ class TestParseDataFrameAndSchema: def test_schema_contains_all_dataframe_columns_results_in_unchanged_output(self, caplog): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), - "ts": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -47,7 +43,7 @@ def test_schema_contains_all_dataframe_columns_results_in_unchanged_output(self, ) input_schema = Schema( prediction_id_column_name="prediction_id", - timestamp_column_name="ts", + timestamp_column_name="timestamp", feature_column_names=["feature0", "feature1"], tag_column_names=["tag0"], prediction_label_column_name="prediction_label", @@ -67,8 +63,8 @@ def test_schema_contains_all_dataframe_columns_results_in_unchanged_output(self, def test_column_present_in_dataframe_but_missing_from_schema_is_dropped(self, caplog): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), - "ts": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -77,17 +73,17 @@ def test_column_present_in_dataframe_but_missing_from_schema_is_dropped(self, ca ) input_schema = Schema( prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", feature_column_names=["feature0", "feature1"], - tag_column_names=["tag0"], prediction_label_column_name="prediction_label", ) self._run_function_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ - [col for col in input_dataframe.columns if col != "ts"] + [col for col in 
input_dataframe.columns if col != "tag0"] ], - expected_parsed_schema=replace(input_schema, timestamp_column_name=None), + expected_parsed_schema=replace(input_schema, tag_column_names=None), should_log_warning_to_user=False, caplog=caplog, ) @@ -97,7 +93,8 @@ def test_some_features_excluded_removes_excluded_features_columns_and_keeps_the_ ): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -106,6 +103,7 @@ def test_some_features_excluded_removes_excluded_features_columns_and_keeps_the_ ) input_schema = Schema( prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", feature_column_names=["feature0", "feature1"], tag_column_names=["tag0"], prediction_label_column_name="prediction_label", @@ -115,7 +113,7 @@ def test_some_features_excluded_removes_excluded_features_columns_and_keeps_the_ input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ - ["prediction_id", "prediction_label", "feature0", "tag0"] + ["prediction_id", "timestamp", "prediction_label", "feature0", "tag0"] ], expected_parsed_schema=replace( input_schema, @@ -133,7 +131,8 @@ def test_all_features_and_tags_excluded_sets_schema_features_and_tags_fields_to_ ): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -143,6 +142,7 @@ def test_all_features_and_tags_excluded_sets_schema_features_and_tags_fields_to_ excludes = ["feature0", "feature1", "tag0"] input_schema = Schema( prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", feature_column_names=["feature0", "feature1"], tag_column_names=["tag0"], prediction_label_column_name="prediction_label", @@ -151,7 +151,7 @@ def test_all_features_and_tags_excluded_sets_schema_features_and_tags_fields_to_ self._run_function_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, - expected_parsed_dataframe=input_dataframe[["prediction_id", "prediction_label"]], + expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp", "prediction_label"]], expected_parsed_schema=replace( input_schema, prediction_label_column_name="prediction_label", @@ -166,8 +166,8 @@ def test_all_features_and_tags_excluded_sets_schema_features_and_tags_fields_to_ def test_excluded_single_column_schema_fields_set_to_none(self, caplog): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), - "ts": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -175,15 +175,15 @@ def test_excluded_single_column_schema_fields_set_to_none(self, caplog): ) input_schema = Schema( prediction_id_column_name="prediction_id", - timestamp_column_name="ts", + 
timestamp_column_name="timestamp", prediction_label_column_name="prediction_label", feature_column_names=["feature0", "feature1"], - excludes=["prediction_label", "ts"], + excludes=["prediction_label", "timestamp"], ) self._run_function_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, - expected_parsed_dataframe=input_dataframe[["prediction_id", "feature0", "feature1"]], + expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp", "feature0", "feature1"]], expected_parsed_schema=replace( input_schema, prediction_label_column_name=None, @@ -197,7 +197,8 @@ def test_excluded_single_column_schema_fields_set_to_none(self, caplog): def test_no_input_schema_features_and_no_excludes_discovers_features(self, caplog): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": range(self.num_records), "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -207,6 +208,7 @@ def test_no_input_schema_features_and_no_excludes_discovers_features(self, caplo input_schema = Schema( prediction_id_column_name="prediction_id", prediction_label_column_name="prediction_label", + timestamp_column_name="timestamp", ) self._run_function_and_check_output( input_dataframe=input_dataframe, @@ -224,7 +226,8 @@ def test_no_input_schema_features_and_list_of_excludes_discovers_non_excluded_fe ): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -236,6 +239,7 @@ def test_no_input_schema_features_and_list_of_excludes_discovers_non_excluded_fe excludes = ["prediction_label", "feature1", "tag0"] input_schema = Schema( prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", tag_column_names=["tag0", "tag1"], prediction_label_column_name="prediction_label", excludes=excludes, @@ -260,7 +264,8 @@ def test_no_input_schema_features_and_list_of_excludes_discovers_non_excluded_fe def test_excluded_column_not_contained_in_dataframe_logs_warning(self, caplog): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -272,6 +277,7 @@ def test_excluded_column_not_contained_in_dataframe_logs_warning(self, caplog): excludes = ["prediction_label", "column_not_in_dataframe"] input_schema = Schema( prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", feature_column_names=["feature0", "feature1", "feature2"], tag_column_names=["tag0", "tag1"], prediction_label_column_name="prediction_label", @@ -281,7 +287,7 @@ def test_excluded_column_not_contained_in_dataframe_logs_warning(self, caplog): input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ - ["prediction_id", "feature0", "feature1", "feature2", "tag0", "tag1"] + ["prediction_id", "timestamp", "feature0", "feature1", "feature2", 
"tag0", "tag1"] ], expected_parsed_schema=replace( input_schema, prediction_label_column_name=None, excludes=None @@ -293,6 +299,8 @@ def test_excluded_column_not_contained_in_dataframe_logs_warning(self, caplog): def test_schema_includes_embedding_feature_has_all_embedding_columns_included(self, caplog): input_dataframe = DataFrame( { + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "embedding_vector0": [ np.zeros(self.embedding_dimension) for _ in range(self.num_records) ], @@ -301,6 +309,8 @@ def test_schema_includes_embedding_feature_has_all_embedding_columns_included(se } ) input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", embedding_feature_column_names={ "embedding_feature0": EmbeddingColumnNames( vector_column_name="embedding_vector0", @@ -321,6 +331,8 @@ def test_schema_includes_embedding_feature_has_all_embedding_columns_included(se def test_embedding_columns_of_excluded_embedding_feature_are_removed(self, caplog): input_dataframe = DataFrame( { + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "embedding_vector0": [ np.zeros(self.embedding_dimension) for _ in range(self.num_records) ], @@ -332,6 +344,8 @@ def test_embedding_columns_of_excluded_embedding_feature_are_removed(self, caplo } ) input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", embedding_feature_column_names={ "embedding_feature0": EmbeddingColumnNames( vector_column_name="embedding_vector0", @@ -370,6 +384,8 @@ def test_embedding_columns_of_excluded_embedding_feature_are_removed(self, caplo def test_excluding_all_embedding_features_sets_schema_embedding_field_to_none(self, caplog): input_dataframe = DataFrame( { + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "embedding_vector0": [ np.zeros(self.embedding_dimension) for _ in range(self.num_records) ], @@ -378,6 +394,8 @@ def test_excluding_all_embedding_features_sets_schema_embedding_field_to_none(se } ) input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", embedding_feature_column_names={ "embedding_feature0": EmbeddingColumnNames( vector_column_name="embedding_vector0", @@ -390,7 +408,7 @@ def test_excluding_all_embedding_features_sets_schema_embedding_field_to_none(se self._run_function_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, - expected_parsed_dataframe=input_dataframe[[]], + expected_parsed_dataframe=input_dataframe["prediction_id", "timestamp"], expected_parsed_schema=replace( input_schema, embedding_feature_column_names=None, @@ -405,6 +423,8 @@ def test_excluding_an_embedding_column_rather_than_the_embedding_feature_name_lo ): input_dataframe = DataFrame( { + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "embedding_vector0": [ np.zeros(self.embedding_dimension) for _ in range(self.num_records) ], @@ -413,6 +433,8 @@ def test_excluding_an_embedding_column_rather_than_the_embedding_feature_name_lo } ) input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", embedding_feature_column_names={ "embedding_feature0": EmbeddingColumnNames( vector_column_name="embedding_vector0", @@ -440,12 +462,16 @@ 
def test_excluding_embedding_feature_with_same_name_as_embedding_column_does_not ): input_dataframe = DataFrame( { + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "embedding0": [np.zeros(self.embedding_dimension) for _ in range(self.num_records)], "link_to_data0": [f"some-link{index}" for index in range(self.num_records)], "raw_data_column0": [f"some-text{index}" for index in range(self.num_records)], } ) input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", embedding_feature_column_names={ "embedding0": EmbeddingColumnNames( vector_column_name="embedding0", @@ -468,18 +494,56 @@ def test_excluding_embedding_feature_with_same_name_as_embedding_column_does_not caplog=caplog, ) + def test_excluding_all_embedding_features_sets_schema_embedding_field_to_none(self, caplog): + input_dataframe = DataFrame( + { + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], + "embedding_vector0": [ + np.zeros(self.embedding_dimension) for _ in range(self.num_records) + ], + "link_to_data0": [f"some-link{index}" for index in range(self.num_records)], + "raw_data_column0": [f"some-text{index}" for index in range(self.num_records)], + } + ) + input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", + embedding_feature_column_names={ + "embedding_feature0": EmbeddingColumnNames( + vector_column_name="embedding_vector0", + link_to_data_column_name="link_to_data0", + raw_data_column_name="raw_data_column0", + ), + }, + excludes=["embedding_feature0"], + ) + self._run_function_and_check_output( + input_dataframe=input_dataframe, + input_schema=input_schema, + expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp"]], + expected_parsed_schema=replace( + input_schema, + embedding_feature_column_names=None, + excludes=None, + ), + should_log_warning_to_user=False, + caplog=caplog, + ) + def _run_function_and_check_output( - self, - input_dataframe: DataFrame, - input_schema: Schema, - expected_parsed_dataframe: DataFrame, - expected_parsed_schema: Schema, - should_log_warning_to_user: bool, - caplog: LogCaptureFixture, + self, + input_dataframe: DataFrame, + input_schema: Schema, + expected_parsed_dataframe: DataFrame, + expected_parsed_schema: Schema, + should_log_warning_to_user: bool, + caplog: LogCaptureFixture, ) -> None: - parsed_dataframe, parsed_schema = _parse_dataframe_and_schema( - dataframe=input_dataframe, schema=input_schema - ) + dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + parsed_dataframe = dataset.dataframe + parsed_schema = dataset.schema + assert parsed_dataframe.equals(expected_parsed_dataframe) assert parsed_schema == expected_parsed_schema assert self._warning_logged(caplog) is should_log_warning_to_user @@ -494,164 +558,169 @@ def _warning_logged(caplog: LogCaptureFixture) -> bool: return True return False - @property - def num_records(self): - return self._NUM_RECORDS - - @property - def embedding_dimension(self): - return self._EMBEDDING_DIMENSION - - - num_samples = 9 - - - @pytest.fixture - def random_seed(): - np.random.seed(0) - random.seed(0) - - - @pytest.fixture - def include_embeddings(request): - return request.param - - - def random_uuids(): - return [str(uuid.uuid4()) for _ in range(num_samples)] - - - @pytest.mark.parametrize( - "input_df, input_schema", - [ - ( + def 
test_dataset_normalization_columns_already_normalized(self): + input_dataframe = DataFrame( { + "prediction_label": [f"label{index}" for index in range(self._NUM_RECORDS)], + "feature0": np.zeros(self.num_records), "timestamp": np.full( - shape=num_samples, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp ), - "prediction_id": random_uuids(), - }, - {"timestamp_column_name": "timestamp", "prediction_id_column_name": "prediction_id"}, - ), - ( + "prediction_id": self.random_uuids(), + } + ) + + input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + + dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + self.validate_normalization(dataset, input_dataframe) + + def test_dataset_normalization_timestamp_integer_to_datetime(self): + input_dataframe = DataFrame( { + "prediction_label": [f"label{index}" for index in range(self._NUM_RECORDS)], + "feature0": np.zeros(self.num_records), "timestamp": np.full( - shape=num_samples, fill_value=pd.Timestamp.utcnow().timestamp(), dtype=int + shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp ), - "prediction_id": random_uuids(), - }, - {"timestamp_column_name": "timestamp", "prediction_id_column_name": "prediction_id"}, - ), - ( + "prediction_id": self.random_uuids(), + } + ) + input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + + dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + self.validate_normalization(dataset, input_dataframe) + + def test_dataset_normalization_prediction_id_integer_to_string(self): + input_dataframe = DataFrame( { + "prediction_label": [f"label{index}" for index in range(self._NUM_RECORDS)], + "feature0": np.zeros(self.num_records), "timestamp": np.full( - shape=num_samples, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp ), - "prediction_id": range(num_samples), - }, - {"timestamp_column_name": "timestamp", "prediction_id_column_name": "prediction_id"}, - ), - ( - { - "prediction_id": random_uuids(), - }, - {"prediction_id_column_name": "prediction_id"}, - ), - ( + "prediction_id": range(self.num_records), + } + ) + input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + + dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + self.validate_normalization(dataset, input_dataframe) + + def test_dataset_normalization_columns_add_missing_prediction_id(self): + input_dataframe = DataFrame( { + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self.num_records), "timestamp": np.full( - shape=num_samples, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp ), - }, - {"timestamp_column_name": "timestamp"}, - ), - ( - dict(), - dict(), - ), - ], - ids=[ - "test_dataset_normalization_columns_already_normalized", - "test_dataset_normalization_timestamp_integer_to_datetime", - "test_dataset_normalization_prediction_id_integer_to_string", - 
"test_dataset_normalization_add_missing_timestamp", - "test_dataset_normalization_add_missing_prediction_id", - "test_dataset_normalization_add_missing_timestamp_and_prediction_id", - ], - indirect=True, -) + } + ) + input_schema = Schema( + timestamp_column_name="timestamp", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) -def test_dataset_normalization(input_df, input_schema) -> None: - dataset = Dataset(dataframe=input_df, schema=input_schema) + dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + self.validate_normalization(dataset, input_dataframe) - # Ensure existing data - for column_name in input_df: - assert column_name in dataset.dataframe.columns - actual_column = dataset.dataframe[column_name] - expected_column = input_df[column_name] - assert_column(column_name, actual_column, expected_column) + def test_dataset_normalization_columns_add_missing_timestamp(self): + input_dataframe = DataFrame( + { + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self.num_records), + "prediction_id": self.random_uuids(), + } + ) - # Ensure normalized columns exist if they did not exist in the initial normalization_df - assert "timestamp" in dataset.dataframe - assert dataset.dataframe.dtypes["timestamp"], "datetime[nz]" - assert "prediction_id" in dataset.dataframe - assert dataset.dataframe.dtypes["prediction_id"], "string" + input_schema = Schema( + prediction_id_column_name="prediction_id", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + self.validate_normalization(dataset, input_dataframe) -@pytest.mark.parametrize( - "input_df, input_schema", - [ - ( - { - "prediction_id": np.full( - shape=num_samples, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp - ), - }, - {"prediction_id_column_name": "prediction_id"}, - ), - ( + def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self): + input_dataframe = DataFrame( { - "timestamp": random_uuids(), - }, - {"timestamp_column_name": "timestamp"}, - ), - ], - indirect=True, -) -def test_dataset_validation(input_df, input_schema) -> None: - with pytest.raises(DatasetError): - Dataset(dataframe=input_df, schema=input_schema) + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self._NUM_RECORDS), + } + ) + input_schema = Schema( + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) -@pytest.fixture -def input_df(request): - """ - Provides a dataframe fixture with a base set of columns and an optional configurable - set of additional columns. 
- :param request: params contains the additional columns to add to the dataframe - :return: pd.DataFrame - """ - data = { - "feature": range(num_samples), - "predicted_score": range(num_samples), - } - data.update(request.param) - return pd.DataFrame.from_dict(data) + dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + self.validate_normalization(dataset, input_dataframe) + @staticmethod + def validate_normalization(dataset, input_dataframe): + for column_name in input_dataframe: + assert column_name in dataset.dataframe.columns + actual_column = dataset.dataframe[column_name] + expected_column = input_dataframe[column_name] + pd.testing.assert_series_equal(actual_column, expected_column) + + # Ensure normalized columns exist if they did not exist in the initial normalization_df + assert "timestamp" in dataset.dataframe + assert dataset.dataframe.dtypes["timestamp"], "datetime[nz]" + assert "prediction_id" in dataset.dataframe + assert dataset.dataframe.dtypes["prediction_id"], "string" + + def random_uuids(self): + return [str(uuid.uuid4()) for _ in range(self.num_records)] -@pytest.fixture -def input_schema(request): - """ - Provides a phoneix Schema fixture with a base set of columns and an optional configurable - set of additional columns - :param request: params contains the additional columns to add to the Schema - :return: Schema - """ - schema = { - "feature_column_names": ["feature"], - "prediction_score_column_name": "predicted_score", - } - schema.update(request.param) - return Schema(**schema) - \ No newline at end of file + @property + def num_records(self): + return self._NUM_RECORDS + + @property + def embedding_dimension(self): + return self._EMBEDDING_DIMENSION + + # @pytest.mark.parametrize( + # "input_df, input_schema", + # [ + # ( + # { + # "prediction_id": np.full( + # shape=num_samples, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + # ), + # }, + # {"prediction_id_column_name": "prediction_id"}, + # ), + # ( + # { + # "timestamp": random_uuids(), + # }, + # {"timestamp_column_name": "timestamp"}, + # ), + # ], + # indirect=True, + # ) + # def test_dataset_validation(input_df, input_schema) -> None: + # with pytest.raises(DatasetError): + # Dataset(dataframe=input_df, schema=input_schema) From 7e1c8a8199f62189445d4eff5d00f8879201af50 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Thu, 19 Jan 2023 22:08:28 -0800 Subject: [PATCH 11/27] Update test_dataset.py --- tests/datasets/test_dataset.py | 36 ---------------------------------- 1 file changed, 36 deletions(-) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index aec68124e2..3b89b8724d 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -494,42 +494,6 @@ def test_excluding_embedding_feature_with_same_name_as_embedding_column_does_not caplog=caplog, ) - def test_excluding_all_embedding_features_sets_schema_embedding_field_to_none(self, caplog): - input_dataframe = DataFrame( - { - "prediction_id": [str(x) for x in range(self.num_records)], - "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], - "embedding_vector0": [ - np.zeros(self.embedding_dimension) for _ in range(self.num_records) - ], - "link_to_data0": [f"some-link{index}" for index in range(self.num_records)], - "raw_data_column0": [f"some-text{index}" for index in range(self.num_records)], - } - ) - input_schema = Schema( - prediction_id_column_name="prediction_id", - timestamp_column_name="timestamp", - 
embedding_feature_column_names={ - "embedding_feature0": EmbeddingColumnNames( - vector_column_name="embedding_vector0", - link_to_data_column_name="link_to_data0", - raw_data_column_name="raw_data_column0", - ), - }, - excludes=["embedding_feature0"], - ) - self._run_function_and_check_output( - input_dataframe=input_dataframe, - input_schema=input_schema, - expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp"]], - expected_parsed_schema=replace( - input_schema, - embedding_feature_column_names=None, - excludes=None, - ), - should_log_warning_to_user=False, - caplog=caplog, - ) def _run_function_and_check_output( self, From 0d043670d6043af9149422643711d3a927923e0c Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Thu, 19 Jan 2023 22:49:52 -0800 Subject: [PATCH 12/27] fixes to tests --- src/phoenix/datasets/dataset.py | 2 +- src/phoenix/datasets/schema.py | 2 +- tests/datasets/test_dataset.py | 14 +++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py index 38f01d7565..534dc37b61 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -435,5 +435,5 @@ def _create_parsed_dataframe_and_schema( parsed_dataframe["prediction_id"] = parsed_dataframe.apply(lambda _: str(uuid.uuid4())) elif is_numeric_dtype(parsed_dataframe.dtypes[schema.prediction_id_column_name]): parsed_dataframe[schema.prediction_id_column_name] = parsed_dataframe[schema.prediction_id_column_name].astype(str) - print(parsed_dataframe.to_string()) + return parsed_dataframe, parsed_schema diff --git a/src/phoenix/datasets/schema.py b/src/phoenix/datasets/schema.py index 8d08079c15..2d5e8a090d 100644 --- a/src/phoenix/datasets/schema.py +++ b/src/phoenix/datasets/schema.py @@ -39,7 +39,7 @@ class Schema(Dict[SchemaFieldName, SchemaFieldValue]): def to_json(self) -> str: "Converts the schema to a dict for JSON serialization" - dictionary = self.__dict__ + dictionary = {} for field in self.__dataclass_fields__: value = getattr(self, field) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 3b89b8724d..35858439cb 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -178,7 +178,7 @@ def test_excluded_single_column_schema_fields_set_to_none(self, caplog): timestamp_column_name="timestamp", prediction_label_column_name="prediction_label", feature_column_names=["feature0", "feature1"], - excludes=["prediction_label", "timestamp"], + excludes=["prediction_label"], ) self._run_function_and_check_output( input_dataframe=input_dataframe, @@ -198,7 +198,7 @@ def test_no_input_schema_features_and_no_excludes_discovers_features(self, caplo input_dataframe = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], - "timestamp": range(self.num_records), + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -248,7 +248,7 @@ def test_no_input_schema_features_and_list_of_excludes_discovers_non_excluded_fe input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ - ["prediction_id", "feature0", "feature2", "tag1"] + ["prediction_id", "timestamp", "feature0", "feature2", "tag1"] ], expected_parsed_schema=replace( input_schema, @@ -364,7 +364,7 @@ def 
test_embedding_columns_of_excluded_embedding_feature_are_removed(self, caplo input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ - ["embedding_vector1", "link_to_data1", "raw_data_column1"] + ["prediction_id", "timestamp", "embedding_vector1", "link_to_data1", "raw_data_column1"] ], expected_parsed_schema=replace( input_schema, @@ -408,7 +408,7 @@ def test_excluding_all_embedding_features_sets_schema_embedding_field_to_none(se self._run_function_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, - expected_parsed_dataframe=input_dataframe["prediction_id", "timestamp"], + expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp"]], expected_parsed_schema=replace( input_schema, embedding_feature_column_names=None, @@ -484,7 +484,7 @@ def test_excluding_embedding_feature_with_same_name_as_embedding_column_does_not self._run_function_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, - expected_parsed_dataframe=input_dataframe[[]], + expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp"]], expected_parsed_schema=replace( input_schema, embedding_feature_column_names=None, @@ -550,7 +550,7 @@ def test_dataset_normalization_timestamp_integer_to_datetime(self): "prediction_label": [f"label{index}" for index in range(self._NUM_RECORDS)], "feature0": np.zeros(self.num_records), "timestamp": np.full( - shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + shape=self.num_records, fill_value=pd.Timestamp.utcnow().timestamp(), dtype=int ), "prediction_id": self.random_uuids(), } From a8aa8cb50c5dd4feeb9f7d137458537936e91653 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Thu, 19 Jan 2023 23:21:23 -0800 Subject: [PATCH 13/27] Update test_dataset.py --- tests/datasets/test_dataset.py | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 35858439cb..80109acf2f 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -542,7 +542,7 @@ def test_dataset_normalization_columns_already_normalized(self): ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - self.validate_normalization(dataset, input_dataframe) + dataset.dataframe.equals(input_dataframe) def test_dataset_normalization_timestamp_integer_to_datetime(self): input_dataframe = DataFrame( @@ -563,7 +563,7 @@ def test_dataset_normalization_timestamp_integer_to_datetime(self): ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - self.validate_normalization(dataset, input_dataframe) + dataset.dataframe.drop(columns=["timestamp"]).equals(input_dataframe.drop(columns=["timestamp"])) def test_dataset_normalization_prediction_id_integer_to_string(self): input_dataframe = DataFrame( @@ -584,7 +584,7 @@ def test_dataset_normalization_prediction_id_integer_to_string(self): ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - self.validate_normalization(dataset, input_dataframe) + dataset.dataframe.drop(columns=["prediction_id"]).equals(input_dataframe.drop(columns=["prediction_id"])) def test_dataset_normalization_columns_add_missing_prediction_id(self): input_dataframe = DataFrame( @@ -604,7 +604,9 @@ def test_dataset_normalization_columns_add_missing_prediction_id(self): ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - 
self.validate_normalization(dataset, input_dataframe) + dataset.dataframe.equals(input_dataframe) + assert "prediction_id" in dataset.dataframe + assert dataset.dataframe.dtypes["prediction_id"], "string" def test_dataset_normalization_columns_add_missing_timestamp(self): input_dataframe = DataFrame( @@ -622,7 +624,10 @@ def test_dataset_normalization_columns_add_missing_timestamp(self): ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - self.validate_normalization(dataset, input_dataframe) + dataset.dataframe.equals(input_dataframe) + assert "timestamp" in dataset.dataframe + assert dataset.dataframe.dtypes["timestamp"], "datetime[nz]" + def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self): input_dataframe = DataFrame( @@ -638,21 +643,10 @@ def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self) ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - self.validate_normalization(dataset, input_dataframe) - - @staticmethod - def validate_normalization(dataset, input_dataframe): - for column_name in input_dataframe: - assert column_name in dataset.dataframe.columns - actual_column = dataset.dataframe[column_name] - expected_column = input_dataframe[column_name] - pd.testing.assert_series_equal(actual_column, expected_column) - - # Ensure normalized columns exist if they did not exist in the initial normalization_df + dataset.dataframe.equals(input_dataframe) assert "timestamp" in dataset.dataframe assert dataset.dataframe.dtypes["timestamp"], "datetime[nz]" - assert "prediction_id" in dataset.dataframe - assert dataset.dataframe.dtypes["prediction_id"], "string" + def random_uuids(self): return [str(uuid.uuid4()) for _ in range(self.num_records)] From 64becaae9ea93bd23d40a22217be7f320f071796 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Thu, 19 Jan 2023 23:23:30 -0800 Subject: [PATCH 14/27] formatting --- src/phoenix/datasets/dataset.py | 10 +++++--- tests/datasets/test_dataset.py | 43 +++++++++++++++++++++------------ 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py index 534dc37b61..d5ab58b54b 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -424,9 +424,9 @@ def _create_parsed_dataframe_and_schema( parsed_schema = dataclasses.replace(parsed_schema, timestamp_column_name="timestamp") parsed_dataframe["timestamp"] = now elif is_numeric_dtype(dataframe.dtypes[schema.timestamp_column_name]): - parsed_dataframe[schema.timestamp_column_name] = parsed_dataframe[schema.timestamp_column_name].apply( - lambda x: to_datetime(x, unit="ms") - ) + parsed_dataframe[schema.timestamp_column_name] = parsed_dataframe[ + schema.timestamp_column_name + ].apply(lambda x: to_datetime(x, unit="ms")) if parsed_schema.prediction_id_column_name is None: parsed_schema = dataclasses.replace( @@ -434,6 +434,8 @@ def _create_parsed_dataframe_and_schema( ) parsed_dataframe["prediction_id"] = parsed_dataframe.apply(lambda _: str(uuid.uuid4())) elif is_numeric_dtype(parsed_dataframe.dtypes[schema.prediction_id_column_name]): - parsed_dataframe[schema.prediction_id_column_name] = parsed_dataframe[schema.prediction_id_column_name].astype(str) + parsed_dataframe[schema.prediction_id_column_name] = parsed_dataframe[ + schema.prediction_id_column_name + ].astype(str) return parsed_dataframe, parsed_schema diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py 
index 80109acf2f..99c1b83a27 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -151,7 +151,9 @@ def test_all_features_and_tags_excluded_sets_schema_features_and_tags_fields_to_ self._run_function_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, - expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp", "prediction_label"]], + expected_parsed_dataframe=input_dataframe[ + ["prediction_id", "timestamp", "prediction_label"] + ], expected_parsed_schema=replace( input_schema, prediction_label_column_name="prediction_label", @@ -183,7 +185,9 @@ def test_excluded_single_column_schema_fields_set_to_none(self, caplog): self._run_function_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, - expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp", "feature0", "feature1"]], + expected_parsed_dataframe=input_dataframe[ + ["prediction_id", "timestamp", "feature0", "feature1"] + ], expected_parsed_schema=replace( input_schema, prediction_label_column_name=None, @@ -317,7 +321,7 @@ def test_schema_includes_embedding_feature_has_all_embedding_columns_included(se link_to_data_column_name="link_to_data0", raw_data_column_name="raw_data_column0", ), - } + }, ) self._run_function_and_check_output( input_dataframe=input_dataframe, @@ -364,7 +368,13 @@ def test_embedding_columns_of_excluded_embedding_feature_are_removed(self, caplo input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ - ["prediction_id", "timestamp", "embedding_vector1", "link_to_data1", "raw_data_column1"] + [ + "prediction_id", + "timestamp", + "embedding_vector1", + "link_to_data1", + "raw_data_column1", + ] ], expected_parsed_schema=replace( input_schema, @@ -494,15 +504,14 @@ def test_excluding_embedding_feature_with_same_name_as_embedding_column_does_not caplog=caplog, ) - def _run_function_and_check_output( - self, - input_dataframe: DataFrame, - input_schema: Schema, - expected_parsed_dataframe: DataFrame, - expected_parsed_schema: Schema, - should_log_warning_to_user: bool, - caplog: LogCaptureFixture, + self, + input_dataframe: DataFrame, + input_schema: Schema, + expected_parsed_dataframe: DataFrame, + expected_parsed_schema: Schema, + should_log_warning_to_user: bool, + caplog: LogCaptureFixture, ) -> None: dataset = Dataset(dataframe=input_dataframe, schema=input_schema) parsed_dataframe = dataset.dataframe @@ -563,7 +572,9 @@ def test_dataset_normalization_timestamp_integer_to_datetime(self): ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - dataset.dataframe.drop(columns=["timestamp"]).equals(input_dataframe.drop(columns=["timestamp"])) + dataset.dataframe.drop(columns=["timestamp"]).equals( + input_dataframe.drop(columns=["timestamp"]) + ) def test_dataset_normalization_prediction_id_integer_to_string(self): input_dataframe = DataFrame( @@ -584,7 +595,9 @@ def test_dataset_normalization_prediction_id_integer_to_string(self): ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - dataset.dataframe.drop(columns=["prediction_id"]).equals(input_dataframe.drop(columns=["prediction_id"])) + dataset.dataframe.drop(columns=["prediction_id"]).equals( + input_dataframe.drop(columns=["prediction_id"]) + ) def test_dataset_normalization_columns_add_missing_prediction_id(self): input_dataframe = DataFrame( @@ -628,7 +641,6 @@ def test_dataset_normalization_columns_add_missing_timestamp(self): assert "timestamp" in dataset.dataframe assert 
dataset.dataframe.dtypes["timestamp"], "datetime[nz]" - def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self): input_dataframe = DataFrame( { @@ -647,7 +659,6 @@ def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self) assert "timestamp" in dataset.dataframe assert dataset.dataframe.dtypes["timestamp"], "datetime[nz]" - def random_uuids(self): return [str(uuid.uuid4()) for _ in range(self.num_records)] From 5989ae7145f38f8fc0a7df21f14389d4b9ae27b7 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Thu, 19 Jan 2023 23:29:21 -0800 Subject: [PATCH 15/27] imports --- src/phoenix/datasets/dataset.py | 2 +- tests/datasets/test_dataset.py | 12 +++--------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py index d5ab58b54b..b321bb48ae 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -5,7 +5,7 @@ import uuid from copy import deepcopy from dataclasses import fields, replace -from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union +from typing import Any, Dict, List, Optional, Set, Tuple, Union from pandas import DataFrame, Series, Timestamp, read_parquet, to_datetime from pandas.api.types import is_numeric_dtype diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 99c1b83a27..c92fb1f7fa 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -3,23 +3,17 @@ """ import logging -import random import uuid from dataclasses import replace import numpy as np import pandas as pd -import pytest from pandas import DataFrame from pytest import LogCaptureFixture -from phoenix.datasets.dataset import ( - Dataset, - EmbeddingColumnNames, - Schema, - _parse_dataframe_and_schema, -) -from phoenix.datasets.errors import DatasetError +from phoenix.datasets.dataset import Dataset, EmbeddingColumnNames, Schema + +# from phoenix.datasets.errors import DatasetError class TestParseDataFrameAndSchema: From 831a871522946fee69f9db64ba660446351bc7ba Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Fri, 20 Jan 2023 09:54:00 -0800 Subject: [PATCH 16/27] fix test, address feedback comment --- src/phoenix/datasets/validation.py | 2 +- tests/datasets/test_dataset.py | 100 ++++++++++++++++------------- 2 files changed, 58 insertions(+), 44 deletions(-) diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py index 242db71a2c..6314f0bf1a 100644 --- a/src/phoenix/datasets/validation.py +++ b/src/phoenix/datasets/validation.py @@ -17,7 +17,7 @@ def validate_dataset_inputs(dataframe: DataFrame, schema: Schema) -> List[err.Va def check_column_types(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]: - wrong_type_cols = [] + wrong_type_cols: str = [] if schema.timestamp_column_name is not None: if not ( is_numeric_dtype(dataframe.dtypes[schema.timestamp_column_name]) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index c92fb1f7fa..757d75db39 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -8,15 +8,14 @@ import numpy as np import pandas as pd -from pandas import DataFrame -from pytest import LogCaptureFixture +from pandas import DataFrame, to_datetime +from pytest import LogCaptureFixture, raises from phoenix.datasets.dataset import Dataset, EmbeddingColumnNames, Schema +from phoenix.datasets.errors 
import DatasetError -# from phoenix.datasets.errors import DatasetError - -class TestParseDataFrameAndSchema: +class TestDataset: """ Tests for `_parse_dataframe_and_schema` """ @@ -185,7 +184,6 @@ def test_excluded_single_column_schema_fields_set_to_none(self, caplog): expected_parsed_schema=replace( input_schema, prediction_label_column_name=None, - timestamp_column_name=None, excludes=None, ), should_log_warning_to_user=False, @@ -528,12 +526,12 @@ def _warning_logged(caplog: LogCaptureFixture) -> bool: def test_dataset_normalization_columns_already_normalized(self): input_dataframe = DataFrame( { - "prediction_label": [f"label{index}" for index in range(self._NUM_RECORDS)], + "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "timestamp": np.full( shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp ), - "prediction_id": self.random_uuids(), + "prediction_id": self._random_uuids(), } ) @@ -550,12 +548,12 @@ def test_dataset_normalization_columns_already_normalized(self): def test_dataset_normalization_timestamp_integer_to_datetime(self): input_dataframe = DataFrame( { - "prediction_label": [f"label{index}" for index in range(self._NUM_RECORDS)], + "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "timestamp": np.full( shape=self.num_records, fill_value=pd.Timestamp.utcnow().timestamp(), dtype=int ), - "prediction_id": self.random_uuids(), + "prediction_id": self._random_uuids(), } ) input_schema = Schema( @@ -566,14 +564,15 @@ def test_dataset_normalization_timestamp_integer_to_datetime(self): ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - dataset.dataframe.drop(columns=["timestamp"]).equals( - input_dataframe.drop(columns=["timestamp"]) + input_dataframe["timestamp"] = input_dataframe["timestamp"].apply( + lambda x: to_datetime(x, unit="ms") ) + dataset.dataframe.equals(input_dataframe) def test_dataset_normalization_prediction_id_integer_to_string(self): input_dataframe = DataFrame( { - "prediction_label": [f"label{index}" for index in range(self._NUM_RECORDS)], + "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "timestamp": np.full( shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp @@ -589,9 +588,8 @@ def test_dataset_normalization_prediction_id_integer_to_string(self): ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - dataset.dataframe.drop(columns=["prediction_id"]).equals( - input_dataframe.drop(columns=["prediction_id"]) - ) + input_dataframe["prediction_id"] = input_dataframe["prediction_id"].astype(str) + dataset.dataframe.equals(input_dataframe) def test_dataset_normalization_columns_add_missing_prediction_id(self): input_dataframe = DataFrame( @@ -620,7 +618,7 @@ def test_dataset_normalization_columns_add_missing_timestamp(self): { "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), - "prediction_id": self.random_uuids(), + "prediction_id": self._random_uuids(), } ) @@ -639,7 +637,7 @@ def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self) input_dataframe = DataFrame( { "prediction_label": [f"label{index}" for index in range(self.num_records)], - "feature0": np.zeros(self._NUM_RECORDS), + "feature0": np.zeros(self.num_records), } ) @@ -650,10 +648,50 @@ def 
test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) dataset.dataframe.equals(input_dataframe) + assert "prediction_id" in dataset.dataframe + assert dataset.dataframe.dtypes["prediction_id"], "string" assert "timestamp" in dataset.dataframe assert dataset.dataframe.dtypes["timestamp"], "datetime[nz]" - def random_uuids(self): + def test_dataset_validate_invalid_prediction_id_datatype(self) -> None: + input_df = DataFrame( + { + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self.num_records), + "prediction_id": np.full( + shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + ), + } + ) + + input_schema = Schema( + prediction_id_column_name="prediction_id", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + + with raises(DatasetError): + Dataset(dataframe=input_df, schema=input_schema) + + def test_dataset_validate_invalid_timestamp_datatype(self) -> None: + input_df = DataFrame( + { + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self.num_records), + "timestamp": self._random_uuids(), + }, + ) + + input_schema = Schema( + timestamp_column_name="timestamp", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + + with raises(DatasetError): + Dataset(dataframe=input_df, schema=input_schema) + + def _random_uuids(self): return [str(uuid.uuid4()) for _ in range(self.num_records)] @property @@ -663,27 +701,3 @@ def num_records(self): @property def embedding_dimension(self): return self._EMBEDDING_DIMENSION - - # @pytest.mark.parametrize( - # "input_df, input_schema", - # [ - # ( - # { - # "prediction_id": np.full( - # shape=num_samples, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp - # ), - # }, - # {"prediction_id_column_name": "prediction_id"}, - # ), - # ( - # { - # "timestamp": random_uuids(), - # }, - # {"timestamp_column_name": "timestamp"}, - # ), - # ], - # indirect=True, - # ) - # def test_dataset_validation(input_df, input_schema) -> None: - # with pytest.raises(DatasetError): - # Dataset(dataframe=input_df, schema=input_schema) From 9b66cb659807bbe6edcdce1e891328e76161e6f5 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Fri, 20 Jan 2023 10:21:36 -0800 Subject: [PATCH 17/27] type fixes --- src/phoenix/datasets/dataset.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py index b321bb48ae..e30d74a51f 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -419,23 +419,23 @@ def _create_parsed_dataframe_and_schema( parsed_dataframe = dataframe[included_column_names].copy() parsed_schema = replace(schema, excludes=None, **schema_patch) - if parsed_schema.timestamp_column_name is None: + ts_col_name = parsed_schema.timestamp_column_name + if ts_col_name is None: now = Timestamp.utcnow() parsed_schema = dataclasses.replace(parsed_schema, timestamp_column_name="timestamp") parsed_dataframe["timestamp"] = now - elif is_numeric_dtype(dataframe.dtypes[schema.timestamp_column_name]): - parsed_dataframe[schema.timestamp_column_name] = parsed_dataframe[ - schema.timestamp_column_name - ].apply(lambda x: to_datetime(x, unit="ms")) + elif is_numeric_dtype(dataframe.dtypes[ts_col_name]): + parsed_dataframe[ts_col_name] = 
parsed_dataframe[ts_col_name].apply( + lambda x: to_datetime(x, unit="ms") + ) - if parsed_schema.prediction_id_column_name is None: + pred_col_name = parsed_schema.prediction_id_column_name + if pred_col_name is None: parsed_schema = dataclasses.replace( parsed_schema, prediction_id_column_name="prediction_id" ) parsed_dataframe["prediction_id"] = parsed_dataframe.apply(lambda _: str(uuid.uuid4())) - elif is_numeric_dtype(parsed_dataframe.dtypes[schema.prediction_id_column_name]): - parsed_dataframe[schema.prediction_id_column_name] = parsed_dataframe[ - schema.prediction_id_column_name - ].astype(str) + elif is_numeric_dtype(parsed_dataframe.dtypes[pred_col_name]): + parsed_dataframe[pred_col_name] = parsed_dataframe[pred_col_name].astype(str) return parsed_dataframe, parsed_schema From 47ae0d60bed98c915655c2152fe090f74bc3204b Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Fri, 20 Jan 2023 10:25:48 -0800 Subject: [PATCH 18/27] Update validation.py --- src/phoenix/datasets/validation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py index 6314f0bf1a..1de16af974 100644 --- a/src/phoenix/datasets/validation.py +++ b/src/phoenix/datasets/validation.py @@ -17,7 +17,7 @@ def validate_dataset_inputs(dataframe: DataFrame, schema: Schema) -> List[err.Va def check_column_types(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]: - wrong_type_cols: str = [] + wrong_type_cols: List[str] = [] if schema.timestamp_column_name is not None: if not ( is_numeric_dtype(dataframe.dtypes[schema.timestamp_column_name]) From ce7008c5f389f5442d8b36fbd55dc0b11be061d6 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Mon, 23 Jan 2023 09:56:28 -0800 Subject: [PATCH 19/27] updates --- src/phoenix/datasets/dataset.py | 8 +- src/phoenix/datasets/errors.py | 12 +++ src/phoenix/datasets/validation.py | 28 +++++- tests/datasets/test_dataset.py | 154 ++++++++++++++++++++++------- 4 files changed, 157 insertions(+), 45 deletions(-) diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py index e30d74a51f..a1841218cb 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -270,7 +270,7 @@ def _parse_dataframe_and_schema(dataframe: DataFrame, schema: Schema) -> Tuple[D "not found in the dataframe: {}".format(", ".join(unseen_excluded_column_names)) ) - parsed_dataframe, parsed_schema = _create_parsed_dataframe_and_schema( + parsed_dataframe, parsed_schema = _create_and_normalize_dataframe_and_schema( dataframe, schema, schema_patch, column_name_to_include ) @@ -402,7 +402,7 @@ def _discover_feature_columns( ) -def _create_parsed_dataframe_and_schema( +def _create_and_normalize_dataframe_and_schema( dataframe: DataFrame, schema: Schema, schema_patch: Dict[SchemaFieldName, SchemaFieldValue], @@ -422,7 +422,7 @@ def _create_parsed_dataframe_and_schema( ts_col_name = parsed_schema.timestamp_column_name if ts_col_name is None: now = Timestamp.utcnow() - parsed_schema = dataclasses.replace(parsed_schema, timestamp_column_name="timestamp") + parsed_schema = replace(parsed_schema, timestamp_column_name="timestamp") parsed_dataframe["timestamp"] = now elif is_numeric_dtype(dataframe.dtypes[ts_col_name]): parsed_dataframe[ts_col_name] = parsed_dataframe[ts_col_name].apply( @@ -431,7 +431,7 @@ def _create_parsed_dataframe_and_schema( pred_col_name = parsed_schema.prediction_id_column_name if 
pred_col_name is None: - parsed_schema = dataclasses.replace( + parsed_schema = replace( parsed_schema, prediction_id_column_name="prediction_id" ) parsed_dataframe["prediction_id"] = parsed_dataframe.apply(lambda _: str(uuid.uuid4())) diff --git a/src/phoenix/datasets/errors.py b/src/phoenix/datasets/errors.py index 1c37d20390..94f82e183a 100644 --- a/src/phoenix/datasets/errors.py +++ b/src/phoenix/datasets/errors.py @@ -41,6 +41,18 @@ def error_message(self) -> str: f"{', '.join(map(str, self.missing_cols))}." ) +class InvalidSchemaError(ValidationError): + def __repr__(self) -> str: + return self.__class__.__name__ + + def __init__(self, invalid_props: Iterable[str]) -> None: + self.invalid_props = invalid_props + + def error_message(self) -> str: + return ( + "The schema is invalid: " + f"{', '.join(map(str, self.invalid_props))}." + ) class DatasetError(Exception): """An error raised when the dataset is invalid or incomplete""" diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py index 1de16af974..03abad8f52 100644 --- a/src/phoenix/datasets/validation.py +++ b/src/phoenix/datasets/validation.py @@ -9,14 +9,36 @@ from .schema import Schema +def _check_valid_schema(schema: Schema) -> List[err.ValidationError]: + errs: List[str] = [] + if schema.excludes is None: + return [] + + if schema.timestamp_column_name in schema.excludes: + errs.append( + f"{schema.timestamp_column_name} cannot be excluded because " + f"it is already being used as the timestamp column") + + if schema.prediction_id_column_name in schema.excludes: + errs.append( + f"{schema.prediction_id_column_name} cannot be excluded because " + f"it is already being used as the prediction id column") + + if len(errs) > 0: + return [err.InvalidSchemaError(errs)] + + return [] + + def validate_dataset_inputs(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]: general_checks = chain( - check_missing_columns(dataframe, schema), check_column_types(dataframe, schema) + _check_missing_columns(dataframe, schema), _check_column_types(dataframe, schema), + _check_valid_schema(schema), ) return list(general_checks) -def check_column_types(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]: +def _check_column_types(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]: wrong_type_cols: List[str] = [] if schema.timestamp_column_name is not None: if not ( @@ -41,7 +63,7 @@ def check_column_types(dataframe: DataFrame, schema: Schema) -> List[err.Validat return [] -def check_missing_columns(dataframe: DataFrame, schema: Schema) -> List[err.MissingColumns]: +def _check_missing_columns(dataframe: DataFrame, schema: Schema) -> List[err.MissingColumns]: # converting to a set first makes the checks run a lot faster existing_columns = set(dataframe.columns) missing_columns = [] diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 757d75db39..284090d20b 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -13,9 +13,10 @@ from phoenix.datasets.dataset import Dataset, EmbeddingColumnNames, Schema from phoenix.datasets.errors import DatasetError +from phoenix.datasets.dataset import _parse_dataframe_and_schema -class TestDataset: +class TestParseDataFrameAndSchema: """ Tests for `_parse_dataframe_and_schema` """ @@ -44,7 +45,7 @@ def test_schema_contains_all_dataframe_columns_results_in_unchanged_output(self, actual_label_column_name=None, actual_score_column_name=None, ) - self._run_function_and_check_output( + 
self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe, @@ -70,7 +71,7 @@ def test_column_present_in_dataframe_but_missing_from_schema_is_dropped(self, ca feature_column_names=["feature0", "feature1"], prediction_label_column_name="prediction_label", ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ @@ -102,7 +103,7 @@ def test_some_features_excluded_removes_excluded_features_columns_and_keeps_the_ prediction_label_column_name="prediction_label", excludes=["feature1"], ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ @@ -141,7 +142,7 @@ def test_all_features_and_tags_excluded_sets_schema_features_and_tags_fields_to_ prediction_label_column_name="prediction_label", excludes=excludes, ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ @@ -175,7 +176,7 @@ def test_excluded_single_column_schema_fields_set_to_none(self, caplog): feature_column_names=["feature0", "feature1"], excludes=["prediction_label"], ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ @@ -206,7 +207,7 @@ def test_no_input_schema_features_and_no_excludes_discovers_features(self, caplo prediction_label_column_name="prediction_label", timestamp_column_name="timestamp", ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe, @@ -240,7 +241,7 @@ def test_no_input_schema_features_and_list_of_excludes_discovers_non_excluded_fe prediction_label_column_name="prediction_label", excludes=excludes, ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ @@ -279,7 +280,7 @@ def test_excluded_column_not_contained_in_dataframe_logs_warning(self, caplog): prediction_label_column_name="prediction_label", excludes=excludes, ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ @@ -315,7 +316,7 @@ def test_schema_includes_embedding_feature_has_all_embedding_columns_included(se ), }, ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe, @@ -356,7 +357,7 @@ def test_embedding_columns_of_excluded_embedding_feature_are_removed(self, caplo }, excludes=["embedding_feature0"], ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ @@ -407,7 +408,7 @@ def test_excluding_all_embedding_features_sets_schema_embedding_field_to_none(se }, excludes=["embedding_feature0"], ) - 
self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp"]], @@ -446,7 +447,7 @@ def test_excluding_an_embedding_column_rather_than_the_embedding_feature_name_lo }, excludes=["embedding_vector0"], ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe, @@ -483,7 +484,7 @@ def test_excluding_embedding_feature_with_same_name_as_embedding_column_does_not }, excludes=["embedding0"], ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp"]], @@ -496,7 +497,7 @@ def test_excluding_embedding_feature_with_same_name_as_embedding_column_does_not caplog=caplog, ) - def _run_function_and_check_output( + def _parse_dataframe_and_schema_and_check_output( self, input_dataframe: DataFrame, input_schema: Schema, @@ -505,9 +506,7 @@ def _run_function_and_check_output( should_log_warning_to_user: bool, caplog: LogCaptureFixture, ) -> None: - dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - parsed_dataframe = dataset.dataframe - parsed_schema = dataset.schema + parsed_dataframe, parsed_schema = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) assert parsed_dataframe.equals(expected_parsed_dataframe) assert parsed_schema == expected_parsed_schema @@ -542,8 +541,9 @@ def test_dataset_normalization_columns_already_normalized(self): prediction_label_column_name="prediction_label", ) - dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - dataset.dataframe.equals(input_dataframe) + expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + + assert expected_dataframe.equals(input_dataframe) def test_dataset_normalization_timestamp_integer_to_datetime(self): input_dataframe = DataFrame( @@ -563,11 +563,12 @@ def test_dataset_normalization_timestamp_integer_to_datetime(self): prediction_label_column_name="prediction_label", ) - dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + input_dataframe["timestamp"] = input_dataframe["timestamp"].apply( lambda x: to_datetime(x, unit="ms") ) - dataset.dataframe.equals(input_dataframe) + assert expected_dataframe.equals(input_dataframe) def test_dataset_normalization_prediction_id_integer_to_string(self): input_dataframe = DataFrame( @@ -587,9 +588,10 @@ def test_dataset_normalization_prediction_id_integer_to_string(self): prediction_label_column_name="prediction_label", ) - dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + input_dataframe["prediction_id"] = input_dataframe["prediction_id"].astype(str) - dataset.dataframe.equals(input_dataframe) + assert expected_dataframe.equals(input_dataframe) def test_dataset_normalization_columns_add_missing_prediction_id(self): input_dataframe = DataFrame( @@ -608,10 +610,12 @@ def test_dataset_normalization_columns_add_missing_prediction_id(self): prediction_label_column_name="prediction_label", ) - dataset = 
Dataset(dataframe=input_dataframe, schema=input_schema) - dataset.dataframe.equals(input_dataframe) - assert "prediction_id" in dataset.dataframe - assert dataset.dataframe.dtypes["prediction_id"], "string" + expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + + assert len(expected_dataframe.columns) == 4 + assert expected_dataframe[["prediction_label", "feature0", "timestamp"]].equals(input_dataframe) + assert "prediction_id" in expected_dataframe + assert expected_dataframe.dtypes["prediction_id"], "string" def test_dataset_normalization_columns_add_missing_timestamp(self): input_dataframe = DataFrame( @@ -628,10 +632,12 @@ def test_dataset_normalization_columns_add_missing_timestamp(self): prediction_label_column_name="prediction_label", ) - dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - dataset.dataframe.equals(input_dataframe) - assert "timestamp" in dataset.dataframe - assert dataset.dataframe.dtypes["timestamp"], "datetime[nz]" + expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + + assert len(expected_dataframe.columns) == 4 + assert expected_dataframe[["prediction_label", "feature0", "prediction_id"]].equals(input_dataframe) + assert "timestamp" in expected_dataframe + assert expected_dataframe.dtypes["timestamp"], "datetime[nz]" def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self): input_dataframe = DataFrame( @@ -646,12 +652,14 @@ def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self) prediction_label_column_name="prediction_label", ) - dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - dataset.dataframe.equals(input_dataframe) - assert "prediction_id" in dataset.dataframe - assert dataset.dataframe.dtypes["prediction_id"], "string" - assert "timestamp" in dataset.dataframe - assert dataset.dataframe.dtypes["timestamp"], "datetime[nz]" + expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + + assert len(expected_dataframe.columns) == 4 + assert expected_dataframe[["prediction_label", "feature0"]].equals(input_dataframe) + assert "prediction_id" in expected_dataframe + assert expected_dataframe.dtypes["prediction_id"], "string" + assert "timestamp" in expected_dataframe + assert expected_dataframe.dtypes["timestamp"], "datetime[nz]" def test_dataset_validate_invalid_prediction_id_datatype(self) -> None: input_df = DataFrame( @@ -691,6 +699,45 @@ def test_dataset_validate_invalid_timestamp_datatype(self) -> None: with raises(DatasetError): Dataset(dataframe=input_df, schema=input_schema) + def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: + input_df = DataFrame( + { + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self.num_records), + "timestamp": self._random_uuids(), + }, + ) + + input_schema = Schema( + timestamp_column_name="timestamp", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + excludes=["timestamp"] + ) + + with raises(DatasetError): + Dataset(dataframe=input_df, schema=input_schema) + + def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: + input_df = DataFrame( + { + "prediction_id": [str(x) for x in range(self.num_records)], + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self.num_records), + "timestamp": self._random_uuids(), + }, + ) + + 
input_schema = Schema( + prediction_id_column_name="prediction_id", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + excludes=["prediction_id"] + ) + + with raises(DatasetError): + Dataset(dataframe=input_df, schema=input_schema) + def _random_uuids(self): return [str(uuid.uuid4()) for _ in range(self.num_records)] @@ -701,3 +748,34 @@ def num_records(self): @property def embedding_dimension(self): return self._EMBEDDING_DIMENSION + + +class TestDataset: + + _NUM_RECORDS = 9 + + def test_dataset_normalization_columns_already_normalized(self): + input_dataframe = DataFrame( + { + "prediction_label": [f"label{index}" for index in range(self._NUM_RECORDS)], + "feature0": np.zeros(self._NUM_RECORDS), + "timestamp": np.full( + shape=self._NUM_RECORDS, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + ), + "prediction_id": [str(uuid.uuid4()) for _ in range(self._NUM_RECORDS)], + } + ) + + input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + + dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + expected_dataframe = dataset.dataframe + expected_schema = dataset.schema + + assert expected_dataframe.equals(input_dataframe) + assert expected_schema == input_schema From ea98970650af9d1f28ce321b464080954496c63f Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Mon, 23 Jan 2023 10:02:36 -0800 Subject: [PATCH 20/27] Update dataset.py --- src/phoenix/datasets/dataset.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py index a1841218cb..8c04521133 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -410,7 +410,9 @@ def _create_and_normalize_dataframe_and_schema( ) -> Tuple[DataFrame, Schema]: """ Creates new dataframe and schema objects to reflect excluded column names - and discovered features. + and discovered features. This also normalizes dataframe columns to ensure a + standard set of columns (i.e. timestamp and prediction_id) and datatypes for + those columns. 
""" included_column_names: List[str] = [] for column_name in dataframe.columns: From 162873e0605657080157173816d7c16ec6a5108e Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Mon, 23 Jan 2023 10:13:21 -0800 Subject: [PATCH 21/27] formatting --- src/phoenix/datasets/dataset.py | 5 +-- src/phoenix/datasets/errors.py | 7 ++--- src/phoenix/datasets/validation.py | 9 ++++-- tests/datasets/test_dataset.py | 50 +++++++++++++++++++++--------- 4 files changed, 46 insertions(+), 25 deletions(-) diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py index 8c04521133..ac6649546a 100644 --- a/src/phoenix/datasets/dataset.py +++ b/src/phoenix/datasets/dataset.py @@ -1,4 +1,3 @@ -import dataclasses import logging import os import sys @@ -433,9 +432,7 @@ def _create_and_normalize_dataframe_and_schema( pred_col_name = parsed_schema.prediction_id_column_name if pred_col_name is None: - parsed_schema = replace( - parsed_schema, prediction_id_column_name="prediction_id" - ) + parsed_schema = replace(parsed_schema, prediction_id_column_name="prediction_id") parsed_dataframe["prediction_id"] = parsed_dataframe.apply(lambda _: str(uuid.uuid4())) elif is_numeric_dtype(parsed_dataframe.dtypes[pred_col_name]): parsed_dataframe[pred_col_name] = parsed_dataframe[pred_col_name].astype(str) diff --git a/src/phoenix/datasets/errors.py b/src/phoenix/datasets/errors.py index 94f82e183a..3a90c40d59 100644 --- a/src/phoenix/datasets/errors.py +++ b/src/phoenix/datasets/errors.py @@ -41,6 +41,7 @@ def error_message(self) -> str: f"{', '.join(map(str, self.missing_cols))}." ) + class InvalidSchemaError(ValidationError): def __repr__(self) -> str: return self.__class__.__name__ @@ -49,10 +50,8 @@ def __init__(self, invalid_props: Iterable[str]) -> None: self.invalid_props = invalid_props def error_message(self) -> str: - return ( - "The schema is invalid: " - f"{', '.join(map(str, self.invalid_props))}." - ) + return "The schema is invalid: " f"{', '.join(map(str, self.invalid_props))}." 
+ class DatasetError(Exception): """An error raised when the dataset is invalid or incomplete""" diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py index 03abad8f52..d5fc5f9d22 100644 --- a/src/phoenix/datasets/validation.py +++ b/src/phoenix/datasets/validation.py @@ -17,12 +17,14 @@ def _check_valid_schema(schema: Schema) -> List[err.ValidationError]: if schema.timestamp_column_name in schema.excludes: errs.append( f"{schema.timestamp_column_name} cannot be excluded because " - f"it is already being used as the timestamp column") + f"it is already being used as the timestamp column" + ) if schema.prediction_id_column_name in schema.excludes: errs.append( f"{schema.prediction_id_column_name} cannot be excluded because " - f"it is already being used as the prediction id column") + f"it is already being used as the prediction id column" + ) if len(errs) > 0: return [err.InvalidSchemaError(errs)] @@ -32,7 +34,8 @@ def _check_valid_schema(schema: Schema) -> List[err.ValidationError]: def validate_dataset_inputs(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]: general_checks = chain( - _check_missing_columns(dataframe, schema), _check_column_types(dataframe, schema), + _check_missing_columns(dataframe, schema), + _check_column_types(dataframe, schema), _check_valid_schema(schema), ) return list(general_checks) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 284090d20b..e432c37a59 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -11,9 +11,13 @@ from pandas import DataFrame, to_datetime from pytest import LogCaptureFixture, raises -from phoenix.datasets.dataset import Dataset, EmbeddingColumnNames, Schema +from phoenix.datasets.dataset import ( + Dataset, + EmbeddingColumnNames, + Schema, + _parse_dataframe_and_schema, +) from phoenix.datasets.errors import DatasetError -from phoenix.datasets.dataset import _parse_dataframe_and_schema class TestParseDataFrameAndSchema: @@ -506,7 +510,9 @@ def _parse_dataframe_and_schema_and_check_output( should_log_warning_to_user: bool, caplog: LogCaptureFixture, ) -> None: - parsed_dataframe, parsed_schema = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + parsed_dataframe, parsed_schema = _parse_dataframe_and_schema( + dataframe=input_dataframe, schema=input_schema + ) assert parsed_dataframe.equals(expected_parsed_dataframe) assert parsed_schema == expected_parsed_schema @@ -541,7 +547,9 @@ def test_dataset_normalization_columns_already_normalized(self): prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + expected_dataframe, _ = _parse_dataframe_and_schema( + dataframe=input_dataframe, schema=input_schema + ) assert expected_dataframe.equals(input_dataframe) @@ -563,7 +571,9 @@ def test_dataset_normalization_timestamp_integer_to_datetime(self): prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + expected_dataframe, _ = _parse_dataframe_and_schema( + dataframe=input_dataframe, schema=input_schema + ) input_dataframe["timestamp"] = input_dataframe["timestamp"].apply( lambda x: to_datetime(x, unit="ms") @@ -588,7 +598,9 @@ def test_dataset_normalization_prediction_id_integer_to_string(self): prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = 
_parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + expected_dataframe, _ = _parse_dataframe_and_schema( + dataframe=input_dataframe, schema=input_schema + ) input_dataframe["prediction_id"] = input_dataframe["prediction_id"].astype(str) assert expected_dataframe.equals(input_dataframe) @@ -610,10 +622,14 @@ def test_dataset_normalization_columns_add_missing_prediction_id(self): prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + expected_dataframe, _ = _parse_dataframe_and_schema( + dataframe=input_dataframe, schema=input_schema + ) assert len(expected_dataframe.columns) == 4 - assert expected_dataframe[["prediction_label", "feature0", "timestamp"]].equals(input_dataframe) + assert expected_dataframe[["prediction_label", "feature0", "timestamp"]].equals( + input_dataframe + ) assert "prediction_id" in expected_dataframe assert expected_dataframe.dtypes["prediction_id"], "string" @@ -632,10 +648,14 @@ def test_dataset_normalization_columns_add_missing_timestamp(self): prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + expected_dataframe, _ = _parse_dataframe_and_schema( + dataframe=input_dataframe, schema=input_schema + ) assert len(expected_dataframe.columns) == 4 - assert expected_dataframe[["prediction_label", "feature0", "prediction_id"]].equals(input_dataframe) + assert expected_dataframe[["prediction_label", "feature0", "prediction_id"]].equals( + input_dataframe + ) assert "timestamp" in expected_dataframe assert expected_dataframe.dtypes["timestamp"], "datetime[nz]" @@ -652,7 +672,9 @@ def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self) prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema(dataframe=input_dataframe, schema=input_schema) + expected_dataframe, _ = _parse_dataframe_and_schema( + dataframe=input_dataframe, schema=input_schema + ) assert len(expected_dataframe.columns) == 4 assert expected_dataframe[["prediction_label", "feature0"]].equals(input_dataframe) @@ -712,13 +734,13 @@ def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: timestamp_column_name="timestamp", feature_column_names=["feature0"], prediction_label_column_name="prediction_label", - excludes=["timestamp"] + excludes=["timestamp"], ) with raises(DatasetError): Dataset(dataframe=input_df, schema=input_schema) - def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: + def test_dataset_validate_invalid_schema_excludes_prediction_id(self) -> None: input_df = DataFrame( { "prediction_id": [str(x) for x in range(self.num_records)], @@ -732,7 +754,7 @@ def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: prediction_id_column_name="prediction_id", feature_column_names=["feature0"], prediction_label_column_name="prediction_label", - excludes=["prediction_id"] + excludes=["prediction_id"], ) with raises(DatasetError): From 6bff7ef115506ae8688f3efbde31646f5bd0b061 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Mon, 23 Jan 2023 16:20:48 -0800 Subject: [PATCH 22/27] update error type --- src/phoenix/datasets/errors.py | 4 ++-- src/phoenix/datasets/validation.py | 1 + tests/datasets/test_dataset.py | 10 +++++----- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/phoenix/datasets/errors.py 
b/src/phoenix/datasets/errors.py index 3a90c40d59..f6b108cbfc 100644 --- a/src/phoenix/datasets/errors.py +++ b/src/phoenix/datasets/errors.py @@ -1,8 +1,8 @@ -from abc import ABC, abstractmethod +from abc import abstractmethod from typing import Iterable, List, Union -class ValidationError(ABC): +class ValidationError(Exception): def __repr__(self) -> str: return self.__class__.__name__ diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py index d5fc5f9d22..ed9a2adf41 100644 --- a/src/phoenix/datasets/validation.py +++ b/src/phoenix/datasets/validation.py @@ -99,4 +99,5 @@ def _check_missing_columns(dataframe: DataFrame, schema: Schema) -> List[err.Mis if missing_columns: return [err.MissingColumns(missing_columns)] + return [] diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index e432c37a59..e2588afc9c 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -17,7 +17,7 @@ Schema, _parse_dataframe_and_schema, ) -from phoenix.datasets.errors import DatasetError +from phoenix.datasets.errors import InvalidSchemaError, InvalidColumnType class TestParseDataFrameAndSchema: @@ -700,7 +700,7 @@ def test_dataset_validate_invalid_prediction_id_datatype(self) -> None: prediction_label_column_name="prediction_label", ) - with raises(DatasetError): + with raises(InvalidColumnType): Dataset(dataframe=input_df, schema=input_schema) def test_dataset_validate_invalid_timestamp_datatype(self) -> None: @@ -718,7 +718,7 @@ def test_dataset_validate_invalid_timestamp_datatype(self) -> None: prediction_label_column_name="prediction_label", ) - with raises(DatasetError): + with raises(InvalidColumnType): Dataset(dataframe=input_df, schema=input_schema) def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: @@ -737,7 +737,7 @@ def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: excludes=["timestamp"], ) - with raises(DatasetError): + with raises(InvalidSchemaError): Dataset(dataframe=input_df, schema=input_schema) def test_dataset_validate_invalid_schema_excludes_prediction_id(self) -> None: @@ -757,7 +757,7 @@ def test_dataset_validate_invalid_schema_excludes_prediction_id(self) -> None: excludes=["prediction_id"], ) - with raises(DatasetError): + with raises(InvalidSchemaError): Dataset(dataframe=input_df, schema=input_schema) def _random_uuids(self): From 653a1be01cc44af3bea623deee91ed046d7d5835 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Mon, 23 Jan 2023 16:21:42 -0800 Subject: [PATCH 23/27] reformatting --- src/phoenix/datasets/validation.py | 2 +- tests/datasets/test_dataset.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py index ed9a2adf41..d07f841a42 100644 --- a/src/phoenix/datasets/validation.py +++ b/src/phoenix/datasets/validation.py @@ -99,5 +99,5 @@ def _check_missing_columns(dataframe: DataFrame, schema: Schema) -> List[err.Mis if missing_columns: return [err.MissingColumns(missing_columns)] - + return [] diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index e2588afc9c..11c198ba2f 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -17,7 +17,7 @@ Schema, _parse_dataframe_and_schema, ) -from phoenix.datasets.errors import InvalidSchemaError, InvalidColumnType +from phoenix.datasets.errors import InvalidColumnType, InvalidSchemaError class TestParseDataFrameAndSchema: 
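An aside between patches, for readers following the series: PATCH 19 added `_check_valid_schema`, which rejects a `Schema` whose `excludes` list names the very column the schema designates as its timestamp or prediction id column, and PATCHes 20-23 refine the error types around it. The sketch below is illustrative only and not part of any patch; it assumes phoenix with the series applied, mirrors the DataFrame construction the tests use for a valid timestamp column, and catches `DatasetError`, the exception the final patches settle on (the intermediate patches briefly assert the more specific `InvalidSchemaError` instead).

    # Illustrative sketch (not part of the patch series): excluding the
    # schema's own timestamp column should fail validation.
    import numpy as np
    import pandas as pd

    from phoenix.datasets.dataset import Dataset, Schema
    from phoenix.datasets.errors import DatasetError

    num_records = 3
    df = pd.DataFrame(
        {
            "feature0": np.zeros(num_records),
            # Same pattern the tests use to build a valid timestamp column.
            "timestamp": np.full(
                shape=num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp
            ),
        }
    )
    schema = Schema(
        timestamp_column_name="timestamp",
        feature_column_names=["feature0"],
        excludes=["timestamp"],  # invalid: this column is the timestamp column
    )
    try:
        Dataset(dataframe=df, schema=schema)
    except DatasetError as exc:
        print(exc)  # explains that "timestamp" cannot be excluded
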
From 09b47899b10aba1cbe89da8be4812f7cb5cd7332 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Mon, 23 Jan 2023 16:35:10 -0800 Subject: [PATCH 24/27] Update test_dataset.py --- tests/datasets/test_dataset.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 11c198ba2f..3303f2e696 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -701,7 +701,7 @@ def test_dataset_validate_invalid_prediction_id_datatype(self) -> None: ) with raises(InvalidColumnType): - Dataset(dataframe=input_df, schema=input_schema) + _parse_dataframe_and_schema(dataframe=input_df, schema=input_schema) def test_dataset_validate_invalid_timestamp_datatype(self) -> None: input_df = DataFrame( @@ -719,7 +719,7 @@ def test_dataset_validate_invalid_timestamp_datatype(self) -> None: ) with raises(InvalidColumnType): - Dataset(dataframe=input_df, schema=input_schema) + _parse_dataframe_and_schema(dataframe=input_df, schema=input_schema) def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: input_df = DataFrame( @@ -738,7 +738,7 @@ def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: ) with raises(InvalidSchemaError): - Dataset(dataframe=input_df, schema=input_schema) + _parse_dataframe_and_schema(dataframe=input_df, schema=input_schema) def test_dataset_validate_invalid_schema_excludes_prediction_id(self) -> None: input_df = DataFrame( @@ -758,7 +758,7 @@ def test_dataset_validate_invalid_schema_excludes_prediction_id(self) -> None: ) with raises(InvalidSchemaError): - Dataset(dataframe=input_df, schema=input_schema) + _parse_dataframe_and_schema(dataframe=input_df, schema=input_schema) def _random_uuids(self): return [str(uuid.uuid4()) for _ in range(self.num_records)] From a6d3d8f00e983ee8f631c4e75a5ba3d563104ee7 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Mon, 23 Jan 2023 17:06:56 -0800 Subject: [PATCH 25/27] minor changes to address feedback --- src/phoenix/datasets/errors.py | 3 +- tests/datasets/test_dataset.py | 60 ++++++++++++++++++---------------- 2 files changed, 33 insertions(+), 30 deletions(-) diff --git a/src/phoenix/datasets/errors.py b/src/phoenix/datasets/errors.py index f6b108cbfc..1c9e86ef6a 100644 --- a/src/phoenix/datasets/errors.py +++ b/src/phoenix/datasets/errors.py @@ -50,7 +50,8 @@ def __init__(self, invalid_props: Iterable[str]) -> None: self.invalid_props = invalid_props def error_message(self) -> str: - return "The schema is invalid: " f"{', '.join(map(str, self.invalid_props))}." + errors_string = ', '.join(map(str, self.invalid_props)) + return f"The schema is invalid: {errors_string}." 
class DatasetError(Exception): diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index 3303f2e696..dfd9bda767 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -547,11 +547,11 @@ def test_dataset_normalization_columns_already_normalized(self): prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema( + output_dataframe, _ = _parse_dataframe_and_schema( dataframe=input_dataframe, schema=input_schema ) - assert expected_dataframe.equals(input_dataframe) + assert output_dataframe.equals(input_dataframe) def test_dataset_normalization_timestamp_integer_to_datetime(self): input_dataframe = DataFrame( @@ -571,14 +571,15 @@ def test_dataset_normalization_timestamp_integer_to_datetime(self): prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema( + output_dataframe, _ = _parse_dataframe_and_schema( dataframe=input_dataframe, schema=input_schema ) - input_dataframe["timestamp"] = input_dataframe["timestamp"].apply( + expected_dataframe = input_dataframe + expected_dataframe["timestamp"] = expected_dataframe["timestamp"].apply( lambda x: to_datetime(x, unit="ms") ) - assert expected_dataframe.equals(input_dataframe) + assert output_dataframe.equals(expected_dataframe) def test_dataset_normalization_prediction_id_integer_to_string(self): input_dataframe = DataFrame( @@ -598,12 +599,13 @@ def test_dataset_normalization_prediction_id_integer_to_string(self): prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema( + output_dataframe, _ = _parse_dataframe_and_schema( dataframe=input_dataframe, schema=input_schema ) - input_dataframe["prediction_id"] = input_dataframe["prediction_id"].astype(str) - assert expected_dataframe.equals(input_dataframe) + expected_dataframe = input_dataframe + expected_dataframe["prediction_id"] = expected_dataframe["prediction_id"].astype(str) + assert output_dataframe.equals(expected_dataframe) def test_dataset_normalization_columns_add_missing_prediction_id(self): input_dataframe = DataFrame( @@ -622,16 +624,16 @@ def test_dataset_normalization_columns_add_missing_prediction_id(self): prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema( + output_dataframe, _ = _parse_dataframe_and_schema( dataframe=input_dataframe, schema=input_schema ) - assert len(expected_dataframe.columns) == 4 - assert expected_dataframe[["prediction_label", "feature0", "timestamp"]].equals( + assert len(output_dataframe.columns) == 4 + assert output_dataframe[["prediction_label", "feature0", "timestamp"]].equals( input_dataframe ) - assert "prediction_id" in expected_dataframe - assert expected_dataframe.dtypes["prediction_id"], "string" + assert "prediction_id" in output_dataframe + assert output_dataframe.dtypes["prediction_id"], "string" def test_dataset_normalization_columns_add_missing_timestamp(self): input_dataframe = DataFrame( @@ -648,16 +650,16 @@ def test_dataset_normalization_columns_add_missing_timestamp(self): prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema( + output_dataframe, _ = _parse_dataframe_and_schema( dataframe=input_dataframe, schema=input_schema ) - assert len(expected_dataframe.columns) == 4 - assert expected_dataframe[["prediction_label", "feature0", "prediction_id"]].equals( + assert len(output_dataframe.columns) == 4 + assert 
output_dataframe[["prediction_label", "feature0", "prediction_id"]].equals( input_dataframe ) - assert "timestamp" in expected_dataframe - assert expected_dataframe.dtypes["timestamp"], "datetime[nz]" + assert "timestamp" in output_dataframe + assert output_dataframe.dtypes["timestamp"], "datetime[nz]" def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self): input_dataframe = DataFrame( @@ -672,16 +674,16 @@ def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self) prediction_label_column_name="prediction_label", ) - expected_dataframe, _ = _parse_dataframe_and_schema( + output_dataframe, _ = _parse_dataframe_and_schema( dataframe=input_dataframe, schema=input_schema ) - assert len(expected_dataframe.columns) == 4 - assert expected_dataframe[["prediction_label", "feature0"]].equals(input_dataframe) - assert "prediction_id" in expected_dataframe - assert expected_dataframe.dtypes["prediction_id"], "string" - assert "timestamp" in expected_dataframe - assert expected_dataframe.dtypes["timestamp"], "datetime[nz]" + assert len(output_dataframe.columns) == 4 + assert output_dataframe[["prediction_label", "feature0"]].equals(input_dataframe) + assert "prediction_id" in output_dataframe + assert output_dataframe.dtypes["prediction_id"], "string" + assert "timestamp" in output_dataframe + assert output_dataframe.dtypes["timestamp"], "datetime[nz]" def test_dataset_validate_invalid_prediction_id_datatype(self) -> None: input_df = DataFrame( @@ -796,8 +798,8 @@ def test_dataset_normalization_columns_already_normalized(self): ) dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - expected_dataframe = dataset.dataframe - expected_schema = dataset.schema + output_dataframe = dataset.dataframe + output_schema = dataset.schema - assert expected_dataframe.equals(input_dataframe) - assert expected_schema == input_schema + assert output_dataframe.equals(input_dataframe) + assert output_schema == input_schema From 5f72041a6ed72831310b5bae45370f58dc2dae34 Mon Sep 17 00:00:00 2001 From: nate-mar <67926244+nate-mar@users.noreply.github.com> Date: Mon, 23 Jan 2023 17:07:36 -0800 Subject: [PATCH 26/27] fix formatting --- src/phoenix/datasets/errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/phoenix/datasets/errors.py b/src/phoenix/datasets/errors.py index 1c9e86ef6a..ba1792e578 100644 --- a/src/phoenix/datasets/errors.py +++ b/src/phoenix/datasets/errors.py @@ -50,7 +50,7 @@ def __init__(self, invalid_props: Iterable[str]) -> None: self.invalid_props = invalid_props def error_message(self) -> str: - errors_string = ', '.join(map(str, self.invalid_props)) + errors_string = ", ".join(map(str, self.invalid_props)) return f"The schema is invalid: {errors_string}." 
From cdebaecd5467dc89fc1eac77f955202c87b9d65c Mon Sep 17 00:00:00 2001
From: nate-mar <67926244+nate-mar@users.noreply.github.com>
Date: Mon, 23 Jan 2023 17:25:26 -0800
Subject: [PATCH 27/27] Update test_dataset.py

---
 tests/datasets/test_dataset.py | 107 +++++++++++++++++----------------
 1 file changed, 56 insertions(+), 51 deletions(-)

diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py
index dfd9bda767..066c4bc360 100644
--- a/tests/datasets/test_dataset.py
+++ b/tests/datasets/test_dataset.py
@@ -17,7 +17,7 @@
     Schema,
     _parse_dataframe_and_schema,
 )
-from phoenix.datasets.errors import InvalidColumnType, InvalidSchemaError
+from phoenix.datasets.errors import DatasetError
 
 
 class TestParseDataFrameAndSchema:
@@ -536,7 +536,7 @@ def test_dataset_normalization_columns_already_normalized(self):
                 "timestamp": np.full(
                     shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp
                 ),
-                "prediction_id": self._random_uuids(),
+                "prediction_id": random_uuids(self.num_records),
             }
         )
 
@@ -561,7 +561,7 @@ def test_dataset_normalization_timestamp_integer_to_datetime(self):
                 "timestamp": np.full(
                     shape=self.num_records, fill_value=pd.Timestamp.utcnow().timestamp(), dtype=int
                 ),
-                "prediction_id": self._random_uuids(),
+                "prediction_id": random_uuids(self.num_records),
             }
         )
         input_schema = Schema(
@@ -640,7 +640,7 @@ def test_dataset_normalization_columns_add_missing_timestamp(self):
             {
                 "prediction_label": [f"label{index}" for index in range(self.num_records)],
                 "feature0": np.zeros(self.num_records),
-                "prediction_id": self._random_uuids(),
+                "prediction_id": random_uuids(self.num_records),
             }
         )
 
@@ -685,6 +685,45 @@ def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self)
         assert "timestamp" in output_dataframe
         assert pd.api.types.is_datetime64_any_dtype(output_dataframe["timestamp"])
 
+    @property
+    def num_records(self):
+        return self._NUM_RECORDS
+
+    @property
+    def embedding_dimension(self):
+        return self._EMBEDDING_DIMENSION
+
+
+class TestDataset:
+    _NUM_RECORDS = 9
+
+    def test_dataset_normalization_columns_already_normalized(self):
+        input_dataframe = DataFrame(
+            {
+                "prediction_label": [f"label{index}" for index in range(self.num_records)],
+                "feature0": np.zeros(self._NUM_RECORDS),
+                "timestamp": np.full(
+                    shape=self._NUM_RECORDS, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp
+                ),
+                "prediction_id": random_uuids(self.num_records),
+            }
+        )
+
+        input_schema = Schema(
+            prediction_id_column_name="prediction_id",
+            timestamp_column_name="timestamp",
+            feature_column_names=["feature0"],
+            prediction_label_column_name="prediction_label",
+        )
+
+        dataset = Dataset(dataframe=input_dataframe, schema=input_schema)
+        output_dataframe = dataset.dataframe
+        output_schema = dataset.schema
+
+        assert output_dataframe.equals(input_dataframe)
+        assert output_schema == input_schema
+
+    # TODO: Move validation tests to validation module; keep one validation integration test
     def test_dataset_validate_invalid_prediction_id_datatype(self) -> None:
         input_df = DataFrame(
             {
@@ -702,15 +741,15 @@ def test_dataset_validate_invalid_prediction_id_datatype(self) -> None:
             prediction_label_column_name="prediction_label",
         )
 
-        with raises(InvalidColumnType):
-            _parse_dataframe_and_schema(dataframe=input_df, schema=input_schema)
+        with raises(DatasetError):
+            Dataset(dataframe=input_df, schema=input_schema)
 
     def test_dataset_validate_invalid_timestamp_datatype(self) -> None:
         input_df = DataFrame(
             {
                 "prediction_label": [f"label{index}" for index in range(self.num_records)],
"feature0": np.zeros(self.num_records), - "timestamp": self._random_uuids(), + "timestamp": random_uuids(self.num_records), }, ) @@ -720,15 +759,15 @@ def test_dataset_validate_invalid_timestamp_datatype(self) -> None: prediction_label_column_name="prediction_label", ) - with raises(InvalidColumnType): - _parse_dataframe_and_schema(dataframe=input_df, schema=input_schema) + with raises(DatasetError): + Dataset(dataframe=input_df, schema=input_schema) def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: input_df = DataFrame( { "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), - "timestamp": self._random_uuids(), + "timestamp": random_uuids(self.num_records), }, ) @@ -739,8 +778,8 @@ def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None: excludes=["timestamp"], ) - with raises(InvalidSchemaError): - _parse_dataframe_and_schema(dataframe=input_df, schema=input_schema) + with raises(DatasetError): + Dataset(dataframe=input_df, schema=input_schema) def test_dataset_validate_invalid_schema_excludes_prediction_id(self) -> None: input_df = DataFrame( @@ -748,7 +787,7 @@ def test_dataset_validate_invalid_schema_excludes_prediction_id(self) -> None: "prediction_id": [str(x) for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), - "timestamp": self._random_uuids(), + "timestamp": random_uuids(self.num_records), }, ) @@ -759,47 +798,13 @@ def test_dataset_validate_invalid_schema_excludes_prediction_id(self) -> None: excludes=["prediction_id"], ) - with raises(InvalidSchemaError): - _parse_dataframe_and_schema(dataframe=input_df, schema=input_schema) - - def _random_uuids(self): - return [str(uuid.uuid4()) for _ in range(self.num_records)] + with raises(DatasetError): + Dataset(dataframe=input_df, schema=input_schema) @property def num_records(self): return self._NUM_RECORDS - @property - def embedding_dimension(self): - return self._EMBEDDING_DIMENSION - - -class TestDataset: - - _NUM_RECORDS = 9 - - def test_dataset_normalization_columns_already_normalized(self): - input_dataframe = DataFrame( - { - "prediction_label": [f"label{index}" for index in range(self._NUM_RECORDS)], - "feature0": np.zeros(self._NUM_RECORDS), - "timestamp": np.full( - shape=self._NUM_RECORDS, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp - ), - "prediction_id": [str(uuid.uuid4()) for _ in range(self._NUM_RECORDS)], - } - ) - - input_schema = Schema( - prediction_id_column_name="prediction_id", - timestamp_column_name="timestamp", - feature_column_names=["feature0"], - prediction_label_column_name="prediction_label", - ) - - dataset = Dataset(dataframe=input_dataframe, schema=input_schema) - output_dataframe = dataset.dataframe - output_schema = dataset.schema - assert output_dataframe.equals(input_dataframe) - assert output_schema == input_schema +def random_uuids(num_records: int): + return [str(uuid.uuid4()) for _ in range(num_records)]