diff --git a/.buildkite/pipeline.ml.yml b/.buildkite/pipeline.ml.yml index e1af5264c41e..e617188bab45 100644 --- a/.buildkite/pipeline.ml.yml +++ b/.buildkite/pipeline.ml.yml @@ -284,7 +284,7 @@ - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/dask/... - label: ":potable_water: Dataset datasource integration tests (Python 3.7)" - conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED"] + conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT - DATA_PROCESSING_TESTING=1 ARROW_VERSION=9.* ARROW_MONGO_VERSION=0.5.* ./ci/env/install-dependencies.sh diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 272c469c793c..3d44b8e2f614 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -99,8 +99,8 @@ def ray_deps_setup(): name = "com_github_antirez_redis", build_file = "@com_github_ray_project_ray//bazel:BUILD.redis", patch_args = ["-p1"], - url = "https://github.com/redis/redis/archive/refs/tags/7.0.5.tar.gz", - sha256 = "40827fcaf188456ad9b3be8e27a4f403c43672b6bb6201192dc15756af6f1eae", + url = "https://github.com/redis/redis/archive/refs/tags/7.0.8.tar.gz", + sha256 = "0e439cbc19f6db5a4c63d355519ab73bf6ac2ecd47df806c14b19564b3d0c593", patches = [ "@com_github_ray_project_ray//thirdparty/patches:redis-quiet.patch", ], diff --git a/doc/source/ray-air/doc_code/hf_trainer.py b/doc/source/ray-air/doc_code/hf_trainer.py index bbd54c3bb336..1d81d36dc35c 100644 --- a/doc/source/ray-air/doc_code/hf_trainer.py +++ b/doc/source/ray-air/doc_code/hf_trainer.py @@ -12,6 +12,10 @@ from ray.train.huggingface import HuggingFaceTrainer from ray.air.config import ScalingConfig + +# If using GPUs, set this to True. +use_gpu = False + model_checkpoint = "gpt2" tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer" block_size = 128 @@ -66,7 +70,7 @@ def trainer_init_per_worker(train_dataset, eval_dataset, **config): logging_strategy="epoch", learning_rate=2e-5, weight_decay=0.01, - no_cuda=True, # Set to False for GPU training + no_cuda=(not use_gpu), ) return transformers.Trainer( model=model, @@ -76,9 +80,7 @@ def trainer_init_per_worker(train_dataset, eval_dataset, **config): ) -scaling_config = ScalingConfig(num_workers=3) -# If using GPUs, use the below scaling config instead. -# scaling_config = ScalingConfig(num_workers=3, use_gpu=True) +scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu) trainer = HuggingFaceTrainer( trainer_init_per_worker=trainer_init_per_worker, scaling_config=scaling_config, diff --git a/doc/source/ray-air/doc_code/hvd_trainer.py b/doc/source/ray-air/doc_code/hvd_trainer.py index eff640acb4ff..7c4e4fd67c42 100644 --- a/doc/source/ray-air/doc_code/hvd_trainer.py +++ b/doc/source/ray-air/doc_code/hvd_trainer.py @@ -8,6 +8,10 @@ from ray.train.horovod import HorovodTrainer from ray.air.config import ScalingConfig +# If using GPUs, set this to True. 
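Illustrative sketch, separate from the patch itself: the doc-code edits above replace the commented-out GPU variant of the scaling config with a single use_gpu flag that drives both the framework-level CUDA switch and the ScalingConfig, so only one line changes for GPU runs. Worker count here is arbitrary.

from ray.air.config import ScalingConfig

use_gpu = False  # flip to True to place each worker on a GPU

# One flag now controls both knobs that previously had to be edited together:
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
# and, in the HF Transformers example, the trainer's CUDA switch:
#   transformers.TrainingArguments(..., no_cuda=(not use_gpu))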
+use_gpu = False + + input_size = 1 layer_size = 15 output_size = 1 @@ -43,7 +47,7 @@ def train_loop_per_worker(): for epoch in range(num_epochs): model.train() for batch in dataset_shard.iter_torch_batches( - batch_size=32, dtypes=torch.float + batch_size=32, dtypes=torch.float, device=train.torch.get_device() ): inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"] inputs.to(device) @@ -61,9 +65,7 @@ def train_loop_per_worker(): train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)]) -scaling_config = ScalingConfig(num_workers=3) -# If using GPUs, use the below scaling config instead. -# scaling_config = ScalingConfig(num_workers=3, use_gpu=True) +scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu) trainer = HorovodTrainer( train_loop_per_worker=train_loop_per_worker, scaling_config=scaling_config, diff --git a/doc/source/ray-air/doc_code/tf_starter.py b/doc/source/ray-air/doc_code/tf_starter.py index 02f99cd4bedb..53f39953b40f 100644 --- a/doc/source/ray-air/doc_code/tf_starter.py +++ b/doc/source/ray-air/doc_code/tf_starter.py @@ -10,6 +10,10 @@ from ray.train.tensorflow import TensorflowTrainer from ray.air.config import ScalingConfig + +# If using GPUs, set this to True. +use_gpu = False + a = 5 b = 10 size = 100 @@ -59,9 +63,7 @@ def train_func(config: dict): train_dataset = ray.data.from_items( [{"x": x / 200, "y": 2 * x / 200} for x in range(200)] ) -scaling_config = ScalingConfig(num_workers=2) -# If using GPUs, use the below scaling config instead. -# scaling_config = ScalingConfig(num_workers=2, use_gpu=True) +scaling_config = ScalingConfig(num_workers=2, use_gpu=use_gpu) trainer = TensorflowTrainer( train_loop_per_worker=train_func, train_loop_config=config, diff --git a/doc/source/ray-air/doc_code/torch_trainer.py b/doc/source/ray-air/doc_code/torch_trainer.py index 486f6553d21a..f8d4dc45bcfe 100644 --- a/doc/source/ray-air/doc_code/torch_trainer.py +++ b/doc/source/ray-air/doc_code/torch_trainer.py @@ -7,6 +7,11 @@ from ray.train.torch import TorchTrainer from ray.air.config import ScalingConfig + +# If using GPUs, set this to True. +use_gpu = False + + input_size = 1 layer_size = 15 output_size = 1 @@ -34,7 +39,7 @@ def train_loop_per_worker(): for epoch in range(num_epochs): for batches in dataset_shard.iter_torch_batches( - batch_size=32, dtypes=torch.float + batch_size=32, dtypes=torch.float, device=train.torch.get_device() ): inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"] output = model(inputs) @@ -53,9 +58,7 @@ def train_loop_per_worker(): train_dataset = ray.data.from_items([{"x": x, "y": 2 * x + 1} for x in range(200)]) -scaling_config = ScalingConfig(num_workers=3) -# If using GPUs, use the below scaling config instead. 
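Alongside the scaling-config change, the train loops above now pass a device to iter_torch_batches so batches are materialized directly on the worker's device. A minimal sketch of that loop, assuming the session-based dataset-shard access used elsewhere in these doc-code examples:

import torch
from ray import train
from ray.air import session

def train_loop_per_worker():
    dataset_shard = session.get_dataset_shard("train")
    device = train.torch.get_device()  # CPU, or the GPU assigned to this worker
    for batch in dataset_shard.iter_torch_batches(
        batch_size=32, dtypes=torch.float, device=device
    ):
        # Tensors in `batch` already live on `device`; no extra .to(device) copies.
        ...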
-# scaling_config = ScalingConfig(num_workers=3, use_gpu=True) +scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu) trainer = TorchTrainer( train_loop_per_worker=train_loop_per_worker, scaling_config=scaling_config, diff --git a/python/ray/data/_internal/execution/operators/actor_pool_submitter.py b/python/ray/data/_internal/execution/operators/actor_pool_submitter.py index 23727e907031..adcf95990b93 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_submitter.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_submitter.py @@ -3,6 +3,9 @@ from ray.data.block import Block, BlockMetadata from ray.data.context import DatasetContext from ray.data.context import DEFAULT_SCHEDULING_STRATEGY +from ray.data._internal.execution.interfaces import ( + ExecutionOptions, +) from ray.data._internal.execution.operators.map_task_submitter import ( MapTaskSubmitter, _map_task, @@ -28,8 +31,7 @@ def __init__( ray_remote_args: Remote arguments for the Ray actors to be created. pool_size: The size of the actor pool. """ - self._transform_fn_ref = transform_fn_ref - self._ray_remote_args = ray_remote_args + super().__init__(transform_fn_ref, ray_remote_args) self._pool_size = pool_size # A map from task output futures to the actors on which they are running. self._active_actors: Dict[ObjectRef[Block], ray.actor.ActorHandle] = {} @@ -39,7 +41,8 @@ def __init__( def progress_str(self) -> str: return f"{self._actor_pool.size()} actors" - def start(self): + def start(self, options: ExecutionOptions): + super().start(options) # Create the actor workers and add them to the pool. ray_remote_args = self._apply_default_remote_args(self._ray_remote_args) cls_ = ray.remote(**ray_remote_args)(MapWorker) diff --git a/python/ray/data/_internal/execution/operators/map_operator_state.py b/python/ray/data/_internal/execution/operators/map_operator_state.py index d896ee7d9867..462d0024802a 100644 --- a/python/ray/data/_internal/execution/operators/map_operator_state.py +++ b/python/ray/data/_internal/execution/operators/map_operator_state.py @@ -76,7 +76,7 @@ def __init__( self._output_queue: Optional[_OutputQueue] = None def start(self, options: ExecutionOptions) -> None: - self._task_submitter.start() + self._task_submitter.start(options) if options.preserve_order: self._output_queue = _OrderedOutputQueue() else: diff --git a/python/ray/data/_internal/execution/operators/map_task_submitter.py b/python/ray/data/_internal/execution/operators/map_task_submitter.py index 1adb9478e06b..88b17a97cedf 100644 --- a/python/ray/data/_internal/execution/operators/map_task_submitter.py +++ b/python/ray/data/_internal/execution/operators/map_task_submitter.py @@ -1,7 +1,13 @@ from abc import ABC, abstractmethod -from typing import List, Union, Tuple, Callable, Iterator +from typing import Dict, Any, List, Union, Tuple, Callable, Iterator + +import ray from ray.data.block import Block, BlockAccessor, BlockMetadata, BlockExecStats +from ray.data._internal.execution.interfaces import ( + ExecutionOptions, +) from ray.types import ObjectRef +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from ray._raylet import ObjectRefGenerator @@ -13,14 +19,35 @@ class MapTaskSubmitter(ABC): submission is done. """ - def start(self): + def __init__( + self, + transform_fn_ref: ObjectRef[Callable[[Iterator[Block]], Iterator[Block]]], + ray_remote_args: Dict[str, Any], + ): + """Create a TaskPoolSubmitter instance. 
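For context on the submitter refactor above: the shared constructor state (transform_fn_ref, ray_remote_args) moves into the MapTaskSubmitter base class, and start() now receives the ExecutionOptions so the base can adjust remote args before any tasks launch. A hypothetical subclass (MySubmitter is not a real class; remaining abstract methods omitted) would wire into it like this:

from typing import Any, Dict

from ray.data._internal.execution.interfaces import ExecutionOptions
from ray.data._internal.execution.operators.map_task_submitter import MapTaskSubmitter


class MySubmitter(MapTaskSubmitter):  # hypothetical, for illustration only
    def __init__(self, transform_fn_ref, ray_remote_args: Dict[str, Any]):
        # Shared state now lives in the base class instead of each subclass.
        super().__init__(transform_fn_ref, ray_remote_args)

    def start(self, options: ExecutionOptions):
        # Lets the base class rewrite self._ray_remote_args (e.g. attach a
        # locality-aware scheduling strategy) before anything is submitted.
        super().start(options)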
+ + Args: + transform_fn_ref: The function to apply to a block bundle in the submitted + map task. + ray_remote_args: Remote arguments for the Ray tasks to be launched. + """ + self._transform_fn_ref = transform_fn_ref + self._ray_remote_args = ray_remote_args + + def start(self, options: ExecutionOptions): """Start the task submitter so it's ready to submit tasks. This is called when execution of the map operator actually starts, and is where the submitter can initialize expensive state, reserve resources, start workers, etc. """ - pass + if options.locality_with_output: + self._ray_remote_args[ + "scheduling_strategy" + ] = NodeAffinitySchedulingStrategy( + ray.get_runtime_context().get_node_id(), + soft=True, + ) @abstractmethod def submit( diff --git a/python/ray/data/_internal/execution/operators/task_pool_submitter.py b/python/ray/data/_internal/execution/operators/task_pool_submitter.py index b5aba33b1852..b850c22be71d 100644 --- a/python/ray/data/_internal/execution/operators/task_pool_submitter.py +++ b/python/ray/data/_internal/execution/operators/task_pool_submitter.py @@ -1,4 +1,4 @@ -from typing import Dict, Any, Iterator, Callable, Union, List +from typing import Union, List import ray from ray.data.block import Block @@ -14,21 +14,6 @@ class TaskPoolSubmitter(MapTaskSubmitter): """A task submitter for MapOperator that uses normal Ray tasks.""" - def __init__( - self, - transform_fn_ref: ObjectRef[Callable[[Iterator[Block]], Iterator[Block]]], - ray_remote_args: Dict[str, Any], - ): - """Create a TaskPoolSubmitter instance. - - Args: - transform_fn_ref: The function to apply to a block bundle in the submitted - map task. - ray_remote_args: Remote arguments for the Ray tasks to be launched. - """ - self._transform_fn_ref = transform_fn_ref - self._ray_remote_args = ray_remote_args - def submit( self, input_blocks: List[ObjectRef[Block]] ) -> ObjectRef[ObjectRefGenerator]: diff --git a/python/ray/data/_internal/logical/operators/map_operator.py b/python/ray/data/_internal/logical/operators/map_operator.py index 2c3c23a407c6..5ecddcd9d32a 100644 --- a/python/ray/data/_internal/logical/operators/map_operator.py +++ b/python/ray/data/_internal/logical/operators/map_operator.py @@ -7,13 +7,14 @@ from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.logical.interfaces import LogicalOperator from ray.data._internal.compute import ( + UDF, get_compute, CallableClass, ComputeStrategy, TaskPoolStrategy, ActorPoolStrategy, ) -from ray.data.block import BatchUDF, Block +from ray.data.block import BatchUDF, Block, RowUDF if sys.version_info >= (3, 8): @@ -22,33 +23,30 @@ from typing_extensions import Literal -class MapBatches(LogicalOperator): - """Logical operator for map_batches.""" +class AbstractMap(LogicalOperator): + """Abstract class for logical operators should be converted to physical + MapOperator. 
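The start() hook above is where locality_with_output takes effect: when enabled, the submitter rewrites its remote args with a soft node-affinity strategy pointing at the current (driver) node. A standalone sketch of the same rewrite, outside the class:

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init(ignore_reinit_error=True)

ray_remote_args = {"num_cpus": 1}
# Soft affinity: prefer the node that will consume the output, but fall back
# to other nodes if it is saturated or unavailable.
ray_remote_args["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
    ray.get_runtime_context().get_node_id(), soft=True
)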
+ """ def __init__( self, + name: str, input_op: LogicalOperator, block_fn: BlockTransform, - fn: BatchUDF, - batch_size: Optional[Union[int, Literal["default"]]] = "default", compute: Optional[Union[str, ComputeStrategy]] = None, - batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default", - zero_copy_batch: bool = False, target_block_size: Optional[int] = None, + fn: Optional[UDF] = None, fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, fn_constructor_kwargs: Optional[Dict[str, Any]] = None, ray_remote_args: Optional[Dict[str, Any]] = None, ): - super().__init__("MapBatches", [input_op]) + super().__init__(name, [input_op]) self._block_fn = block_fn - self._fn = fn - self._batch_size = batch_size self._compute = compute or "tasks" - self._batch_format = batch_format - self._zero_copy_batch = zero_copy_batch self._target_block_size = target_block_size + self._fn = fn self._fn_args = fn_args self._fn_kwargs = fn_kwargs self._fn_constructor_args = fn_constructor_args @@ -56,10 +54,66 @@ def __init__( self._ray_remote_args = ray_remote_args or {} -def plan_map_batches_op( - op: MapBatches, input_physical_dag: PhysicalOperator -) -> PhysicalOperator: - """Get the corresponding DAG of physical operators for MapBatches.""" +class MapBatches(AbstractMap): + """Logical operator for map_batches.""" + + def __init__( + self, + input_op: LogicalOperator, + block_fn: BlockTransform, + fn: BatchUDF, + batch_size: Optional[Union[int, Literal["default"]]] = "default", + compute: Optional[Union[str, ComputeStrategy]] = None, + batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default", + zero_copy_batch: bool = False, + target_block_size: Optional[int] = None, + fn_args: Optional[Iterable[Any]] = None, + fn_kwargs: Optional[Dict[str, Any]] = None, + fn_constructor_args: Optional[Iterable[Any]] = None, + fn_constructor_kwargs: Optional[Dict[str, Any]] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + ): + super().__init__( + "MapBatches", + input_op, + block_fn, + compute=compute, + target_block_size=target_block_size, + fn=fn, + fn_args=fn_args, + fn_kwargs=fn_kwargs, + fn_constructor_args=fn_constructor_args, + fn_constructor_kwargs=fn_constructor_kwargs, + ray_remote_args=ray_remote_args, + ) + self._batch_size = batch_size + self._batch_format = batch_format + self._zero_copy_batch = zero_copy_batch + + +class MapRows(AbstractMap): + """Logical operator for map.""" + + def __init__( + self, + input_op: LogicalOperator, + block_fn: BlockTransform, + fn: RowUDF, + compute: Optional[Union[str, ComputeStrategy]] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + ): + super().__init__( + "MapRows", + input_op, + block_fn, + compute=compute, + fn=fn, + ray_remote_args=ray_remote_args, + ) + + +def plan_map_op(op: AbstractMap, input_physical_dag: PhysicalOperator) -> MapOperator: + """Get the corresponding physical operators DAG for AbstractMap operators.""" compute = get_compute(op._compute) block_fn = op._block_fn diff --git a/python/ray/data/_internal/logical/planner.py b/python/ray/data/_internal/logical/planner.py index 87954725b6e6..eec4cd440551 100644 --- a/python/ray/data/_internal/logical/planner.py +++ b/python/ray/data/_internal/logical/planner.py @@ -2,8 +2,8 @@ from ray.data._internal.logical.interfaces import LogicalOperator from ray.data._internal.logical.operators.read_operator import Read, plan_read_op from 
ray.data._internal.logical.operators.map_operator import ( - MapBatches, - plan_map_batches_op, + AbstractMap, + plan_map_op, ) @@ -24,9 +24,9 @@ def plan(self, logical_dag: LogicalOperator) -> PhysicalOperator: if isinstance(logical_dag, Read): assert not physical_children physical_dag = plan_read_op(logical_dag) - elif isinstance(logical_dag, MapBatches): + elif isinstance(logical_dag, AbstractMap): assert len(physical_children) == 1 - physical_dag = plan_map_batches_op(logical_dag, physical_children[0]) + physical_dag = plan_map_op(logical_dag, physical_children[0]) else: raise ValueError( f"Found unknown logical operator during planning: {logical_dag}" diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 1741d61d9da1..3bc492e944e9 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -449,12 +449,11 @@ def execute_to_iterator( ) from ray.data._internal.execution.streaming_executor import StreamingExecutor - from ray.data._internal.execution.interfaces import ExecutionOptions from ray.data._internal.execution.legacy_compat import ( execute_to_legacy_block_iterator, ) - executor = StreamingExecutor(ExecutionOptions(preserve_order=False)) + executor = StreamingExecutor(copy.deepcopy(ctx.execution_options)) block_iter = execute_to_legacy_block_iterator( executor, self, @@ -500,12 +499,11 @@ def execute( if not self.has_computed_output(): if self._run_with_new_execution_backend(): from ray.data._internal.execution.bulk_executor import BulkExecutor - from ray.data._internal.execution.interfaces import ExecutionOptions from ray.data._internal.execution.legacy_compat import ( execute_to_legacy_block_list, ) - executor = BulkExecutor(ExecutionOptions()) + executor = BulkExecutor(copy.deepcopy(context.execution_options)) blocks = execute_to_legacy_block_list( executor, self, diff --git a/python/ray/data/context.py b/python/ray/data/context.py index 1b735bdaf406..cb953f04696a 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -1,10 +1,13 @@ import os import threading -from typing import Optional +from typing import Optional, TYPE_CHECKING from ray.util.annotations import DeveloperAPI from ray.util.scheduling_strategies import SchedulingStrategyT +if TYPE_CHECKING: + from ray.data._internal.execution.interfaces import ExecutionOptions + # The context singleton on this process. _default_context: "Optional[DatasetContext]" = None _context_lock = threading.Lock() @@ -144,6 +147,7 @@ def __init__( enable_auto_log_stats: bool, trace_allocations: bool, optimizer_enabled: bool, + execution_options: "ExecutionOptions", ): """Private constructor (use get_current() instead).""" self.block_splitting_enabled = block_splitting_enabled @@ -171,6 +175,8 @@ def __init__( self.enable_auto_log_stats = enable_auto_log_stats self.trace_allocations = trace_allocations self.optimizer_enabled = optimizer_enabled + # TODO: expose execution options in Dataset public APIs. + self.execution_options = execution_options @staticmethod def get_current() -> "DatasetContext": @@ -179,6 +185,8 @@ def get_current() -> "DatasetContext": If the context has not yet been created in this process, it will be initialized with default settings. 
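With execution options now hanging off DatasetContext (and deep-copied into each executor in plan.py), they can be tuned per process before running a Dataset; the Dataset-level public API is still a TODO per the comment above. A usage sketch based on the fields exercised in the tests below:

from ray.data.context import DatasetContext

ctx = DatasetContext.get_current()
ctx.execution_options.preserve_order = True        # keep blocks in input order
ctx.execution_options.locality_with_output = True  # prefer the driver's node
# Resource caps are carried here as well, e.g.:
#   ctx.execution_options.resource_limits.cpu = 1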
""" + from ray.data._internal.execution.interfaces import ExecutionOptions + global _default_context with _context_lock: @@ -213,6 +221,7 @@ def get_current() -> "DatasetContext": enable_auto_log_stats=DEFAULT_AUTO_LOG_STATS, trace_allocations=DEFAULT_TRACE_ALLOCATIONS, optimizer_enabled=DEFAULT_OPTIMIZER_ENABLED, + execution_options=ExecutionOptions(), ) return _default_context diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index b21ea92a7e54..d2f146bf10aa 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -31,7 +31,10 @@ from ray.air.constants import TENSOR_COLUMN_NAME from ray.air.util.data_batch_conversion import BlockFormat from ray.data._internal.logical.optimizers import LogicalPlan -from ray.data._internal.logical.operators.map_operator import MapBatches +from ray.data._internal.logical.operators.map_operator import ( + MapRows, + MapBatches, +) from ray.data.dataset_iterator import DatasetIterator from ray.data._internal.block_batching import batch_block_refs, batch_blocks from ray.data._internal.block_list import BlockList @@ -334,7 +337,18 @@ def transform(block: Block, fn: RowUDF[T, U]) -> Iterable[Block]: fn=fn, ) ) - return Dataset(plan, self._epoch, self._lazy) + + logical_plan = self._logical_plan + if logical_plan is not None: + map_op = MapRows( + logical_plan.dag, + transform, + fn, + compute=compute, + ray_remote_args=ray_remote_args, + ) + logical_plan = LogicalPlan(map_op) + return Dataset(plan, self._epoch, self._lazy, logical_plan) def map_batches( self, @@ -676,16 +690,16 @@ def process_next_batch(batch: DataBatch) -> Iterator[Block]: logical_plan.dag, transform, fn, - batch_size, - compute, - batch_format, - zero_copy_batch, - target_block_size, - fn_args, - fn_kwargs, - fn_constructor_args, - fn_constructor_kwargs, - ray_remote_args, + batch_size=batch_size, + compute=compute, + batch_format=batch_format, + zero_copy_batch=zero_copy_batch, + target_block_size=target_block_size, + fn_args=fn_args, + fn_kwargs=fn_kwargs, + fn_constructor_args=fn_constructor_args, + fn_constructor_kwargs=fn_constructor_kwargs, + ray_remote_args=ray_remote_args, ) logical_plan = LogicalPlan(map_batches_op) diff --git a/python/ray/data/datasource/mongo_datasource.py b/python/ray/data/datasource/mongo_datasource.py index d44dec57a667..f1153271c532 100644 --- a/python/ray/data/datasource/mongo_datasource.py +++ b/python/ray/data/datasource/mongo_datasource.py @@ -95,6 +95,10 @@ def __init__( self._client = pymongo.MongoClient(uri) _validate_database_collection_exist(self._client, database, collection) + self._avg_obj_size = self._client[database].command("collstats", collection)[ + "avgObjSize" + ] + def estimate_inmemory_data_size(self) -> Optional[int]: # TODO(jian): Add memory size estimation to improve auto-tune of parallelism. 
return None @@ -154,7 +158,7 @@ def make_block( for i, partition in enumerate(partitions_ids): metadata = BlockMetadata( num_rows=partition["count"], - size_bytes=None, + size_bytes=partition["count"] * self._avg_obj_size, schema=None, input_files=None, exec_stats=None, diff --git a/python/ray/data/tests/conftest.py b/python/ray/data/tests/conftest.py index 8decea0190ea..7d013d9f84eb 100644 --- a/python/ray/data/tests/conftest.py +++ b/python/ray/data/tests/conftest.py @@ -302,6 +302,18 @@ def target_max_block_size(request): ctx.target_max_block_size = original +@pytest.fixture(params=[True]) +def enable_optimizer(request): + ctx = ray.data.context.DatasetContext.get_current() + original_backend = ctx.new_execution_backend + original_optimizer = ctx.optimizer_enabled + ctx.new_execution_backend = request.param + ctx.optimizer_enabled = request.param + yield request.param + ctx.new_execution_backend = original_backend + ctx.optimizer_enabled = original_optimizer + + # ===== Pandas dataset formats ===== @pytest.fixture(scope="function") def ds_pandas_single_column_format(ray_start_regular_shared): diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index 411c88b2a419..34596fc82ca4 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -1,15 +1,64 @@ import pytest import ray -from ray.data.context import DatasetContext +from ray.data._internal.execution.operators.map_operator import MapOperator +from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer +from ray.data._internal.logical.operators.read_operator import Read +from ray.data._internal.logical.operators.map_operator import MapRows, MapBatches +from ray.data._internal.logical.planner import Planner +from ray.data.datasource.parquet_datasource import ParquetDatasource +from ray.tests.conftest import * # noqa -def test_e2e_optimizer_sanity(ray_start_cluster_enabled): - ctx = DatasetContext.get_current() - ctx.new_execution_backend = True - ctx.optimizer_enabled = True - ds = ray.data.range(5).map_batches(lambda x: x) - assert ds.take_all() == [0, 1, 2, 3, 4], ds + +def test_e2e_optimizer_sanity(ray_start_cluster_enabled, enable_optimizer): + ds = ray.data.range(5) + ds = ds.map_batches(lambda x: x) + ds = ds.map(lambda x: x + 1) + assert ds.take_all() == [1, 2, 3, 4, 5], ds + + +def test_read_operator(ray_start_cluster_enabled, enable_optimizer): + planner = Planner() + op = Read(ParquetDatasource()) + physical_op = planner.plan(op) + + assert op.name == "Read" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) + + +def test_map_batches_operator(ray_start_cluster_enabled, enable_optimizer): + planner = Planner() + read_op = Read(ParquetDatasource()) + op = MapBatches( + read_op, + lambda it: (x for x in it), + lambda x: x, + ) + physical_op = planner.plan(op) + + assert op.name == "MapBatches" + assert isinstance(physical_op, MapOperator) + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], MapOperator) + + +def test_map_rows_operator(ray_start_cluster_enabled, enable_optimizer): + planner = Planner() + read_op = Read(ParquetDatasource()) + op = MapRows( + read_op, + lambda it: (x for x in it), + lambda x: x, + ) + physical_op = planner.plan(op) + + assert op.name == "MapRows" + assert isinstance(physical_op, 
MapOperator) + assert len(physical_op.input_dependencies) == 1 + assert isinstance(physical_op.input_dependencies[0], MapOperator) if __name__ == "__main__": diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index 27095867617f..5b8f8f29b77d 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -5,6 +5,7 @@ from typing import List, Any import ray +from ray.data.context import DatasetContext from ray.data._internal.execution.interfaces import ( ExecutionOptions, ExecutionResources, @@ -27,6 +28,7 @@ from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.util import make_ref_bundles +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy @ray.remote @@ -291,6 +293,75 @@ def reverse_sort(inputs: List[RefBundle]): assert output == expected, (output, expected) +def test_e2e_option_propagation(): + DatasetContext.get_current().new_execution_backend = True + DatasetContext.get_current().use_streaming_executor = True + + def run(): + ray.data.range(5, parallelism=5).map( + lambda x: x, compute=ray.data.ActorPoolStrategy(2, 2) + ).take_all() + + DatasetContext.get_current().execution_options.resource_limits = ( + ExecutionResources() + ) + run() + + DatasetContext.get_current().execution_options.resource_limits.cpu = 1 + with pytest.raises(ValueError): + run() + + +def test_configure_spread_e2e(): + from ray import remote_function + + tasks = [] + + def _test_hook(fn, args, strategy): + if "map_task" in str(fn): + tasks.append(strategy) + + remote_function._task_launch_hook = _test_hook + DatasetContext.get_current().use_streaming_executor = True + DatasetContext.get_current().execution_options.preserve_order = True + + # Simple 2-stage pipeline. + ray.data.range(2, parallelism=2).map(lambda x: x, num_cpus=2).take_all() + + # Read tasks get SPREAD by default, subsequent ones use default policy. 
+ tasks = sorted(tasks) + assert tasks == ["DEFAULT", "DEFAULT", "SPREAD", "SPREAD"] + + +def test_configure_output_locality(): + inputs = make_ref_bundles([[x] for x in range(20)]) + o1 = InputDataBuffer(inputs) + o2 = MapOperator(make_transform(lambda block: [b * -1 for b in block]), o1) + o3 = MapOperator( + make_transform(lambda block: [b * 2 for b in block]), + o2, + compute_strategy=ray.data.ActorPoolStrategy(1, 1), + ) + topo, _ = build_streaming_topology(o3, ExecutionOptions(locality_with_output=False)) + assert ( + o2._execution_state._task_submitter._ray_remote_args.get("scheduling_strategy") + is None + ) + assert ( + o3._execution_state._task_submitter._ray_remote_args.get("scheduling_strategy") + is None + ) + topo, _ = build_streaming_topology(o3, ExecutionOptions(locality_with_output=True)) + assert isinstance( + o2._execution_state._task_submitter._ray_remote_args["scheduling_strategy"], + NodeAffinitySchedulingStrategy, + ) + assert isinstance( + o3._execution_state._task_submitter._ray_remote_args["scheduling_strategy"], + NodeAffinitySchedulingStrategy, + ) + + if __name__ == "__main__": import sys diff --git a/python/ray/train/horovod/horovod_trainer.py b/python/ray/train/horovod/horovod_trainer.py index c4a4c957cc6b..d148e6775fae 100644 --- a/python/ray/train/horovod/horovod_trainer.py +++ b/python/ray/train/horovod/horovod_trainer.py @@ -92,6 +92,9 @@ def train_loop_per_worker(): from ray.train.torch import TorchCheckpoint from ray.air.config import ScalingConfig + # If using GPUs, set this to True. + use_gpu = False + input_size = 1 layer_size = 15 output_size = 1 @@ -124,7 +127,7 @@ def train_loop_per_worker(): for epoch in range(num_epochs): model.train() for batch in dataset_shard.iter_torch_batches( - batch_size=32, dtypes=torch.float + batch_size=32, dtypes=torch.float, device=train.torch.get_device() ): inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"] inputs.to(device) @@ -142,9 +145,7 @@ def train_loop_per_worker(): ), ) train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)]) - scaling_config = ScalingConfig(num_workers=3) - # If using GPUs, use the below scaling config instead. - # scaling_config = ScalingConfig(num_workers=3, use_gpu=True) + scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu) trainer = HorovodTrainer( train_loop_per_worker=train_loop_per_worker, scaling_config=scaling_config, diff --git a/python/ray/train/huggingface/huggingface_trainer.py b/python/ray/train/huggingface/huggingface_trainer.py index 8afe9c2784b0..5e7a145c788f 100644 --- a/python/ray/train/huggingface/huggingface_trainer.py +++ b/python/ray/train/huggingface/huggingface_trainer.py @@ -124,6 +124,9 @@ class HuggingFaceTrainer(TorchTrainer): from ray.train.huggingface import HuggingFaceTrainer from ray.air.config import ScalingConfig + # If using GPUs, set this to True. + use_gpu = False + model_checkpoint = "gpt2" tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer" block_size = 128 @@ -180,6 +183,7 @@ def trainer_init_per_worker(train_dataset, eval_dataset, **config): logging_strategy="epoch", learning_rate=2e-5, weight_decay=0.01, + no_cuda=(not use_gpu), ) return transformers.Trainer( model=model, @@ -188,9 +192,7 @@ def trainer_init_per_worker(train_dataset, eval_dataset, **config): eval_dataset=eval_dataset, ) - scaling_config = ScalingConfig(num_workers=3) - # If using GPUs, use the below scaling config instead. 
- # scaling_config = ScalingConfig(num_workers=3, use_gpu=True) + scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu) trainer = HuggingFaceTrainer( trainer_init_per_worker=trainer_init_per_worker, scaling_config=scaling_config, diff --git a/python/ray/train/tensorflow/tensorflow_trainer.py b/python/ray/train/tensorflow/tensorflow_trainer.py index 44a754647739..6b2ef8609df1 100644 --- a/python/ray/train/tensorflow/tensorflow_trainer.py +++ b/python/ray/train/tensorflow/tensorflow_trainer.py @@ -94,6 +94,9 @@ def train_loop_per_worker(): from ray.air.config import ScalingConfig from ray.train.tensorflow import TensorflowTrainer + # If using GPUs, set this to True. + use_gpu = False + def build_model(): # toy neural network : 1-layer return tf.keras.Sequential( @@ -128,7 +131,7 @@ def train_loop_per_worker(config): train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)]) trainer = TensorflowTrainer( train_loop_per_worker=train_loop_per_worker, - scaling_config=ScalingConfig(num_workers=3), + scaling_config=ScalingConfig(num_workers=3, use_gpu=use_gpu), datasets={"train": train_dataset}, train_loop_config={"num_epochs": 2}, ) diff --git a/python/ray/train/torch/torch_trainer.py b/python/ray/train/torch/torch_trainer.py index a21a6a93adb0..7a1987a77315 100644 --- a/python/ray/train/torch/torch_trainer.py +++ b/python/ray/train/torch/torch_trainer.py @@ -142,6 +142,9 @@ def train_func(): from ray.air.config import RunConfig from ray.air.config import CheckpointConfig + # If using GPUs, set this to True. + use_gpu = False + # Define NN layers archicture, epochs, and number of workers input_size = 1 layer_size = 32 @@ -179,7 +182,7 @@ def train_loop_per_worker(): # Iterate over epochs and batches for epoch in range(num_epochs): for batches in dataset_shard.iter_torch_batches(batch_size=32, - dtypes=torch.float): + dtypes=torch.float, device=train.torch.get_device()): # Add batch or unsqueeze as an additional dimension [32, x] inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"] @@ -210,9 +213,7 @@ def train_loop_per_worker(): ) # Define scaling and run configs - # If using GPUs, use the below scaling config instead. - # scaling_config = ScalingConfig(num_workers=3, use_gpu=True) - scaling_config = ScalingConfig(num_workers=num_workers) + scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu) run_config = RunConfig(checkpoint_config=CheckpointConfig(num_to_keep=1)) trainer = TorchTrainer(