diff --git a/src/phoenix/datasets/dataset.py b/src/phoenix/datasets/dataset.py
index e6b0fd8244..ac6649546a 100644
--- a/src/phoenix/datasets/dataset.py
+++ b/src/phoenix/datasets/dataset.py
@@ -6,7 +6,8 @@
 from dataclasses import fields, replace
 from typing import Any, Dict, List, Optional, Set, Tuple, Union
 
-from pandas import DataFrame, Series, read_parquet
+from pandas import DataFrame, Series, Timestamp, read_parquet, to_datetime
+from pandas.api.types import is_numeric_dtype
 
 from phoenix.config import dataset_dir
 
@@ -268,7 +269,7 @@ def _parse_dataframe_and_schema(dataframe: DataFrame, schema: Schema) -> Tuple[D
             "not found in the dataframe: {}".format(", ".join(unseen_excluded_column_names))
         )
 
-    parsed_dataframe, parsed_schema = _create_parsed_dataframe_and_schema(
+    parsed_dataframe, parsed_schema = _create_and_normalize_dataframe_and_schema(
         dataframe, schema, schema_patch, column_name_to_include
     )
 
@@ -400,7 +401,7 @@ def _discover_feature_columns(
     )
 
 
-def _create_parsed_dataframe_and_schema(
+def _create_and_normalize_dataframe_and_schema(
     dataframe: DataFrame,
     schema: Schema,
     schema_patch: Dict[SchemaFieldName, SchemaFieldValue],
@@ -408,12 +409,36 @@
 ) -> Tuple[DataFrame, Schema]:
     """
     Creates new dataframe and schema objects to reflect excluded column names
-    and discovered features.
+    and discovered features. This also normalizes the dataframe to ensure a
+    standard set of columns (i.e. timestamp and prediction_id) and standard
+    datatypes for those columns.
     """
     included_column_names: List[str] = []
     for column_name in dataframe.columns:
         if column_name_to_include.get(str(column_name), False):
             included_column_names.append(str(column_name))
-    parsed_dataframe = dataframe[included_column_names]
+    parsed_dataframe = dataframe[included_column_names].copy()
     parsed_schema = replace(schema, excludes=None, **schema_patch)
+
+    ts_col_name = parsed_schema.timestamp_column_name
+    if ts_col_name is None:
+        now = Timestamp.utcnow()
+        parsed_schema = replace(parsed_schema, timestamp_column_name="timestamp")
+        parsed_dataframe["timestamp"] = now
+    elif is_numeric_dtype(parsed_dataframe.dtypes[ts_col_name]):
+        parsed_dataframe[ts_col_name] = parsed_dataframe[ts_col_name].apply(
+            lambda x: to_datetime(x, unit="ms")
+        )
+
+    pred_col_name = parsed_schema.prediction_id_column_name
+    if pred_col_name is None:
+        parsed_schema = replace(parsed_schema, prediction_id_column_name="prediction_id")
+        # generate one UUID string per row; axis=1 is required so apply runs
+        # row-wise rather than column-wise
+        parsed_dataframe["prediction_id"] = parsed_dataframe.apply(
+            lambda _: str(uuid.uuid4()), axis=1
+        )
+    elif is_numeric_dtype(parsed_dataframe.dtypes[pred_col_name]):
+        parsed_dataframe[pred_col_name] = parsed_dataframe[pred_col_name].astype(str)
+
     return parsed_dataframe, parsed_schema
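To make the new normalization concrete, here is a minimal usage sketch, assuming the private helper `_parse_dataframe_and_schema` and the `Schema` import path used by the tests below; the column names and epoch values are illustrative:

```python
import pandas as pd

from phoenix.datasets.dataset import Schema, _parse_dataframe_and_schema

# A dataframe whose timestamp column holds epoch milliseconds and which has
# no prediction id column at all.
df = pd.DataFrame(
    {
        "ts": [1675000000000, 1675000060000],  # illustrative epoch milliseconds
        "feature0": [0.1, 0.2],
    }
)
schema = Schema(timestamp_column_name="ts", feature_column_names=["feature0"])

parsed_df, parsed_schema = _parse_dataframe_and_schema(dataframe=df, schema=schema)

# "ts" has been converted to datetimes, and a generated "prediction_id"
# column of UUID strings has been appended; the returned schema points at it.
print(parsed_df.dtypes)
print(parsed_schema.prediction_id_column_name)  # "prediction_id"
```

Note that milliseconds are the unit the normalization assumes for numeric timestamp columns, per the `to_datetime(x, unit="ms")` call above.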
{errors_string}." + + class DatasetError(Exception): """An error raised when the dataset is invalid or incomplete""" @@ -49,6 +61,16 @@ def __init__(self, errors: Union[ValidationError, List[ValidationError]]): self.errors = errors +class InvalidColumnType(ValidationError): + """An error raised when the column type is invalid""" + + def __init__(self, error_msgs: Iterable[str]) -> None: + self.error_msgs = error_msgs + + def error_message(self) -> str: + return f"Invalid column types: {self.error_msgs}" + + class MissingField(ValidationError): """An error raised when trying to access a field that is absent from the Schema""" diff --git a/src/phoenix/datasets/schema.py b/src/phoenix/datasets/schema.py index 8d08079c15..2d5e8a090d 100644 --- a/src/phoenix/datasets/schema.py +++ b/src/phoenix/datasets/schema.py @@ -39,7 +39,7 @@ class Schema(Dict[SchemaFieldName, SchemaFieldValue]): def to_json(self) -> str: "Converts the schema to a dict for JSON serialization" - dictionary = self.__dict__ + dictionary = {} for field in self.__dataclass_fields__: value = getattr(self, field) diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py index b8013d9405..d07f841a42 100644 --- a/src/phoenix/datasets/validation.py +++ b/src/phoenix/datasets/validation.py @@ -2,17 +2,71 @@ from typing import List from pandas import DataFrame +from pandas.api.types import is_datetime64_any_dtype as is_datetime +from pandas.api.types import is_numeric_dtype, is_string_dtype from . import errors as err from .schema import Schema +def _check_valid_schema(schema: Schema) -> List[err.ValidationError]: + errs: List[str] = [] + if schema.excludes is None: + return [] + + if schema.timestamp_column_name in schema.excludes: + errs.append( + f"{schema.timestamp_column_name} cannot be excluded because " + f"it is already being used as the timestamp column" + ) + + if schema.prediction_id_column_name in schema.excludes: + errs.append( + f"{schema.prediction_id_column_name} cannot be excluded because " + f"it is already being used as the prediction id column" + ) + + if len(errs) > 0: + return [err.InvalidSchemaError(errs)] + + return [] + + def validate_dataset_inputs(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]: - general_checks = chain(check_missing_columns(dataframe, schema)) + general_checks = chain( + _check_missing_columns(dataframe, schema), + _check_column_types(dataframe, schema), + _check_valid_schema(schema), + ) return list(general_checks) -def check_missing_columns(dataframe: DataFrame, schema: Schema) -> List[err.MissingColumns]: +def _check_column_types(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]: + wrong_type_cols: List[str] = [] + if schema.timestamp_column_name is not None: + if not ( + is_numeric_dtype(dataframe.dtypes[schema.timestamp_column_name]) + or is_datetime(dataframe.dtypes[schema.timestamp_column_name]) + ): + wrong_type_cols.append( + f"{schema.timestamp_column_name} should be of timestamp or numeric type" + ) + + if schema.prediction_id_column_name is not None: + if not ( + is_numeric_dtype(dataframe.dtypes[schema.prediction_id_column_name]) + or is_string_dtype(dataframe.dtypes[schema.prediction_id_column_name]) + ): + wrong_type_cols.append( + f"{schema.prediction_id_column_name} should be a string or numeric type" + ) + + if len(wrong_type_cols) > 0: + return [err.InvalidColumnType(wrong_type_cols)] + return [] + + +def _check_missing_columns(dataframe: DataFrame, schema: Schema) -> List[err.MissingColumns]: # converting to 
diff --git a/src/phoenix/datasets/validation.py b/src/phoenix/datasets/validation.py
index b8013d9405..d07f841a42 100644
--- a/src/phoenix/datasets/validation.py
+++ b/src/phoenix/datasets/validation.py
@@ -2,17 +2,71 @@
 from typing import List
 
 from pandas import DataFrame
+from pandas.api.types import is_datetime64_any_dtype as is_datetime
+from pandas.api.types import is_numeric_dtype, is_string_dtype
 
 from . import errors as err
 from .schema import Schema
 
 
+def _check_valid_schema(schema: Schema) -> List[err.ValidationError]:
+    errs: List[str] = []
+    if schema.excludes is None:
+        return []
+
+    if schema.timestamp_column_name in schema.excludes:
+        errs.append(
+            f"{schema.timestamp_column_name} cannot be excluded because "
+            f"it is being used as the timestamp column"
+        )
+
+    if schema.prediction_id_column_name in schema.excludes:
+        errs.append(
+            f"{schema.prediction_id_column_name} cannot be excluded because "
+            f"it is being used as the prediction id column"
+        )
+
+    if len(errs) > 0:
+        return [err.InvalidSchemaError(errs)]
+
+    return []
+
+
 def validate_dataset_inputs(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]:
-    general_checks = chain(check_missing_columns(dataframe, schema))
+    general_checks = chain(
+        _check_missing_columns(dataframe, schema),
+        _check_column_types(dataframe, schema),
+        _check_valid_schema(schema),
+    )
     return list(general_checks)
 
 
-def check_missing_columns(dataframe: DataFrame, schema: Schema) -> List[err.MissingColumns]:
+def _check_column_types(dataframe: DataFrame, schema: Schema) -> List[err.ValidationError]:
+    wrong_type_cols: List[str] = []
+    if schema.timestamp_column_name is not None:
+        if not (
+            is_numeric_dtype(dataframe.dtypes[schema.timestamp_column_name])
+            or is_datetime(dataframe.dtypes[schema.timestamp_column_name])
+        ):
+            wrong_type_cols.append(
+                f"{schema.timestamp_column_name} should be of timestamp or numeric type"
+            )
+
+    if schema.prediction_id_column_name is not None:
+        if not (
+            is_numeric_dtype(dataframe.dtypes[schema.prediction_id_column_name])
+            or is_string_dtype(dataframe.dtypes[schema.prediction_id_column_name])
+        ):
+            wrong_type_cols.append(
+                f"{schema.prediction_id_column_name} should be a string or numeric type"
+            )
+
+    if len(wrong_type_cols) > 0:
+        return [err.InvalidColumnType(wrong_type_cols)]
+    return []
+
+
+def _check_missing_columns(dataframe: DataFrame, schema: Schema) -> List[err.MissingColumns]:
     # converting to a set first makes the checks run a lot faster
     existing_columns = set(dataframe.columns)
     missing_columns = []
@@ -45,4 +99,5 @@ def check_missing_columns(dataframe: DataFrame, schema: Schema) -> List[err.Miss
 
     if missing_columns:
         return [err.MissingColumns(missing_columns)]
+
     return []
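The new checks all funnel through `validate_dataset_inputs`, and the `Dataset` constructor raises a `DatasetError` wrapping the individual validation errors, as the tests below confirm. A short sketch with illustrative column names and values:

```python
import pandas as pd

from phoenix.datasets.dataset import Dataset, Schema
from phoenix.datasets.errors import DatasetError

# A timestamp column of strings is neither numeric nor datetime, so
# _check_column_types reports it and Dataset construction fails.
df = pd.DataFrame({"timestamp": ["not-a-time"] * 3, "feature0": [0.0, 1.0, 2.0]})
schema = Schema(timestamp_column_name="timestamp", feature_column_names=["feature0"])

try:
    Dataset(dataframe=df, schema=schema)
except DatasetError as e:
    print(e.errors)  # expected: a list containing an InvalidColumnType error
```

Collecting every failed check into one exception lets callers report all schema and dtype problems in a single pass instead of fixing them one at a time.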
diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py
index d581980319..066c4bc360 100644
--- a/tests/datasets/test_dataset.py
+++ b/tests/datasets/test_dataset.py
@@ -1,12 +1,23 @@
+"""
+Tests for dataset parsing, normalization, and validation.
+"""
+
 import logging
+import uuid
 from dataclasses import replace
 
 import numpy as np
-from pandas import DataFrame
-from pytest import LogCaptureFixture
+import pandas as pd
+from pandas import DataFrame, to_datetime
+from pytest import LogCaptureFixture, raises
 
-from phoenix.datasets import EmbeddingColumnNames, Schema
-from phoenix.datasets.dataset import _parse_dataframe_and_schema
+from phoenix.datasets.dataset import (
+    Dataset,
+    EmbeddingColumnNames,
+    Schema,
+    _parse_dataframe_and_schema,
+)
+from phoenix.datasets.errors import DatasetError
 
 
 class TestParseDataFrameAndSchema:
@@ -20,8 +31,8 @@
     def test_schema_contains_all_dataframe_columns_results_in_unchanged_output(self, caplog):
         input_dataframe = DataFrame(
             {
-                "prediction_id": list(range(self.num_records)),
-                "ts": list(range(self.num_records)),
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
                 "prediction_label": [f"label{index}" for index in range(self.num_records)],
                 "feature0": np.zeros(self.num_records),
                 "feature1": np.ones(self.num_records),
@@ -30,7 +41,7 @@ def test_schema_contains_all_dataframe_columns_results_in_unchanged_output(self,
         )
         input_schema = Schema(
             prediction_id_column_name="prediction_id",
-            timestamp_column_name="ts",
+            timestamp_column_name="timestamp",
             feature_column_names=["feature0", "feature1"],
             tag_column_names=["tag0"],
             prediction_label_column_name="prediction_label",
@@ -38,7 +49,7 @@
             actual_label_column_name=None,
             actual_score_column_name=None,
         )
-        self._run_function_and_check_output(
+        self._parse_dataframe_and_schema_and_check_output(
             input_dataframe=input_dataframe,
             input_schema=input_schema,
             expected_parsed_dataframe=input_dataframe,
@@ -50,8 +61,8 @@
     def test_column_present_in_dataframe_but_missing_from_schema_is_dropped(self, caplog):
         input_dataframe = DataFrame(
             {
-                "prediction_id": list(range(self.num_records)),
-                "ts": list(range(self.num_records)),
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
                 "prediction_label": [f"label{index}" for index in range(self.num_records)],
                 "feature0": np.zeros(self.num_records),
                 "feature1": np.ones(self.num_records),
@@ -60,17 +71,17 @@
         )
         input_schema = Schema(
             prediction_id_column_name="prediction_id",
+            timestamp_column_name="timestamp",
             feature_column_names=["feature0", "feature1"],
-            tag_column_names=["tag0"],
             prediction_label_column_name="prediction_label",
         )
-        self._run_function_and_check_output(
+        self._parse_dataframe_and_schema_and_check_output(
             input_dataframe=input_dataframe,
             input_schema=input_schema,
             expected_parsed_dataframe=input_dataframe[
-                [col for col in input_dataframe.columns if col != "ts"]
+                [col for col in input_dataframe.columns if col != "tag0"]
             ],
-            expected_parsed_schema=replace(input_schema, timestamp_column_name=None),
+            expected_parsed_schema=replace(input_schema, tag_column_names=None),
             should_log_warning_to_user=False,
             caplog=caplog,
         )
timestamp_column_name="timestamp", prediction_label_column_name="prediction_label", feature_column_names=["feature0", "feature1"], - excludes=["prediction_label", "ts"], + excludes=["prediction_label"], ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, - expected_parsed_dataframe=input_dataframe[["prediction_id", "feature0", "feature1"]], + expected_parsed_dataframe=input_dataframe[ + ["prediction_id", "timestamp", "feature0", "feature1"] + ], expected_parsed_schema=replace( input_schema, prediction_label_column_name=None, - timestamp_column_name=None, excludes=None, ), should_log_warning_to_user=False, @@ -180,7 +198,8 @@ def test_excluded_single_column_schema_fields_set_to_none(self, caplog): def test_no_input_schema_features_and_no_excludes_discovers_features(self, caplog): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -190,8 +209,9 @@ def test_no_input_schema_features_and_no_excludes_discovers_features(self, caplo input_schema = Schema( prediction_id_column_name="prediction_id", prediction_label_column_name="prediction_label", + timestamp_column_name="timestamp", ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe, @@ -207,7 +227,8 @@ def test_no_input_schema_features_and_list_of_excludes_discovers_non_excluded_fe ): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -219,15 +240,16 @@ def test_no_input_schema_features_and_list_of_excludes_discovers_non_excluded_fe excludes = ["prediction_label", "feature1", "tag0"] input_schema = Schema( prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", tag_column_names=["tag0", "tag1"], prediction_label_column_name="prediction_label", excludes=excludes, ) - self._run_function_and_check_output( + self._parse_dataframe_and_schema_and_check_output( input_dataframe=input_dataframe, input_schema=input_schema, expected_parsed_dataframe=input_dataframe[ - ["prediction_id", "feature0", "feature2", "tag1"] + ["prediction_id", "timestamp", "feature0", "feature2", "tag1"] ], expected_parsed_schema=replace( input_schema, @@ -243,7 +265,8 @@ def test_no_input_schema_features_and_list_of_excludes_discovers_non_excluded_fe def test_excluded_column_not_contained_in_dataframe_logs_warning(self, caplog): input_dataframe = DataFrame( { - "prediction_id": list(range(self.num_records)), + "prediction_id": [str(x) for x in range(self.num_records)], + "timestamp": [pd.Timestamp.now() for x in range(self.num_records)], "prediction_label": [f"label{index}" for index in range(self.num_records)], "feature0": np.zeros(self.num_records), "feature1": np.ones(self.num_records), @@ -255,16 +278,17 @@ def test_excluded_column_not_contained_in_dataframe_logs_warning(self, 
@@ -180,7 +198,8 @@
     def test_no_input_schema_features_and_no_excludes_discovers_features(self, caplog):
         input_dataframe = DataFrame(
             {
-                "prediction_id": list(range(self.num_records)),
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
                 "prediction_label": [f"label{index}" for index in range(self.num_records)],
                 "feature0": np.zeros(self.num_records),
                 "feature1": np.ones(self.num_records),
@@ -190,8 +209,9 @@
         input_schema = Schema(
             prediction_id_column_name="prediction_id",
             prediction_label_column_name="prediction_label",
+            timestamp_column_name="timestamp",
         )
-        self._run_function_and_check_output(
+        self._parse_dataframe_and_schema_and_check_output(
             input_dataframe=input_dataframe,
             input_schema=input_schema,
             expected_parsed_dataframe=input_dataframe,
@@ -207,7 +227,8 @@ def test_no_input_schema_features_and_list_of_excludes_discovers_non_excluded_fe
     ):
         input_dataframe = DataFrame(
             {
-                "prediction_id": list(range(self.num_records)),
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
                 "prediction_label": [f"label{index}" for index in range(self.num_records)],
                 "feature0": np.zeros(self.num_records),
                 "feature1": np.ones(self.num_records),
@@ -219,15 +240,16 @@
         excludes = ["prediction_label", "feature1", "tag0"]
         input_schema = Schema(
             prediction_id_column_name="prediction_id",
+            timestamp_column_name="timestamp",
             tag_column_names=["tag0", "tag1"],
             prediction_label_column_name="prediction_label",
             excludes=excludes,
         )
-        self._run_function_and_check_output(
+        self._parse_dataframe_and_schema_and_check_output(
             input_dataframe=input_dataframe,
             input_schema=input_schema,
             expected_parsed_dataframe=input_dataframe[
-                ["prediction_id", "feature0", "feature2", "tag1"]
+                ["prediction_id", "timestamp", "feature0", "feature2", "tag1"]
             ],
             expected_parsed_schema=replace(
                 input_schema,
@@ -243,7 +265,8 @@
     def test_excluded_column_not_contained_in_dataframe_logs_warning(self, caplog):
         input_dataframe = DataFrame(
             {
-                "prediction_id": list(range(self.num_records)),
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
                 "prediction_label": [f"label{index}" for index in range(self.num_records)],
                 "feature0": np.zeros(self.num_records),
                 "feature1": np.ones(self.num_records),
@@ -255,16 +278,17 @@ def test_excluded_column_not_contained_in_dataframe_logs_warning(self, caplog):
         excludes = ["prediction_label", "column_not_in_dataframe"]
         input_schema = Schema(
             prediction_id_column_name="prediction_id",
+            timestamp_column_name="timestamp",
             feature_column_names=["feature0", "feature1", "feature2"],
             tag_column_names=["tag0", "tag1"],
             prediction_label_column_name="prediction_label",
             excludes=excludes,
         )
-        self._run_function_and_check_output(
+        self._parse_dataframe_and_schema_and_check_output(
             input_dataframe=input_dataframe,
             input_schema=input_schema,
             expected_parsed_dataframe=input_dataframe[
-                ["prediction_id", "feature0", "feature1", "feature2", "tag0", "tag1"]
+                ["prediction_id", "timestamp", "feature0", "feature1", "feature2", "tag0", "tag1"]
             ],
             expected_parsed_schema=replace(
                 input_schema, prediction_label_column_name=None, excludes=None
@@ -276,6 +300,8 @@
     def test_schema_includes_embedding_feature_has_all_embedding_columns_included(self, caplog):
         input_dataframe = DataFrame(
             {
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
                 "embedding_vector0": [
                     np.zeros(self.embedding_dimension) for _ in range(self.num_records)
                 ],
@@ -284,15 +310,17 @@
             }
         )
         input_schema = Schema(
+            prediction_id_column_name="prediction_id",
+            timestamp_column_name="timestamp",
             embedding_feature_column_names={
                 "embedding_feature0": EmbeddingColumnNames(
                     vector_column_name="embedding_vector0",
                     link_to_data_column_name="link_to_data0",
                     raw_data_column_name="raw_data_column0",
                 ),
-            }
+            },
         )
-        self._run_function_and_check_output(
+        self._parse_dataframe_and_schema_and_check_output(
             input_dataframe=input_dataframe,
             input_schema=input_schema,
             expected_parsed_dataframe=input_dataframe,
@@ -304,6 +332,8 @@
     def test_embedding_columns_of_excluded_embedding_feature_are_removed(self, caplog):
         input_dataframe = DataFrame(
             {
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
                 "embedding_vector0": [
                     np.zeros(self.embedding_dimension) for _ in range(self.num_records)
                 ],
@@ -315,6 +345,8 @@
             }
         )
         input_schema = Schema(
+            prediction_id_column_name="prediction_id",
+            timestamp_column_name="timestamp",
             embedding_feature_column_names={
                 "embedding_feature0": EmbeddingColumnNames(
                     vector_column_name="embedding_vector0",
@@ -329,11 +361,17 @@
             },
             excludes=["embedding_feature0"],
         )
-        self._run_function_and_check_output(
+        self._parse_dataframe_and_schema_and_check_output(
             input_dataframe=input_dataframe,
             input_schema=input_schema,
             expected_parsed_dataframe=input_dataframe[
-                ["embedding_vector1", "link_to_data1", "raw_data_column1"]
+                [
+                    "prediction_id",
+                    "timestamp",
+                    "embedding_vector1",
+                    "link_to_data1",
+                    "raw_data_column1",
+                ]
             ],
             expected_parsed_schema=replace(
                 input_schema,
@@ -353,6 +391,8 @@
     def test_excluding_all_embedding_features_sets_schema_embedding_field_to_none(self, caplog):
         input_dataframe = DataFrame(
             {
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
                 "embedding_vector0": [
                     np.zeros(self.embedding_dimension) for _ in range(self.num_records)
                 ],
@@ -361,6 +401,8 @@
             }
         )
         input_schema = Schema(
+            prediction_id_column_name="prediction_id",
+            timestamp_column_name="timestamp",
             embedding_feature_column_names={
                 "embedding_feature0": EmbeddingColumnNames(
                     vector_column_name="embedding_vector0",
@@ -370,10 +412,10 @@
             },
             excludes=["embedding_feature0"],
         )
-        self._run_function_and_check_output(
+        self._parse_dataframe_and_schema_and_check_output(
             input_dataframe=input_dataframe,
             input_schema=input_schema,
-            expected_parsed_dataframe=input_dataframe[[]],
+            expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp"]],
             expected_parsed_schema=replace(
                 input_schema,
                 embedding_feature_column_names=None,
@@ -388,6 +430,8 @@ def test_excluding_an_embedding_column_rather_than_the_embedding_feature_name_lo
     ):
         input_dataframe = DataFrame(
             {
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
                 "embedding_vector0": [
                     np.zeros(self.embedding_dimension) for _ in range(self.num_records)
                 ],
@@ -396,6 +440,8 @@
             }
         )
         input_schema = Schema(
+            prediction_id_column_name="prediction_id",
+            timestamp_column_name="timestamp",
             embedding_feature_column_names={
                 "embedding_feature0": EmbeddingColumnNames(
                     vector_column_name="embedding_vector0",
@@ -405,7 +451,7 @@
             },
             excludes=["embedding_vector0"],
         )
-        self._run_function_and_check_output(
+        self._parse_dataframe_and_schema_and_check_output(
             input_dataframe=input_dataframe,
             input_schema=input_schema,
             expected_parsed_dataframe=input_dataframe,
@@ -423,12 +469,16 @@ def test_excluding_embedding_feature_with_same_name_as_embedding_column_does_not
     ):
         input_dataframe = DataFrame(
             {
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
                 "embedding0": [np.zeros(self.embedding_dimension) for _ in range(self.num_records)],
                 "link_to_data0": [f"some-link{index}" for index in range(self.num_records)],
                 "raw_data_column0": [f"some-text{index}" for index in range(self.num_records)],
             }
         )
         input_schema = Schema(
+            prediction_id_column_name="prediction_id",
+            timestamp_column_name="timestamp",
             embedding_feature_column_names={
                 "embedding0": EmbeddingColumnNames(
                     vector_column_name="embedding0",
@@ -438,10 +488,10 @@
             },
             excludes=["embedding0"],
         )
-        self._run_function_and_check_output(
+        self._parse_dataframe_and_schema_and_check_output(
             input_dataframe=input_dataframe,
             input_schema=input_schema,
-            expected_parsed_dataframe=input_dataframe[[]],
+            expected_parsed_dataframe=input_dataframe[["prediction_id", "timestamp"]],
             expected_parsed_schema=replace(
                 input_schema,
                 embedding_feature_column_names=None,
@@ -451,7 +501,7 @@
             caplog=caplog,
         )
 
-    def _run_function_and_check_output(
+    def _parse_dataframe_and_schema_and_check_output(
         self,
         input_dataframe: DataFrame,
         input_schema: Schema,
@@ -463,6 +513,7 @@
         parsed_dataframe, parsed_schema = _parse_dataframe_and_schema(
             dataframe=input_dataframe, schema=input_schema
         )
+
         assert parsed_dataframe.equals(expected_parsed_dataframe)
         assert parsed_schema == expected_parsed_schema
         assert self._warning_logged(caplog) is should_log_warning_to_user
"feature0", "timestamp"]].equals( + input_dataframe + ) + assert "prediction_id" in output_dataframe + assert output_dataframe.dtypes["prediction_id"], "string" + + def test_dataset_normalization_columns_add_missing_timestamp(self): + input_dataframe = DataFrame( + { + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self.num_records), + "prediction_id": random_uuids(self.num_records), + } + ) + + input_schema = Schema( + prediction_id_column_name="prediction_id", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + + output_dataframe, _ = _parse_dataframe_and_schema( + dataframe=input_dataframe, schema=input_schema + ) + + assert len(output_dataframe.columns) == 4 + assert output_dataframe[["prediction_label", "feature0", "prediction_id"]].equals( + input_dataframe + ) + assert "timestamp" in output_dataframe + assert output_dataframe.dtypes["timestamp"], "datetime[nz]" + + def test_dataset_normalization_columns_missing_prediction_id_and_timestamp(self): + input_dataframe = DataFrame( + { + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self.num_records), + } + ) + + input_schema = Schema( + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + + output_dataframe, _ = _parse_dataframe_and_schema( + dataframe=input_dataframe, schema=input_schema + ) + + assert len(output_dataframe.columns) == 4 + assert output_dataframe[["prediction_label", "feature0"]].equals(input_dataframe) + assert "prediction_id" in output_dataframe + assert output_dataframe.dtypes["prediction_id"], "string" + assert "timestamp" in output_dataframe + assert output_dataframe.dtypes["timestamp"], "datetime[nz]" + @property def num_records(self): return self._NUM_RECORDS @@ -484,3 +692,119 @@ def num_records(self): @property def embedding_dimension(self): return self._EMBEDDING_DIMENSION + + +class TestDataset: + _NUM_RECORDS = 9 + + def test_dataset_normalization_columns_already_normalized(self): + input_dataframe = DataFrame( + { + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self._NUM_RECORDS), + "timestamp": np.full( + shape=self._NUM_RECORDS, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + ), + "prediction_id": random_uuids(self.num_records), + } + ) + + input_schema = Schema( + prediction_id_column_name="prediction_id", + timestamp_column_name="timestamp", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + + dataset = Dataset(dataframe=input_dataframe, schema=input_schema) + output_dataframe = dataset.dataframe + output_schema = dataset.schema + + assert output_dataframe.equals(input_dataframe) + assert output_schema == input_schema + + # TODO: Move validation tests to validation module; keep one validation integration test + def test_dataset_validate_invalid_prediction_id_datatype(self) -> None: + input_df = DataFrame( + { + "prediction_label": [f"label{index}" for index in range(self.num_records)], + "feature0": np.zeros(self.num_records), + "prediction_id": np.full( + shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp + ), + } + ) + + input_schema = Schema( + prediction_id_column_name="prediction_id", + feature_column_names=["feature0"], + prediction_label_column_name="prediction_label", + ) + + with raises(DatasetError): + Dataset(dataframe=input_df, schema=input_schema) + + def 
@@ -484,3 +692,119 @@ def num_records(self):
     @property
     def embedding_dimension(self):
         return self._EMBEDDING_DIMENSION
+
+
+class TestDataset:
+    _NUM_RECORDS = 9
+
+    def test_dataset_normalization_columns_already_normalized(self):
+        input_dataframe = DataFrame(
+            {
+                "prediction_label": [f"label{index}" for index in range(self.num_records)],
+                "feature0": np.zeros(self.num_records),
+                "timestamp": np.full(
+                    shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp
+                ),
+                "prediction_id": random_uuids(self.num_records),
+            }
+        )
+
+        input_schema = Schema(
+            prediction_id_column_name="prediction_id",
+            timestamp_column_name="timestamp",
+            feature_column_names=["feature0"],
+            prediction_label_column_name="prediction_label",
+        )
+
+        dataset = Dataset(dataframe=input_dataframe, schema=input_schema)
+        output_dataframe = dataset.dataframe
+        output_schema = dataset.schema
+
+        assert output_dataframe.equals(input_dataframe)
+        assert output_schema == input_schema
+
+    # TODO: Move validation tests to validation module; keep one validation integration test
+    def test_dataset_validate_invalid_prediction_id_datatype(self) -> None:
+        input_df = DataFrame(
+            {
+                "prediction_label": [f"label{index}" for index in range(self.num_records)],
+                "feature0": np.zeros(self.num_records),
+                "prediction_id": np.full(
+                    shape=self.num_records, fill_value=pd.Timestamp.utcnow(), dtype=pd.Timestamp
+                ),
+            }
+        )
+
+        input_schema = Schema(
+            prediction_id_column_name="prediction_id",
+            feature_column_names=["feature0"],
+            prediction_label_column_name="prediction_label",
+        )
+
+        with raises(DatasetError):
+            Dataset(dataframe=input_df, schema=input_schema)
+
+    def test_dataset_validate_invalid_timestamp_datatype(self) -> None:
+        input_df = DataFrame(
+            {
+                "prediction_label": [f"label{index}" for index in range(self.num_records)],
+                "feature0": np.zeros(self.num_records),
+                "timestamp": random_uuids(self.num_records),
+            },
+        )
+
+        input_schema = Schema(
+            timestamp_column_name="timestamp",
+            feature_column_names=["feature0"],
+            prediction_label_column_name="prediction_label",
+        )
+
+        with raises(DatasetError):
+            Dataset(dataframe=input_df, schema=input_schema)
+
+    def test_dataset_validate_invalid_schema_excludes_timestamp(self) -> None:
+        input_df = DataFrame(
+            {
+                "prediction_label": [f"label{index}" for index in range(self.num_records)],
+                "feature0": np.zeros(self.num_records),
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
+            },
+        )
+
+        input_schema = Schema(
+            timestamp_column_name="timestamp",
+            feature_column_names=["feature0"],
+            prediction_label_column_name="prediction_label",
+            excludes=["timestamp"],
+        )
+
+        with raises(DatasetError):
+            Dataset(dataframe=input_df, schema=input_schema)
+
+    def test_dataset_validate_invalid_schema_excludes_prediction_id(self) -> None:
+        input_df = DataFrame(
+            {
+                "prediction_id": [str(x) for x in range(self.num_records)],
+                "prediction_label": [f"label{index}" for index in range(self.num_records)],
+                "feature0": np.zeros(self.num_records),
+                "timestamp": [pd.Timestamp.now() for _ in range(self.num_records)],
+            },
+        )
+
+        input_schema = Schema(
+            prediction_id_column_name="prediction_id",
+            feature_column_names=["feature0"],
+            prediction_label_column_name="prediction_label",
+            excludes=["prediction_id"],
+        )
+
+        with raises(DatasetError):
+            Dataset(dataframe=input_df, schema=input_schema)
+
+    @property
+    def num_records(self):
+        return self._NUM_RECORDS
+
+
+def random_uuids(num_records: int):
+    return [str(uuid.uuid4()) for _ in range(num_records)]
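For reference, an end-to-end sketch of the behavior these tests pin down, assuming the `Dataset` constructor runs the same validation and normalization path demonstrated above; the data is illustrative:

```python
import numpy as np
import pandas as pd

from phoenix.datasets.dataset import Dataset, Schema

df = pd.DataFrame(
    {
        "prediction_label": ["a", "b", "c"],
        "feature0": np.zeros(3),
    }
)
schema = Schema(
    feature_column_names=["feature0"],
    prediction_label_column_name="prediction_label",
)

dataset = Dataset(dataframe=df, schema=schema)

# Both optional columns should be filled in during construction.
print(dataset.dataframe.columns.tolist())
# expected: ['prediction_label', 'feature0', 'timestamp', 'prediction_id']
print(dataset.schema.timestamp_column_name)      # "timestamp"
print(dataset.schema.prediction_id_column_name)  # "prediction_id"
```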