Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve SharedProcessPool tests performance #1950

Merged
4 changes: 2 additions & 2 deletions tests/morpheus/stages/test_multi_processing_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def test_created_stage_pipe(config: Config, dataset_pandas: DatasetManager):
expected_df = input_df.copy()
expected_df["new_column"] = "Hello"

df_count = 100
df_count = 10
df_generator = partial(pandas_dataframe_generator, dataset_pandas, df_count)

partial_fn = partial(_process_df, column="new_column", value="Hello")
Expand Down Expand Up @@ -225,7 +225,7 @@ def test_multiple_stages_pipe(config: Config, dataset_pandas: DatasetManager):
expected_df["new_column_1"] = "new_value"
expected_df["new_column_2"] = "Hello"

df_count = 100
df_count = 10
df_generator = partial(pandas_dataframe_generator, dataset_pandas, df_count)

partial_fn = partial(_process_df, column="new_column_1", value="new_value")
Expand Down
29 changes: 7 additions & 22 deletions tests/morpheus/utils/test_shared_process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,43 +149,28 @@ def test_submit_single_task(shared_process_pool, a, b, expected):


@pytest.mark.slow
def test_submit_task_with_invalid_stage(shared_process_pool):
def test_submit_invalid_tasks(shared_process_pool):

pool = shared_process_pool

# submit_task() should raise ValueError if the stage does not exist
with pytest.raises(ValueError):
pool.submit_task("stage_does_not_exist", _add_task, 10, 20)


@pytest.mark.slow
def test_submit_task_raises_exception(shared_process_pool):

pool = shared_process_pool
pool.set_usage("test_stage", 0.5)

# if the function raises exception, the task can be submitted and the exception will be raised when calling result()
task = pool.submit_task("test_stage", _function_raises_exception)
with pytest.raises(RuntimeError):
task.result()


@pytest.mark.slow
def test_submit_task_with_unserializable_result(shared_process_pool):

pool = shared_process_pool
pool.set_usage("test_stage", 0.5)

# if the function returns unserializable result, the task can be submitted and the exception will be raised
# when calling result()
task = pool.submit_task("test_stage", _function_returns_unserializable_result)
with pytest.raises(TypeError):
task.result()


@pytest.mark.slow
def test_submit_task_with_unserializable_arg(shared_process_pool):

pool = shared_process_pool
pool.set_usage("test_stage", 0.5)

# Unserializable arguments cannot be submitted to the pool
# Function with unserializable arguments cannot be submitted to the pool
with pytest.raises(TypeError):
pool.submit_task("test_stage", _arbitrary_function, threading.Lock())

Expand All @@ -207,7 +192,7 @@ def test_submit_multiple_tasks(shared_process_pool, a, b, expected):
pool = shared_process_pool
pool.set_usage("test_stage", 0.5)

num_tasks = 100
num_tasks = 10
tasks = []
for _ in range(num_tasks):
tasks.append(pool.submit_task("test_stage", _add_task, a, b))
Expand Down
Loading