
Commit

Change to proper test fixture.
clarkzinzow committed Jan 31, 2023
1 parent dceb521 commit 39fd63f
Showing 1 changed file with 33 additions and 43 deletions.
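
For context on why swapping the fixture name is the entire change: pytest injects a fixture into a test simply because the test lists it as a parameter, so replacing ray_start_cluster_enabled with ray_start_regular_shared points every test at the shared local-Ray fixture pulled in via the "from ray.tests.conftest import *" line. A minimal sketch of that mechanism follows; the fixture body is a hypothetical stand-in for illustration, not Ray's actual conftest implementation.

import pytest
import ray

@pytest.fixture(scope="module")
def ray_start_regular_shared():
    # Hypothetical stand-in: the real fixture lives in ray.tests.conftest.
    # Start one small local Ray instance and share it across the module's tests.
    ray.init(num_cpus=4, ignore_reinit_error=True)
    yield
    ray.shutdown()

def test_uses_shared_ray(ray_start_regular_shared):
    # Naming the fixture as a parameter is enough; pytest runs it first,
    # so the test body can assume Ray is already initialized.
    assert ray.is_initialized()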
76 changes: 33 additions & 43 deletions python/ray/data/tests/test_execution_optimizer.py
@@ -24,7 +24,7 @@
from ray.tests.conftest import * # noqa


-def test_read_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_read_operator(ray_start_regular_shared, enable_optimizer):
planner = Planner()
op = Read(ParquetDatasource())
plan = LogicalPlan(op)
@@ -36,7 +36,7 @@ def test_read_operator(ray_start_cluster_enabled, enable_optimizer):
assert isinstance(physical_op.input_dependencies[0], InputDataBuffer)


-def test_map_batches_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_map_batches_operator(ray_start_regular_shared, enable_optimizer):
planner = Planner()
read_op = Read(ParquetDatasource())
op = MapBatches(
@@ -52,13 +52,13 @@ def test_map_batches_operator(ray_start_cluster_enabled, enable_optimizer):
assert isinstance(physical_op.input_dependencies[0], MapOperator)


-def test_map_batches_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_map_batches_e2e(ray_start_regular_shared, enable_optimizer):
ds = ray.data.range(5)
ds = ds.map_batches(lambda x: x)
assert ds.take_all() == list(range(5)), ds


-def test_map_rows_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_map_rows_operator(ray_start_regular_shared, enable_optimizer):
planner = Planner()
read_op = Read(ParquetDatasource())
op = MapRows(
@@ -74,13 +74,13 @@ def test_map_rows_operator(ray_start_cluster_enabled, enable_optimizer):
assert isinstance(physical_op.input_dependencies[0], MapOperator)


-def test_map_rows_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_map_rows_e2e(ray_start_regular_shared, enable_optimizer):
ds = ray.data.range(5)
ds = ds.map(lambda x: x + 1)
assert ds.take_all() == [1, 2, 3, 4, 5], ds


-def test_filter_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_filter_operator(ray_start_regular_shared, enable_optimizer):
planner = Planner()
read_op = Read(ParquetDatasource())
op = Filter(
@@ -96,13 +96,13 @@ def test_filter_operator(ray_start_cluster_enabled, enable_optimizer):
assert isinstance(physical_op.input_dependencies[0], MapOperator)


-def test_filter_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_filter_e2e(ray_start_regular_shared, enable_optimizer):
ds = ray.data.range(5)
ds = ds.filter(fn=lambda x: x % 2 == 0)
assert ds.take_all() == [0, 2, 4], ds


-def test_flat_map(ray_start_cluster_enabled, enable_optimizer):
+def test_flat_map(ray_start_regular_shared, enable_optimizer):
planner = Planner()
read_op = Read(ParquetDatasource())
op = FlatMap(
@@ -118,13 +118,13 @@ def test_flat_map(ray_start_cluster_enabled, enable_optimizer):
assert isinstance(physical_op.input_dependencies[0], MapOperator)


-def test_flat_map_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_flat_map_e2e(ray_start_regular_shared, enable_optimizer):
ds = ray.data.range(2)
ds = ds.flat_map(fn=lambda x: [x, x])
assert ds.take_all() == [0, 0, 1, 1], ds


-def test_column_ops_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_column_ops_e2e(ray_start_regular_shared, enable_optimizer):
ds = ray.data.range(2)
ds = ds.add_column(fn=lambda df: df.iloc[:, 0], col="new_col")
assert ds.take_all() == [{"value": 0, "new_col": 0}, {"value": 1, "new_col": 1}], ds
@@ -136,7 +136,7 @@ def test_column_ops_e2e(ray_start_cluster_enabled, enable_optimizer):
assert ds.take_all() == [{"value": 0}, {"value": 1}], ds


-def test_random_sample_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_random_sample_e2e(ray_start_regular_shared, enable_optimizer):
import math

def ensure_sample_size_close(dataset, sample_percent=0.5):
@@ -155,7 +155,7 @@ def ensure_sample_size_close(dataset, sample_percent=0.5):
ensure_sample_size_close(ds)


-def test_randomize_blocks_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer):
planner = Planner()
read_op = Read(ParquetDatasource())
op = RandomizeBlocks(
@@ -171,13 +171,13 @@ def test_randomize_blocks_operator(ray_start_cluster_enabled, enable_optimizer):
assert isinstance(physical_op.input_dependencies[0], MapOperator)


-def test_randomize_blocks_e2e(ray_start_cluster_enabled, enable_optimizer):
+def test_randomize_blocks_e2e(ray_start_regular_shared, enable_optimizer):
ds = ray.data.range(12, parallelism=4)
ds = ds.randomize_block_order(seed=0)
assert ds.take_all() == [6, 7, 8, 0, 1, 2, 3, 4, 5, 9, 10, 11], ds


-def test_random_shuffle_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_random_shuffle_operator(ray_start_regular_shared, enable_optimizer):
planner = Planner()
read_op = Read(ParquetDatasource())
op = RandomShuffle(
@@ -194,7 +194,7 @@ def test_random_shuffle_operator(ray_start_cluster_enabled, enable_optimizer):


def test_random_shuffle_e2e(
-ray_start_cluster_enabled, enable_optimizer, use_push_based_shuffle
+ray_start_regular_shared, enable_optimizer, use_push_based_shuffle
):
ds = ray.data.range(12, parallelism=4)
r1 = ds.random_shuffle(seed=0).take_all()
@@ -204,14 +204,10 @@ def test_random_shuffle_e2e(
assert sorted(r2) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], r2


-def test_repartition_operator(ray_start_cluster_enabled, enable_optimizer):
+def test_repartition_operator(ray_start_regular_shared, enable_optimizer):
planner = Planner()
read_op = Read(ParquetDatasource())
-op = Repartition(
-read_op,
-num_outputs=5,
-shuffle=True,
-)
+op = Repartition(read_op, num_outputs=5, shuffle=True)
plan = LogicalPlan(op)
physical_op = planner.plan(plan).dag

@@ -221,18 +217,14 @@ def test_repartition_operator(ray_start_cluster_enabled, enable_optimizer):
assert isinstance(physical_op.input_dependencies[0], MapOperator)

# Check error is thrown for non-shuffle repartition.
+op = Repartition(read_op, num_outputs=5, shuffle=False)
+plan = LogicalPlan(op)
with pytest.raises(AssertionError):
-planner.plan(
-Repartition(
-read_op,
-num_outputs=5,
-shuffle=False,
-)
-)
+planner.plan(plan)


def test_repartition_e2e(
-ray_start_cluster_enabled, enable_optimizer, use_push_based_shuffle
+ray_start_regular_shared, enable_optimizer, use_push_based_shuffle
):
ds = ray.data.range(10000, parallelism=10)
ds1 = ds.repartition(20, shuffle=True)
@@ -243,7 +235,7 @@ def test_repartition_e2e(
ds.repartition(20, shuffle=False).take_all()


-def test_read_map_batches_operator_fusion(ray_start_cluster_enabled, enable_optimizer):
+def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optimizer):
# Test that Read is fused with MapBatches.
planner = Planner()
read_op = Read(ParquetDatasource())
@@ -263,7 +255,7 @@ def test_read_map_batches_operator_fusion(ray_start_cluster_enabled, enable_opti
assert isinstance(physical_op.input_dependencies[0], InputDataBuffer)


-def test_read_map_chain_operator_fusion(ray_start_cluster_enabled, enable_optimizer):
+def test_read_map_chain_operator_fusion(ray_start_regular_shared, enable_optimizer):
# Test that a chain of different map operators are fused.
planner = Planner()
read_op = Read(ParquetDatasource())
@@ -284,7 +276,7 @@ def test_read_map_chain_operator_fusion(ray_start_cluster_enabled, enable_optimi


def test_read_map_batches_operator_fusion_compatible_remote_args(
-ray_start_cluster_enabled, enable_optimizer
+ray_start_regular_shared, enable_optimizer
):
# Test that map operators are stilled fused when remote args are compatible.
planner = Planner()
@@ -307,7 +299,7 @@ def test_read_map_batches_operator_fusion_compatible_remote_args(


def test_read_map_batches_operator_fusion_incompatible_remote_args(
-ray_start_cluster_enabled, enable_optimizer
+ray_start_regular_shared, enable_optimizer
):
# Test that map operators are not fused when remote args are incompatible.
planner = Planner()
@@ -331,7 +323,7 @@ def test_read_map_batches_operator_fusion_incompatible_remote_args(


def test_read_map_batches_operator_fusion_compute_tasks_to_actors(
-ray_start_cluster_enabled, enable_optimizer
+ray_start_regular_shared, enable_optimizer
):
# Test that a task-based map operator is fused into an actor-based map operator when
# the former comes before the latter.
@@ -352,7 +344,7 @@ def test_read_map_batches_operator_fusion_compute_tasks_to_actors(


def test_read_map_batches_operator_fusion_compute_read_to_actors(
-ray_start_cluster_enabled, enable_optimizer
+ray_start_regular_shared, enable_optimizer
):
# Test that reads fuse into an actor-based map operator.
planner = Planner()
@@ -371,7 +363,7 @@ def test_read_map_batches_operator_fusion_compute_read_to_actors(


def test_read_map_batches_operator_fusion_incompatible_compute(
-ray_start_cluster_enabled, enable_optimizer
+ray_start_regular_shared, enable_optimizer
):
# Test that map operators are not fused when compute strategies are incompatible.
planner = Planner()
@@ -394,7 +386,7 @@ def test_read_map_batches_operator_fusion_incompatible_compute(


def test_read_map_batches_operator_fusion_target_block_size(
-ray_start_cluster_enabled, enable_optimizer
+ray_start_regular_shared, enable_optimizer
):
# Test that fusion of map operators merges their block sizes in the expected way
# (taking the max).
@@ -419,7 +411,7 @@ def test_read_map_batches_operator_fusion_target_block_size(


def test_read_map_batches_operator_fusion_callable_classes(
-ray_start_cluster_enabled, enable_optimizer
+ray_start_regular_shared, enable_optimizer
):
# Test that callable classes can still be fused if they're the same function.
planner = Planner()
@@ -444,7 +436,7 @@ def __call__(self, x):


def test_read_map_batches_operator_fusion_incompatible_callable_classes(
-ray_start_cluster_enabled, enable_optimizer
+ray_start_regular_shared, enable_optimizer
):
# Test that map operators are not fused when different callable classes are used.
planner = Planner()
@@ -476,7 +468,7 @@ def __call__(self, x):


def test_read_map_batches_operator_fusion_incompatible_constructor_args(
-ray_start_cluster_enabled, enable_optimizer
+ray_start_regular_shared, enable_optimizer
):
# Test that map operators are not fused when callable classes have different
# constructor args.
@@ -513,9 +505,7 @@ def __call__(self, x):
assert isinstance(physical_op.input_dependencies[0], InputDataBuffer)


-def test_read_map_chain_operator_fusion_e2e(
-ray_start_cluster_enabled, enable_optimizer
-):
+def test_read_map_chain_operator_fusion_e2e(ray_start_regular_shared, enable_optimizer):
ds = ray.data.range(10, parallelism=2)
ds = ds.filter(lambda x: x % 2 == 0)
ds = ds.map(lambda x: x + 1)
