Add dataset fixture to ease fetching DataFrames for tests #847

Merged
Changes from 19 commits

Commits (35)
a8f5b84
WIP: First pass, untested
dagardner-nv Apr 4, 2023
b5f4036
wip
dagardner-nv Apr 4, 2023
673d45e
wip
dagardner-nv Apr 5, 2023
ffc3035
wip
dagardner-nv Apr 5, 2023
810a0cf
wip
dagardner-nv Apr 5, 2023
4e323a6
wip
dagardner-nv Apr 5, 2023
97c6edb
wip
dagardner-nv Apr 5, 2023
7e299ea
Remove unused imports and remove unnecessary writes to disk
dagardner-nv Apr 5, 2023
3a5d97c
wip
dagardner-nv Apr 5, 2023
f1a7fef
wip
dagardner-nv Apr 5, 2023
c7450a2
Fix formatting of docstring
dagardner-nv Apr 5, 2023
d9348cd
Remove unused import
dagardner-nv Apr 5, 2023
b2c6685
Cache instances of DatasetLoader, add type hints to class vars
dagardner-nv Apr 5, 2023
134e8fc
Use the get_loader method
dagardner-nv Apr 5, 2023
3927eff
Fix comment
dagardner-nv Apr 5, 2023
e69e413
Replace usage of use_pandas as a way to get test data which had the u…
dagardner-nv Apr 5, 2023
b1f9e43
Merge branch 'branch-23.07' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Apr 13, 2023
f74bd4b
Merge branch 'branch-23.07' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Apr 13, 2023
0b704f0
Merge branch 'branch-23.07' into david-datasets-fixture-785
dagardner-nv Apr 13, 2023
9386c8d
Rename DatasetLoader to DatasetManager [no ci]
dagardner-nv Apr 13, 2023
92e1a6c
wip [no ci]
dagardner-nv Apr 14, 2023
0c10a7c
wip
dagardner-nv Apr 14, 2023
e06cf15
Mark dfencoder end to end tests as slow
dagardner-nv Apr 14, 2023
da2fb5c
Move test utils into tests/utils [no ci]
dagardner-nv Apr 14, 2023
f135844
Replace get_loader with just depending on the class being a singleton
dagardner-nv Apr 14, 2023
fdcd008
Remove _dataset_mod
dagardner-nv Apr 14, 2023
1c6e30d
Use absolute paths as cache keys to avoid accidental cache misses
dagardner-nv Apr 14, 2023
1586f05
Rename tests to avoid ambiguity
dagardner-nv Apr 14, 2023
da47969
Remove use_cpp fixture from dataset fixture request
dagardner-nv Apr 14, 2023
ac30fcd
wip
dagardner-nv Apr 14, 2023
7a70c2d
Move init logic to new
dagardner-nv Apr 14, 2023
a43e2c3
Leave a comment explaining __new__ instead of __init__
dagardner-nv Apr 14, 2023
193286e
Add comments explaining fixture behavior
dagardner-nv Apr 14, 2023
638e91a
more tests
dagardner-nv Apr 14, 2023
c555fca
Move classes out of tests/utils/__init__.py to their own modules
dagardner-nv Apr 14, 2023
47 changes: 35 additions & 12 deletions tests/conftest.py
Expand Up @@ -443,21 +443,44 @@ def chdir_tmpdir(request: pytest.FixtureRequest, tmp_path):


@pytest.fixture(scope="session")
def _filter_probs_df():
from morpheus.io.deserializers import read_file_to_df
from utils import TEST_DIRS
input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
yield read_file_to_df(input_file, df_type='cudf')
def _dataset():
"""
Session scoped cudf DatasetLoader
"""
import dataset_loader
yield dataset_loader.DatasetLoader.get_loader(df_type='cudf')
dagardner-nv marked this conversation as resolved.
Show resolved Hide resolved


@pytest.fixture(scope="function")
def filter_probs_df(_filter_probs_df, df_type: typing.Literal['cudf', 'pandas'], use_cpp: bool):
if df_type == 'cudf':
yield _filter_probs_df.copy(deep=True)
elif df_type == 'pandas':
yield _filter_probs_df.to_pandas()
else:
assert False, "Unknown df_type type"
def dataset(_dataset, df_type: typing.Literal['cudf', 'pandas'], use_cpp: bool):
"""
Yields a DatasetLoader instance with `df_type` as the default DataFrame type.
"""
yield _dataset.get_loader(df_type=df_type)


@pytest.fixture(scope="function")
def dataset_pandas(_dataset):
"""
Yields a DatasetLoader instance with pandas as the default DataFrame type.
"""
yield _dataset.get_loader(df_type='pandas')


@pytest.fixture(scope="function")
def dataset_cudf(_dataset):
"""
Yields a DatasetLoader instance with cudf as the default DataFrame type.
"""
yield _dataset.get_loader(df_type='cudf')
dagardner-nv marked this conversation as resolved.
Show resolved Hide resolved


@pytest.fixture(scope="function")
def filter_probs_df(dataset):
"""
Shortcut fixture for loading the filter_probs.csv dataset
"""
yield dataset["filter_probs.csv"]


def wait_for_camouflage(host="localhost", port=8000, timeout=5):
Expand Down
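As a point of reference, here is a minimal sketch of how a test might consume these fixtures (the fixture names and indexing behavior come from this PR; the test bodies themselves are hypothetical):

import pandas as pd


# `dataset` resolves to the DatasetLoader whose default df_type matches the
# active pandas/cudf parameterization, so one test body covers both types.
def test_roundtrip(dataset):
    # Indexing the loader by file name returns a deep copy, so mutations
    # cannot leak into other tests sharing the session-scoped cache.
    df = dataset["filter_probs.csv"]
    df["new_col"] = 0

    # A second fetch is served from the cache, unaffected by the edit above.
    assert "new_col" not in dataset["filter_probs.csv"].columns


# `dataset_pandas` pins the default df_type, so tests that only make sense
# for pandas need no parameterization.
def test_pandas_only(dataset_pandas):
    df: pd.DataFrame = dataset_pandas["filter_probs.csv"]
    assert isinstance(df, pd.DataFrame)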
177 changes: 177 additions & 0 deletions tests/dataset_loader.py
@@ -0,0 +1,177 @@
# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import random
import typing

import cupy as cp
import pandas as pd

import cudf as cdf # rename to avoid clash with property method

from morpheus.io.deserializers import read_file_to_df
from utils import TEST_DIRS


class DatasetLoader:
    """
    Helper class for loading and caching test datasets as DataFrames, along with some common manipulation methods.

    Parameters
    ----------
    df_type : typing.Literal['cudf', 'pandas']
        Type of DataFrame to return unless otherwise explicitly specified.
    """

    __df_cache: typing.Dict[typing.Tuple[typing.Literal['cudf', 'pandas'], str],
                            typing.Union[cdf.DataFrame, pd.DataFrame]] = {}

    # Values in `__instances` are instances of `DatasetLoader`
    __instances: typing.Dict[typing.Literal['cudf', 'pandas'], typing.Any] = {}

    def __init__(self, df_type: typing.Literal['cudf', 'pandas']) -> None:
        self._default_df_type = df_type
        self.__instances[df_type] = self

    def get_alt_df_type(self, df_type: typing.Literal['cudf', 'pandas']) -> typing.Literal['cudf', 'pandas']:
        """Returns the other possible df type."""
        return 'cudf' if df_type == 'pandas' else 'pandas'

    def clear(self):
        self.__df_cache.clear()

    def get_df(self,
               file_path: str,
               df_type: typing.Literal['cudf', 'pandas'] = None) -> typing.Union[cdf.DataFrame, pd.DataFrame]:
        """
        Fetch the DataFrame specified by `file_path`. If `file_path` is not an absolute path, it is assumed to be
        relative to the `tests/tests_data` dir. If a DataFrame matching both `file_path` and `df_type` has already
        been fetched, a cached copy is returned. If a DataFrame matching `file_path` but not `df_type` exists in
        the cache, the cached copy is cast to the requested type, stored in the cache and then returned.
        """
        if os.path.abspath(file_path) != os.path.normpath(file_path):
            full_path = os.path.join(TEST_DIRS.tests_data_dir, file_path)
        else:
            full_path = file_path

        if df_type is None:
            df_type = self._default_df_type

        df = self.__df_cache.get((df_type, full_path))
        if df is None:
            # If it isn't in the cache, but we have a cached copy in the other DF format, use that instead of
            # going to disk
            alt_df_type = self.get_alt_df_type(df_type=df_type)
            alt_df = self.__df_cache.get((alt_df_type, full_path))
            if alt_df is not None:
                if alt_df_type == 'cudf':
                    df = alt_df.to_pandas()
                else:
                    df = cdf.DataFrame.from_pandas(alt_df)
            else:
                df = read_file_to_df(full_path, df_type=df_type)

            self.__df_cache[(df_type, full_path)] = df

        return df.copy(deep=True)

    def __getitem__(
        self, item: typing.Union[str, typing.Tuple[str], typing.Tuple[str, typing.Literal['cudf', 'pandas']]]
    ) -> typing.Union[cdf.DataFrame, pd.DataFrame]:
        if not isinstance(item, tuple):
            item = (item, )

        return self.get_df(*item)

    @classmethod
    def get_loader(cls, df_type: typing.Literal['cudf', 'pandas']):
        """
        Factory method returning the `DatasetLoader` instance for the given `df_type`, constructing one on first
        use. Used by the `cudf` and `pandas` property methods.
        """
        try:
            loader = cls.__instances[df_type]
        except KeyError:
            loader = cls(df_type=df_type)

        return loader

    @property
    def cudf(self):
        return self.get_loader(df_type='cudf')

    @property
    def pandas(self):
        return self.get_loader(df_type='pandas')

    @staticmethod
    def repeat(df: typing.Union[cdf.DataFrame, pd.DataFrame],
               repeat_count: int = 2,
               reset_index: bool = True) -> typing.Union[cdf.DataFrame, pd.DataFrame]:
        """
        Returns a DF consisting of `repeat_count` copies of the original.
        """
        if isinstance(df, pd.DataFrame):
            concat_fn = pd.concat
        else:
            concat_fn = cdf.concat

        repeated_df = concat_fn([df for _ in range(repeat_count)])

        if reset_index:
            repeated_df = repeated_df.reset_index(inplace=False, drop=True)

        return repeated_df

    @staticmethod
    def replace_index(df: typing.Union[cdf.DataFrame, pd.DataFrame],
                      replace_ids: typing.Dict[int, int]) -> typing.Union[cdf.DataFrame, pd.DataFrame]:
        """Return a new DataFrame where some index values are replaced with others."""
        return df.rename(index=replace_ids)

    @classmethod
    def dup_index(cls,
                  df: typing.Union[cdf.DataFrame, pd.DataFrame],
                  count: int = 1) -> typing.Union[cdf.DataFrame, pd.DataFrame]:
        """Randomly duplicate `count` entries in a DataFrame's index."""
        assert count * 2 <= len(df), "Count must be less than half the number of rows."

        # Sample 2x the count: one for the old ID and one for the new ID. We don't want duplicates, so we use
        # random.sample (otherwise you could get fewer duplicates than requested if two IDs just swap)
        dup_ids = random.sample(df.index.values.tolist(), 2 * count)

        # Create a dictionary of old ID to new ID
        replace_dict = {x: y for x, y in zip(dup_ids[:count], dup_ids[count:])}

        # Return a new dataframe where we replace some index values with others
        return cls.replace_index(df, replace_dict)

    @staticmethod
    def assert_df_equal(df_to_check: typing.Union[pd.DataFrame, cdf.DataFrame], val_to_check: typing.Any) -> bool:
        """Compare a DataFrame against a validation dataset which can either be a DataFrame, Series or CuPy array."""

        # Comparisons work better in pandas, so convert everything to that
        if (isinstance(df_to_check, cdf.DataFrame) or isinstance(df_to_check, cdf.Series)):
            df_to_check = df_to_check.to_pandas()

        if (isinstance(val_to_check, cdf.DataFrame) or isinstance(val_to_check, cdf.Series)):
            val_to_check = val_to_check.to_pandas()
        elif (isinstance(val_to_check, cp.ndarray)):
            val_to_check = val_to_check.get()

        bool_df = df_to_check == val_to_check

        return bool(bool_df.all(axis=None))
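To make the new class's contract concrete, here is a short, hypothetical usage sketch built only from the methods above (it assumes a Morpheus dev environment where tests/ is importable and that filter_probs.csv has a unique default index):

from dataset_loader import DatasetLoader

# get_loader() behaves as a per-df_type singleton: repeated calls return the
# same cached instance instead of constructing a new one.
pd_loader = DatasetLoader.get_loader(df_type='pandas')
assert DatasetLoader.get_loader(df_type='pandas') is pd_loader

# The cudf/pandas properties cross over between the two singletons.
assert pd_loader.cudf is DatasetLoader.get_loader(df_type='cudf')

# Relative paths resolve against tests/tests_data; every call hands back a
# deep copy, and repeat fetches are served from the in-memory cache.
df = pd_loader["filter_probs.csv"]
cudf_df = pd_loader["filter_probs.csv", "cudf"]  # tuple form overrides df_type

# repeat() concatenates copies; reset_index=True (the default) gives the
# result a fresh 0..N-1 index instead of a repeating one.
doubled = DatasetLoader.repeat(df, repeat_count=2)
assert len(doubled) == 2 * len(df)

# dup_index() rewrites `count` randomly chosen index values to collide with
# other rows, for tests that must tolerate duplicate indices.
dup_df = DatasetLoader.dup_index(df, count=1)
assert dup_df.index.nunique() == len(df) - 1

# assert_df_equal() normalizes both sides to host (pandas/NumPy) objects
# before the elementwise comparison.
assert DatasetLoader.assert_df_equal(df, df.copy(deep=True))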
14 changes: 4 additions & 10 deletions tests/dfencoder/test_autoencoder.py
@@ -21,15 +21,15 @@
 import pytest
 import torch

+from dataset_loader import DatasetLoader
 from morpheus.config import AEFeatureScalar
-from morpheus.io.deserializers import read_file_to_df
 from morpheus.models.dfencoder import ae_module
 from morpheus.models.dfencoder import autoencoder
 from morpheus.models.dfencoder import scalers
 from morpheus.models.dfencoder.dataframe import EncoderDataFrame
 from utils import TEST_DIRS

-# Only pandas and C++ is supported
+# Only pandas and Python is supported
 pytestmark = [pytest.mark.use_pandas, pytest.mark.use_python]

 BIN_COLS = ['ts_anomaly']

@@ -73,15 +73,9 @@ def train_ae():
                               progress_bar=False)


-@pytest.fixture(scope="module")
-def _train_df() -> pd.DataFrame:
-    input_file = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-role-g-validation-data-input.csv")
-    yield read_file_to_df(input_file, df_type='pandas')
-
-
 @pytest.fixture(scope="function")
-def train_df(_train_df) -> typing.Generator[pd.DataFrame, None, None]:
-    yield _train_df.copy(deep=True)
+def train_df(dataset_pandas: DatasetLoader) -> typing.Iterator[pd.DataFrame]:
+    yield dataset_pandas[os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-role-g-validation-data-input.csv")]


 def compare_numeric_features(features, expected_features):
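The rewritten train_df fixture leans on get_df's path handling: absolute paths are used verbatim, anything else is joined onto tests/tests_data. A self-contained sketch of just that rule (the directory constant is a hypothetical stand-in for TEST_DIRS.tests_data_dir):

import os

TESTS_DATA_DIR = "/workspace/tests/tests_data"  # hypothetical stand-in


def resolve(file_path: str) -> str:
    # Mirrors the check in DatasetLoader.get_df(): a path is accepted as-is
    # only when it is already absolute and normalized; anything else is
    # treated as relative to the shared test-data directory.
    if os.path.abspath(file_path) != os.path.normpath(file_path):
        return os.path.join(TESTS_DATA_DIR, file_path)
    return file_path


assert resolve("filter_probs.csv") == "/workspace/tests/tests_data/filter_probs.csv"
assert resolve("/data/validation/input.csv") == "/data/validation/input.csv"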
13 changes: 7 additions & 6 deletions tests/test_abp_kafka.py
@@ -23,11 +23,10 @@
 import pandas
 import pytest

-from morpheus.common import FileTypes
+from dataset_loader import DatasetLoader
 from morpheus.config import Config
 from morpheus.config import ConfigFIL
 from morpheus.config import PipelineModes
-from morpheus.io.deserializers import read_file_to_df
 from morpheus.io.utils import filter_null_data
 from morpheus.pipeline import LinearPipeline
 from morpheus.stages.general.monitor_stage import MonitorStage

@@ -54,7 +53,8 @@
 @pytest.mark.slow
 @pytest.mark.use_python
 @mock.patch('tritonclient.grpc.InferenceServerClient')
-def test_abp_no_cpp(mock_triton_client,
+def test_abp_no_cpp(mock_triton_client: mock.MagicMock,
+                    dataset_pandas: DatasetLoader,
                     config: Config,
                     kafka_bootstrap_servers: str,
                     kafka_topics: typing.Tuple[str, str],

@@ -129,7 +129,7 @@ def async_infer(callback=None, **k):

     pipe.run()

-    val_df = read_file_to_df(val_file_name, file_type=FileTypes.Auto, df_type='pandas')
+    val_df = dataset_pandas[val_file_name]

     output_buf = StringIO()
     for rec in kafka_consumer:

@@ -150,7 +150,8 @@
 @pytest.mark.slow
 @pytest.mark.use_cpp
 @pytest.mark.usefixtures("launch_mock_triton")
-def test_abp_cpp(config,
+def test_abp_cpp(config: Config,
+                 dataset_pandas: DatasetLoader,
                 kafka_bootstrap_servers: str,
                 kafka_topics: typing.Tuple[str, str],
                 kafka_consumer: "KafkaConsumer"):

@@ -195,7 +196,7 @@

     pipe.run()

-    val_df = read_file_to_df(val_file_name, file_type=FileTypes.Auto, df_type='pandas')
+    val_df = dataset_pandas[val_file_name]
     output_buf = StringIO()
     for rec in kafka_consumer:
         output_buf.write("{}\n".format(rec.value.decode("utf-8")))
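Both Kafka tests make the same substitution; in effect (a hedged before/after sketch, with names as they appear in the diff):

# Before: each test re-parsed the validation CSV from disk on every run.
# val_df = read_file_to_df(val_file_name, file_type=FileTypes.Auto, df_type='pandas')

# After: the first lookup parses and caches the file; later lookups in the
# same session receive a deep copy of the cached DataFrame, so the two slow
# Kafka tests no longer pay the parsing cost independently.
val_df = dataset_pandas[val_file_name]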
17 changes: 9 additions & 8 deletions tests/test_add_classifications_stage.py
@@ -19,14 +19,15 @@

 import cudf

+from dataset_loader import DatasetLoader
+from morpheus.config import Config
 from morpheus.messages.memory.tensor_memory import TensorMemory
 from morpheus.messages.message_meta import MessageMeta
 from morpheus.messages.multi_response_message import MultiResponseMessage
 from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage
-from utils import assert_df_equal


-def test_constructor(config):
+def test_constructor(config: Config):
     config.class_labels = ['frogs', 'lizards', 'toads']

     ac = AddClassificationsStage(config)

@@ -64,9 +65,9 @@ def test_add_labels():

     labeled = AddClassificationsStage._add_labels(message, idx2label=class_labels, threshold=threshold)

-    assert assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0])
-    assert assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1])
-    assert assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2])
+    assert DatasetLoader.assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0])
+    assert DatasetLoader.assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1])
+    assert DatasetLoader.assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2])

     # Same thing but change the probs tensor name
     message = MultiResponseMessage(meta=MessageMeta(df),

@@ -75,9 +76,9 @@

     labeled = AddClassificationsStage._add_labels(message, idx2label=class_labels, threshold=threshold)

-    assert assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0])
-    assert assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1])
-    assert assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2])
+    assert DatasetLoader.assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0])
+    assert DatasetLoader.assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1])
+    assert DatasetLoader.assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2])

     # Fail in missing probs data
     message = MultiResponseMessage(meta=MessageMeta(df),
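Because assert_df_equal() is a staticmethod, these tests call it through the class without building a loader. A self-contained sketch of the comparison semantics using plain pandas/NumPy stand-ins (hypothetical data, mirroring the probs_array_bool[:, i] usage above):

import numpy as np
import pandas as pd


def assert_df_equal(df_to_check, val_to_check) -> bool:
    # The cuDF-free core of DatasetLoader.assert_df_equal(): compare
    # elementwise, then reduce with all() across every axis.
    bool_df = df_to_check == val_to_check
    return bool(bool_df.all(axis=None))


labeled = pd.DataFrame({"frogs": [True, False], "lizards": [False, True]})

# Comparing a column against a boolean column vector, as test_add_labels does.
assert assert_df_equal(labeled["frogs"], np.array([True, False]))
assert not assert_df_equal(labeled["frogs"], np.array([True, True]))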