Commit

Improve test coverage (#609)
* Improve test coverage

* Fix ClusterDaskDistributor with recent Dask

Apply the same fix as with LocalDaskDistributor.
Fix #575.
dbarbier authored and nils-braun committed Dec 3, 2019
1 parent 0654cec commit ea61188
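
For context on the "Fix ClusterDaskDistributor with recent Dask" part of this commit: since Dask 2.0.0, Client.map no longer accepts plain iterables such as generators, so the partitioned chunks have to be materialized into a list before being mapped, exactly as LocalDaskDistributor already does. Below is a minimal standalone sketch of that pattern; the toy work function process_chunk and the generator of chunks are illustrative assumptions, not tsfresh code.

from collections.abc import Iterable
from functools import partial

from distributed import Client, LocalCluster

def process_chunk(chunk, factor=1):
    # Toy per-chunk work standing in for tsfresh's feature calculators.
    return [x * factor for x in chunk]

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=1, threads_per_worker=1)
    client = Client(cluster)

    # A generator of chunks: an Iterable, but not a list.
    partitioned_chunks = (list(range(i, i + 3)) for i in range(0, 9, 3))

    if isinstance(partitioned_chunks, Iterable):
        # Since Dask 2.0.0, Client.map no longer accepts plain iterables,
        # so materialize them into a list first -- the same pattern this
        # commit applies inside ClusterDaskDistributor.distribute.
        partitioned_chunks = list(partitioned_chunks)

    result = client.gather(client.map(partial(process_chunk, factor=2), partitioned_chunks))
    print([item for sublist in result for item in sublist])

    client.close()
    cluster.close()
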
Showing 4 changed files with 71 additions and 5 deletions.
2 changes: 1 addition & 1 deletion tests/units/feature_extraction/test_settings.py
@@ -20,7 +20,7 @@ class TestSettingsObject(TestCase):
def test_from_column_raises_on_wrong_column_format(self):

self.assertRaises(TypeError, from_columns, 42)
self.assertRaises(TypeError, from_columns, 42)
self.assertRaises(TypeError, from_columns, [42])
self.assertRaises(ValueError, from_columns, ["This is not a column name"])
self.assertRaises(ValueError, from_columns, ["This__neither"])
self.assertRaises(ValueError, from_columns, ["This__also__not"])
17 changes: 17 additions & 0 deletions tests/units/utilities/test_dataframe_functions.py
@@ -111,10 +111,18 @@ def test_with_wrong_input(self):
self.assertRaises(AttributeError, dataframe_functions._normalize_input_to_internal_representation, test_df,
"strange_id", "sort", "kind", "value")

test_df = pd.DataFrame([{"id": 0, "kind": "a", "value": 3, "sort": 1}])
self.assertRaises(AttributeError, dataframe_functions._normalize_input_to_internal_representation, test_df,
"id", "sort", "strange_kind", "value")

test_df = pd.DataFrame([{"id": np.NaN, "kind": "a", "value": 3, "sort": 1}])
self.assertRaises(ValueError, dataframe_functions._normalize_input_to_internal_representation, test_df,
"id", "sort", "kind", "value")

test_df = pd.DataFrame([{"id": 0, "kind": np.NaN, "value": 3, "sort": 1}])
self.assertRaises(ValueError, dataframe_functions._normalize_input_to_internal_representation, test_df,
"id", "sort", "kind", "value")

test_df = pd.DataFrame([{"id": 2}, {"id": 1}])
test_dict = {"a": test_df, "b": test_df}

@@ -202,6 +210,11 @@ def test_with_wrong_input(self):
column_sort="sort", column_kind="kind",
rolling_direction=1)

self.assertRaises(ValueError, dataframe_functions.roll_time_series,
df_or_dict=test_df, column_id=None,
column_sort="sort", column_kind="kind",
rolling_direction=1)

test_df = {"a": pd.DataFrame([{"id": 0}])}
self.assertRaises(ValueError, dataframe_functions.roll_time_series,
df_or_dict=test_df, column_id="id",
@@ -753,3 +766,7 @@ def test_get_id__correct_dict(self):
df_dict = {"a": pd.DataFrame({"_value": [1, 2, 3, 4, 10, 11], "id": [1, 1, 1, 1, 2, 2]}),
"b": pd.DataFrame({"_value": [5, 6, 7, 8, 12, 13], "id": [4, 4, 3, 3, 2, 2]})}
self.assertEqual(get_ids(df_dict, "id"), {1, 2, 3, 4})

def test_get_id_wrong(self):
other_type = np.array([1, 2, 3])
self.assertRaises(TypeError, get_ids, other_type, "id")
51 changes: 49 additions & 2 deletions tests/units/utilities/test_distribution.py
@@ -5,15 +5,16 @@
from unittest import TestCase
import numpy as np
import pandas as pd
from distributed import LocalCluster, Client

from tsfresh import extract_features
from tsfresh.utilities.distribution import MultiprocessingDistributor, LocalDaskDistributor
from tsfresh.utilities.distribution import MultiprocessingDistributor, LocalDaskDistributor, ClusterDaskDistributor
from tests.fixtures import DataTestCase


class MultiprocessingDistributorTestCase(TestCase):

def test_partion(self):
def test_partition(self):

distributor = MultiprocessingDistributor(n_workers=1)

@@ -82,3 +83,49 @@ def test_local_dask_cluster_extraction_two_worker(self):
self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))

class ClusterDaskDistributorTestCase(DataTestCase):

def test_dask_cluster_extraction_one_worker(self):
cluster = LocalCluster(n_workers=1, threads_per_worker=1, diagnostics_port=False)
client = Client(cluster)
address = client.scheduler_info()['address']
Distributor = ClusterDaskDistributor(address=address)

df = self.create_test_data_sample()
extracted_features = extract_features(df, column_id="id", column_sort="sort", column_kind="kind",
column_value="val",
distributor=Distributor)

self.assertIsInstance(extracted_features, pd.DataFrame)
self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77])))
self.assertTrue(np.all(extracted_features.a__sum_values == np.array([691, 1017])))
self.assertTrue(np.all(extracted_features.a__abs_energy == np.array([32211, 63167])))
self.assertTrue(np.all(extracted_features.b__sum_values == np.array([757, 695])))
self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1])))
self.assertTrue(np.all(extracted_features.b__abs_energy == np.array([36619, 35483])))
self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
cluster.close()

def test_dask_cluster_extraction_two_workers(self):
cluster = LocalCluster(n_workers=2, threads_per_worker=1, diagnostics_port=False)
client = Client(cluster)
address = client.scheduler_info()['address']
Distributor = ClusterDaskDistributor(address=address)

df = self.create_test_data_sample()
extracted_features = extract_features(df, column_id="id", column_sort="sort", column_kind="kind",
column_value="val",
distributor=Distributor)

self.assertIsInstance(extracted_features, pd.DataFrame)
self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77])))
self.assertTrue(np.all(extracted_features.a__sum_values == np.array([691, 1017])))
self.assertTrue(np.all(extracted_features.a__abs_energy == np.array([32211, 63167])))
self.assertTrue(np.all(extracted_features.b__sum_values == np.array([757, 695])))
self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1])))
self.assertTrue(np.all(extracted_features.b__abs_energy == np.array([36619, 35483])))
self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
cluster.close()

6 changes: 4 additions & 2 deletions tsfresh/utilities/distribution.py
@@ -261,7 +261,7 @@ def distribute(self, func, partitioned_chunks, kwargs):
"""

if isinstance(partitioned_chunks, Iterable):
# since dask 2.0.0 client map no longer accepts iteratables
# since dask 2.0.0 client map no longer accepts iterables
partitioned_chunks = list(partitioned_chunks)
result = self.client.gather(self.client.map(partial(func, **kwargs), partitioned_chunks))
return [item for sublist in result for item in sublist]
@@ -319,7 +319,9 @@ def distribute(self, func, partitioned_chunks, kwargs):
:return: The result of the calculation as a list - each item should be the result of the application of func
to a single element.
"""

if isinstance(partitioned_chunks, Iterable):
# since dask 2.0.0 client map no longer accepts iterables
partitioned_chunks = list(partitioned_chunks)
result = self.client.gather(self.client.map(partial(func, **kwargs), partitioned_chunks))
return [item for sublist in result for item in sublist]
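
For completeness, a hedged usage sketch of the patched ClusterDaskDistributor.distribute, assuming a local scheduler started in-process as in the new tests above; the chunk generator and the use of sorted as the per-chunk function are illustrative only, not part of the commit.

from distributed import Client, LocalCluster
from tsfresh.utilities.distribution import ClusterDaskDistributor

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=1, threads_per_worker=1)
    client = Client(cluster)
    address = client.scheduler_info()["address"]

    distributor = ClusterDaskDistributor(address=address)

    # A generator (an Iterable, but not a list) exercises the new code path.
    chunks = (list(range(i, i + 3)) for i in range(0, 9, 3))
    print(distributor.distribute(sorted, chunks, {}))

    client.close()
    cluster.close()

Passing a generator here is exactly the situation this commit fixes (issue #575): without the list conversion, Client.map on recent Dask rejects it.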

