From 60b066bbff0131b02e223fd70fb79ac2f09444fc Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 7 Feb 2023 16:58:54 -0800 Subject: [PATCH 1/5] flip on by default Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/interfaces.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 2a350e9efd18..33eac9bae29c 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -152,7 +152,7 @@ class ExecutionOptions: # Always preserve ordering of blocks, even if using operators that # don't require it. - preserve_order: bool = True + preserve_order: bool = False @dataclass From e3329233566e1917f82599f5f01c66a977d1a1e3 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 7 Feb 2023 22:25:37 -0800 Subject: [PATCH 2/5] fix tests Signed-off-by: Eric Liang --- python/ray/data/tests/test_dataset.py | 14 +++++++++++--- .../ray/data/tests/test_streaming_integration.py | 2 +- python/ray/train/tests/test_base_trainer.py | 4 ++++ 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 1c0c4b748ef4..0f7d7db8d3ae 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2381,7 +2381,10 @@ def test_select_columns(ray_start_regular_shared): ds3.select_columns(cols=[]).fully_executed() -def test_map_batches_basic(ray_start_regular_shared, tmp_path): +def test_map_batches_basic(ray_start_regular_shared, tmp_path, restore_dataset_context): + ctx = DatasetContext.get_current() + ctx.execution_options.preserve_order = True + # Test input validation ds = ray.data.range(5) with pytest.raises(ValueError): @@ -2710,8 +2713,11 @@ def test_map_batches_actors_preserves_order(ray_start_regular_shared): ], ) def test_map_batches_batch_mutation( - ray_start_regular_shared, num_rows, num_blocks, batch_size + ray_start_regular_shared, num_rows, num_blocks, batch_size, restore_dataset_context ): + ctx = DatasetContext.get_current() + ctx.execution_options.preserve_order = True + # Test that batch mutation works without encountering a read-only error (e.g. if the # batch is a zero-copy view on data in the object store). def mutate(df): @@ -4832,7 +4838,9 @@ def test_random_block_order_schema(ray_start_regular_shared): ds.schema().names == ["a", "b"] -def test_random_block_order(ray_start_regular_shared): +def test_random_block_order(ray_start_regular_shared, restore_dataset_context): + ctx = DatasetContext.get_current() + ctx.execution_options.preserve_order = True # Test BlockList.randomize_block_order. ds = ray.data.range(12).repartition(4) diff --git a/python/ray/data/tests/test_streaming_integration.py b/python/ray/data/tests/test_streaming_integration.py index f67ebeaae0a9..82d5ab194d92 100644 --- a/python/ray/data/tests/test_streaming_integration.py +++ b/python/ray/data/tests/test_streaming_integration.py @@ -38,7 +38,7 @@ def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]: def test_pipelined_execution(ray_start_10_cpus_shared): - executor = StreamingExecutor(ExecutionOptions()) + executor = StreamingExecutor(ExecutionOptions(preserve_order=True)) inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o2 = MapOperator.create(make_transform(lambda block: [b * -1 for b in block]), o1) diff --git a/python/ray/train/tests/test_base_trainer.py b/python/ray/train/tests/test_base_trainer.py index f864f1a47924..a355d9852bac 100644 --- a/python/ray/train/tests/test_base_trainer.py +++ b/python/ray/train/tests/test_base_trainer.py @@ -14,6 +14,7 @@ from ray.air import session from ray.air.checkpoint import Checkpoint from ray.air.constants import MAX_REPR_LENGTH +from ray.data.context import DatasetContext from ray.data.preprocessor import Preprocessor from ray.data.preprocessors import BatchMapper from ray.tune.impl import tuner_internal @@ -97,6 +98,9 @@ def training_loop(self): def test_preprocess_datasets(ray_start_4_cpus): + ctx = DatasetContext.get_current() + ctx.execution_options.preserve_order = True + def training_loop(self): assert self.datasets["my_dataset"].take() == [2, 3, 4] From 1cdf6d2e259d5237056aa8d331154ec8a1c3b72d Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 7 Feb 2023 22:27:28 -0800 Subject: [PATCH 3/5] fix doctest Signed-off-by: Eric Liang --- python/ray/data/grouped_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/grouped_dataset.py b/python/ray/data/grouped_dataset.py index dd27136880d3..01a421eb9712 100644 --- a/python/ray/data/grouped_dataset.py +++ b/python/ray/data/grouped_dataset.py @@ -152,7 +152,7 @@ def aggregate(self, *aggs: AggregateFn) -> Dataset[U]: init=lambda k: [], accumulate_row=lambda a, r: a + [r], merge=lambda a1, a2: a1 + a2, - finalize=lambda a: a + finalize=lambda a: sorted(a) )) result.show() From 5ca21abab10385cfcf614f8521d340262f1d6ba2 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 8 Feb 2023 10:49:15 -0800 Subject: [PATCH 4/5] fix doctest Signed-off-by: Eric Liang --- python/ray/data/dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 8b6fc9c5bed6..4efc639c1557 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -3633,10 +3633,10 @@ def repeat(self, times: Optional[int] = None) -> "DatasetPipeline[T]": Examples: >>> import ray >>> # Infinite pipeline of numbers [0, 5) - >>> ray.data.range(5).repeat().take() + >>> ray.data.range(5, parallelism=1).repeat().take() [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, ...] >>> # Can apply transformations to the pipeline. - >>> ray.data.range(5).repeat().map(lambda x: -x).take() + >>> ray.data.range(5, parallelism=1).repeat().map(lambda x: -x).take() [0, -1, -2, -3, -4, 0, -1, -2, -3, -4, ...] >>> # Can shuffle each epoch (dataset) in the pipeline. >>> ray.data.range(5).repeat().random_shuffle().take() # doctest: +SKIP From dae7b2f183bf702e7a274260e67321755d4eb98a Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 8 Feb 2023 10:50:26 -0800 Subject: [PATCH 5/5] update comment Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/interfaces.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 33eac9bae29c..bae2fc205ff8 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -150,8 +150,7 @@ class ExecutionOptions: # node (node driving the execution). locality_with_output: bool = False - # Always preserve ordering of blocks, even if using operators that - # don't require it. + # Set this to preserve the ordering between blocks processed by operators. preserve_order: bool = False