From ea61188bdbb901ec2db4857a2d1aa716ed3ce5c9 Mon Sep 17 00:00:00 2001 From: Denis Barbier Date: Tue, 3 Dec 2019 21:11:58 +0100 Subject: [PATCH] Improve test coverage (#609) * Improve test coverage * Fix ClusterDaskDistributor with recent Dask Apply the same fix as with LocalDaskDistributor. Fix #575. --- .../units/feature_extraction/test_settings.py | 2 +- .../utilities/test_dataframe_functions.py | 17 +++++++ tests/units/utilities/test_distribution.py | 51 ++++++++++++++++++- tsfresh/utilities/distribution.py | 6 ++- 4 files changed, 71 insertions(+), 5 deletions(-) diff --git a/tests/units/feature_extraction/test_settings.py b/tests/units/feature_extraction/test_settings.py index dd2a2d3ec..e63ee95ba 100644 --- a/tests/units/feature_extraction/test_settings.py +++ b/tests/units/feature_extraction/test_settings.py @@ -20,7 +20,7 @@ class TestSettingsObject(TestCase): def test_from_column_raises_on_wrong_column_format(self): self.assertRaises(TypeError, from_columns, 42) - self.assertRaises(TypeError, from_columns, 42) + self.assertRaises(TypeError, from_columns, [42]) self.assertRaises(ValueError, from_columns, ["This is not a column name"]) self.assertRaises(ValueError, from_columns, ["This__neither"]) self.assertRaises(ValueError, from_columns, ["This__also__not"]) diff --git a/tests/units/utilities/test_dataframe_functions.py b/tests/units/utilities/test_dataframe_functions.py index eee9143ba..dda9661a5 100644 --- a/tests/units/utilities/test_dataframe_functions.py +++ b/tests/units/utilities/test_dataframe_functions.py @@ -111,10 +111,18 @@ def test_with_wrong_input(self): self.assertRaises(AttributeError, dataframe_functions._normalize_input_to_internal_representation, test_df, "strange_id", "sort", "kind", "value") + test_df = pd.DataFrame([{"id": 0, "kind": "a", "value": 3, "sort": 1}]) + self.assertRaises(AttributeError, dataframe_functions._normalize_input_to_internal_representation, test_df, + "id", "sort", "strange_kind", "value") + test_df = pd.DataFrame([{"id": np.NaN, "kind": "a", "value": 3, "sort": 1}]) self.assertRaises(ValueError, dataframe_functions._normalize_input_to_internal_representation, test_df, "id", "sort", "kind", "value") + test_df = pd.DataFrame([{"id": 0, "kind": np.NaN, "value": 3, "sort": 1}]) + self.assertRaises(ValueError, dataframe_functions._normalize_input_to_internal_representation, test_df, + "id", "sort", "kind", "value") + test_df = pd.DataFrame([{"id": 2}, {"id": 1}]) test_dict = {"a": test_df, "b": test_df} @@ -202,6 +210,11 @@ def test_with_wrong_input(self): column_sort="sort", column_kind="kind", rolling_direction=1) + self.assertRaises(ValueError, dataframe_functions.roll_time_series, + df_or_dict=test_df, column_id=None, + column_sort="sort", column_kind="kind", + rolling_direction=1) + test_df = {"a": pd.DataFrame([{"id": 0}])} self.assertRaises(ValueError, dataframe_functions.roll_time_series, df_or_dict=test_df, column_id="id", @@ -753,3 +766,7 @@ def test_get_id__correct_dict(self): df_dict = {"a": pd.DataFrame({"_value": [1, 2, 3, 4, 10, 11], "id": [1, 1, 1, 1, 2, 2]}), "b": pd.DataFrame({"_value": [5, 6, 7, 8, 12, 13], "id": [4, 4, 3, 3, 2, 2]})} self.assertEqual(get_ids(df_dict, "id"), {1, 2, 3, 4}) + + def test_get_id_wrong(self): + other_type = np.array([1, 2, 3]) + self.assertRaises(TypeError, get_ids, other_type, "id") diff --git a/tests/units/utilities/test_distribution.py b/tests/units/utilities/test_distribution.py index 6089092eb..0da8a2a50 100644 --- a/tests/units/utilities/test_distribution.py +++ b/tests/units/utilities/test_distribution.py @@ -5,15 +5,16 @@ from unittest import TestCase import numpy as np import pandas as pd +from distributed import LocalCluster, Client from tsfresh import extract_features -from tsfresh.utilities.distribution import MultiprocessingDistributor, LocalDaskDistributor +from tsfresh.utilities.distribution import MultiprocessingDistributor, LocalDaskDistributor, ClusterDaskDistributor from tests.fixtures import DataTestCase class MultiprocessingDistributorTestCase(TestCase): - def test_partion(self): + def test_partition(self): distributor = MultiprocessingDistributor(n_workers=1) @@ -82,3 +83,49 @@ def test_local_dask_cluster_extraction_two_worker(self): self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75]))) self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0]))) +class ClusterDaskDistributorTestCase(DataTestCase): + + def test_dask_cluster_extraction_one_worker(self): + cluster = LocalCluster(n_workers=1, threads_per_worker=1, diagnostics_port=False) + client = Client(cluster) + address = client.scheduler_info()['address'] + Distributor = ClusterDaskDistributor(address=address) + + df = self.create_test_data_sample() + extracted_features = extract_features(df, column_id="id", column_sort="sort", column_kind="kind", + column_value="val", + distributor=Distributor) + + self.assertIsInstance(extracted_features, pd.DataFrame) + self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77]))) + self.assertTrue(np.all(extracted_features.a__sum_values == np.array([691, 1017]))) + self.assertTrue(np.all(extracted_features.a__abs_energy == np.array([32211, 63167]))) + self.assertTrue(np.all(extracted_features.b__sum_values == np.array([757, 695]))) + self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1]))) + self.assertTrue(np.all(extracted_features.b__abs_energy == np.array([36619, 35483]))) + self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75]))) + self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0]))) + cluster.close() + + def test_dask_cluster_extraction_two_workers(self): + cluster = LocalCluster(n_workers=2, threads_per_worker=1, diagnostics_port=False) + client = Client(cluster) + address = client.scheduler_info()['address'] + Distributor = ClusterDaskDistributor(address=address) + + df = self.create_test_data_sample() + extracted_features = extract_features(df, column_id="id", column_sort="sort", column_kind="kind", + column_value="val", + distributor=Distributor) + + self.assertIsInstance(extracted_features, pd.DataFrame) + self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77]))) + self.assertTrue(np.all(extracted_features.a__sum_values == np.array([691, 1017]))) + self.assertTrue(np.all(extracted_features.a__abs_energy == np.array([32211, 63167]))) + self.assertTrue(np.all(extracted_features.b__sum_values == np.array([757, 695]))) + self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1]))) + self.assertTrue(np.all(extracted_features.b__abs_energy == np.array([36619, 35483]))) + self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75]))) + self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0]))) + cluster.close() + diff --git a/tsfresh/utilities/distribution.py b/tsfresh/utilities/distribution.py index 857129725..8a43f5dfb 100644 --- a/tsfresh/utilities/distribution.py +++ b/tsfresh/utilities/distribution.py @@ -261,7 +261,7 @@ def distribute(self, func, partitioned_chunks, kwargs): """ if isinstance(partitioned_chunks, Iterable): - # since dask 2.0.0 client map no longer accepts iteratables + # since dask 2.0.0 client map no longer accepts iterables partitioned_chunks = list(partitioned_chunks) result = self.client.gather(self.client.map(partial(func, **kwargs), partitioned_chunks)) return [item for sublist in result for item in sublist] @@ -319,7 +319,9 @@ def distribute(self, func, partitioned_chunks, kwargs): :return: The result of the calculation as a list - each item should be the result of the application of func to a single element. """ - + if isinstance(partitioned_chunks, Iterable): + # since dask 2.0.0 client map no longer accepts iterables + partitioned_chunks = list(partitioned_chunks) result = self.client.gather(self.client.map(partial(func, **kwargs), partitioned_chunks)) return [item for sublist in result for item in sublist]