dfg (#5)
<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?

<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(
jcoffi authored Jan 26, 2023
2 parents b24933d + da79ae9 commit c51f8bf
Showing 23 changed files with 340 additions and 98 deletions.
2 changes: 1 addition & 1 deletion .buildkite/pipeline.ml.yml
@@ -284,7 +284,7 @@
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/dask/...

- label: ":potable_water: Dataset datasource integration tests (Python 3.7)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED"]
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- DATA_PROCESSING_TESTING=1 ARROW_VERSION=9.* ARROW_MONGO_VERSION=0.5.* ./ci/env/install-dependencies.sh
4 changes: 2 additions & 2 deletions bazel/ray_deps_setup.bzl
@@ -99,8 +99,8 @@ def ray_deps_setup():
name = "com_github_antirez_redis",
build_file = "@com_github_ray_project_ray//bazel:BUILD.redis",
patch_args = ["-p1"],
url = "https://github.com/redis/redis/archive/refs/tags/7.0.5.tar.gz",
sha256 = "40827fcaf188456ad9b3be8e27a4f403c43672b6bb6201192dc15756af6f1eae",
url = "https://github.com/redis/redis/archive/refs/tags/7.0.8.tar.gz",
sha256 = "0e439cbc19f6db5a4c63d355519ab73bf6ac2ecd47df806c14b19564b3d0c593",
patches = [
"@com_github_ray_project_ray//thirdparty/patches:redis-quiet.patch",
],
10 changes: 6 additions & 4 deletions doc/source/ray-air/doc_code/hf_trainer.py
@@ -12,6 +12,10 @@
from ray.train.huggingface import HuggingFaceTrainer
from ray.air.config import ScalingConfig


# If using GPUs, set this to True.
use_gpu = False

model_checkpoint = "gpt2"
tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"
block_size = 128
@@ -66,7 +70,7 @@ def trainer_init_per_worker(train_dataset, eval_dataset, **config):
logging_strategy="epoch",
learning_rate=2e-5,
weight_decay=0.01,
no_cuda=True, # Set to False for GPU training
no_cuda=(not use_gpu),
)
return transformers.Trainer(
model=model,
@@ -76,9 +80,7 @@ def trainer_init_per_worker(train_dataset, eval_dataset, **config):
)


scaling_config = ScalingConfig(num_workers=3)
# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = HuggingFaceTrainer(
trainer_init_per_worker=trainer_init_per_worker,
scaling_config=scaling_config,
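This doc-code change, and the three like it below, replace a commented-out GPU variant of the scaling config with a single `use_gpu` flag that drives both the framework-level device choice and the Ray-level resource request. A minimal sketch of the resulting pattern, assuming only the public `ScalingConfig` API that appears in the diff:

```python
from ray.air.config import ScalingConfig

# One switch controls everything GPU-related in the example.
use_gpu = False  # set to True to train on GPUs

# Passed to transformers.TrainingArguments in the example above: CUDA is
# disabled exactly when use_gpu is False.
no_cuda = not use_gpu

# Passed to the Ray AIR trainer: Ray only reserves GPUs when use_gpu is True,
# so the two settings can never drift apart.
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
```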
10 changes: 6 additions & 4 deletions doc/source/ray-air/doc_code/hvd_trainer.py
@@ -8,6 +8,10 @@
from ray.train.horovod import HorovodTrainer
from ray.air.config import ScalingConfig

# If using GPUs, set this to True.
use_gpu = False


input_size = 1
layer_size = 15
output_size = 1
@@ -43,7 +47,7 @@ def train_loop_per_worker():
for epoch in range(num_epochs):
model.train()
for batch in dataset_shard.iter_torch_batches(
batch_size=32, dtypes=torch.float
batch_size=32, dtypes=torch.float, device=train.torch.get_device()
):
inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
inputs.to(device)
@@ -61,9 +65,7 @@ def train_loop_per_worker():


train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
scaling_config = ScalingConfig(num_workers=3)
# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = HorovodTrainer(
train_loop_per_worker=train_loop_per_worker,
scaling_config=scaling_config,
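Both Torch-based examples also start passing `device=train.torch.get_device()` to `iter_torch_batches`, so batches arrive already placed on the worker's device. A standalone sketch of the same API outside a Train worker, where the device has to be chosen explicitly (the dataset contents are illustrative):

```python
import ray
import torch

# Tiny in-memory dataset, mirroring the doc-code examples in this commit.
ds = ray.data.from_items([{"x": float(x), "y": float(x + 1)} for x in range(32)])

# Outside a Ray Train worker there is no train.torch.get_device(), so pick the
# target device by hand; inside train_loop_per_worker the diff uses
# device=train.torch.get_device() instead.
device = "cuda" if torch.cuda.is_available() else "cpu"

for batch in ds.iter_torch_batches(batch_size=8, dtypes=torch.float, device=device):
    # Each batch is a dict of tensors already on `device`, so the loop body no
    # longer needs per-tensor .to(device) calls.
    inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
```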
8 changes: 5 additions & 3 deletions doc/source/ray-air/doc_code/tf_starter.py
@@ -10,6 +10,10 @@
from ray.train.tensorflow import TensorflowTrainer
from ray.air.config import ScalingConfig


# If using GPUs, set this to True.
use_gpu = False

a = 5
b = 10
size = 100
@@ -59,9 +63,7 @@ def train_func(config: dict):
train_dataset = ray.data.from_items(
[{"x": x / 200, "y": 2 * x / 200} for x in range(200)]
)
scaling_config = ScalingConfig(num_workers=2)
# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
scaling_config = ScalingConfig(num_workers=2, use_gpu=use_gpu)
trainer = TensorflowTrainer(
train_loop_per_worker=train_func,
train_loop_config=config,
11 changes: 7 additions & 4 deletions doc/source/ray-air/doc_code/torch_trainer.py
@@ -7,6 +7,11 @@
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig


# If using GPUs, set this to True.
use_gpu = False


input_size = 1
layer_size = 15
output_size = 1
@@ -34,7 +39,7 @@ def train_loop_per_worker():

for epoch in range(num_epochs):
for batches in dataset_shard.iter_torch_batches(
batch_size=32, dtypes=torch.float
batch_size=32, dtypes=torch.float, device=train.torch.get_device()
):
inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"]
output = model(inputs)
@@ -53,9 +58,7 @@ def train_loop_per_worker():


train_dataset = ray.data.from_items([{"x": x, "y": 2 * x + 1} for x in range(200)])
scaling_config = ScalingConfig(num_workers=3)
# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
scaling_config=scaling_config,
@@ -3,6 +3,9 @@
from ray.data.block import Block, BlockMetadata
from ray.data.context import DatasetContext
from ray.data.context import DEFAULT_SCHEDULING_STRATEGY
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
)
from ray.data._internal.execution.operators.map_task_submitter import (
MapTaskSubmitter,
_map_task,
@@ -28,8 +31,7 @@ def __init__(
ray_remote_args: Remote arguments for the Ray actors to be created.
pool_size: The size of the actor pool.
"""
self._transform_fn_ref = transform_fn_ref
self._ray_remote_args = ray_remote_args
super().__init__(transform_fn_ref, ray_remote_args)
self._pool_size = pool_size
# A map from task output futures to the actors on which they are running.
self._active_actors: Dict[ObjectRef[Block], ray.actor.ActorHandle] = {}
@@ -39,7 +41,8 @@ def __init__(
def progress_str(self) -> str:
return f"{self._actor_pool.size()} actors"

def start(self):
def start(self, options: ExecutionOptions):
super().start(options)
# Create the actor workers and add them to the pool.
ray_remote_args = self._apply_default_remote_args(self._ray_remote_args)
cls_ = ray.remote(**ray_remote_args)(MapWorker)
@@ -76,7 +76,7 @@ def __init__(
self._output_queue: Optional[_OutputQueue] = None

def start(self, options: ExecutionOptions) -> None:
self._task_submitter.start()
self._task_submitter.start(options)
if options.preserve_order:
self._output_queue = _OrderedOutputQueue()
else:
@@ -1,7 +1,13 @@
from abc import ABC, abstractmethod
from typing import List, Union, Tuple, Callable, Iterator
from typing import Dict, Any, List, Union, Tuple, Callable, Iterator

import ray
from ray.data.block import Block, BlockAccessor, BlockMetadata, BlockExecStats
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
)
from ray.types import ObjectRef
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from ray._raylet import ObjectRefGenerator


@@ -13,14 +19,35 @@ class MapTaskSubmitter(ABC):
submission is done.
"""

def start(self):
def __init__(
self,
transform_fn_ref: ObjectRef[Callable[[Iterator[Block]], Iterator[Block]]],
ray_remote_args: Dict[str, Any],
):
"""Create a TaskPoolSubmitter instance.
Args:
transform_fn_ref: The function to apply to a block bundle in the submitted
map task.
ray_remote_args: Remote arguments for the Ray tasks to be launched.
"""
self._transform_fn_ref = transform_fn_ref
self._ray_remote_args = ray_remote_args

def start(self, options: ExecutionOptions):
"""Start the task submitter so it's ready to submit tasks.
This is called when execution of the map operator actually starts, and is where
the submitter can initialize expensive state, reserve resources, start workers,
etc.
"""
pass
if options.locality_with_output:
self._ray_remote_args[
"scheduling_strategy"
] = NodeAffinitySchedulingStrategy(
ray.get_runtime_context().get_node_id(),
soft=True,
)

@abstractmethod
def submit(
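The new `start(options)` hook opts into output locality by rewriting `ray_remote_args` with a soft `NodeAffinitySchedulingStrategy` pointing at the current node. A minimal standalone sketch of that scheduling strategy using only public Ray APIs; the task body is a placeholder:

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()

@ray.remote
def produce_block():
    # Placeholder for a map task that produces an output block.
    return list(range(1000))

# Prefer, but do not require (soft=True), running the task on this node so its
# output stays close to the consumer, which is the same effect the submitter
# enables when options.locality_with_output is set.
strategy = NodeAffinitySchedulingStrategy(
    ray.get_runtime_context().get_node_id(), soft=True
)
ref = produce_block.options(scheduling_strategy=strategy).remote()
print(len(ray.get(ref)))
```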
@@ -1,4 +1,4 @@
from typing import Dict, Any, Iterator, Callable, Union, List
from typing import Union, List

import ray
from ray.data.block import Block
@@ -14,21 +14,6 @@
class TaskPoolSubmitter(MapTaskSubmitter):
"""A task submitter for MapOperator that uses normal Ray tasks."""

def __init__(
self,
transform_fn_ref: ObjectRef[Callable[[Iterator[Block]], Iterator[Block]]],
ray_remote_args: Dict[str, Any],
):
"""Create a TaskPoolSubmitter instance.
Args:
transform_fn_ref: The function to apply to a block bundle in the submitted
map task.
ray_remote_args: Remote arguments for the Ray tasks to be launched.
"""
self._transform_fn_ref = transform_fn_ref
self._ray_remote_args = ray_remote_args

def submit(
self, input_blocks: List[ObjectRef[Block]]
) -> ObjectRef[ObjectRefGenerator]:
86 changes: 70 additions & 16 deletions python/ray/data/_internal/logical/operators/map_operator.py
@@ -7,13 +7,14 @@
from ray.data._internal.execution.operators.map_operator import MapOperator
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.compute import (
UDF,
get_compute,
CallableClass,
ComputeStrategy,
TaskPoolStrategy,
ActorPoolStrategy,
)
from ray.data.block import BatchUDF, Block
from ray.data.block import BatchUDF, Block, RowUDF


if sys.version_info >= (3, 8):
@@ -22,44 +23,97 @@
from typing_extensions import Literal


class MapBatches(LogicalOperator):
"""Logical operator for map_batches."""
class AbstractMap(LogicalOperator):
"""Abstract class for logical operators should be converted to physical
MapOperator.
"""

def __init__(
self,
name: str,
input_op: LogicalOperator,
block_fn: BlockTransform,
fn: BatchUDF,
batch_size: Optional[Union[int, Literal["default"]]] = "default",
compute: Optional[Union[str, ComputeStrategy]] = None,
batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
zero_copy_batch: bool = False,
target_block_size: Optional[int] = None,
fn: Optional[UDF] = None,
fn_args: Optional[Iterable[Any]] = None,
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
super().__init__("MapBatches", [input_op])
super().__init__(name, [input_op])
self._block_fn = block_fn
self._fn = fn
self._batch_size = batch_size
self._compute = compute or "tasks"
self._batch_format = batch_format
self._zero_copy_batch = zero_copy_batch
self._target_block_size = target_block_size
self._fn = fn
self._fn_args = fn_args
self._fn_kwargs = fn_kwargs
self._fn_constructor_args = fn_constructor_args
self._fn_constructor_kwargs = fn_constructor_kwargs
self._ray_remote_args = ray_remote_args or {}


def plan_map_batches_op(
op: MapBatches, input_physical_dag: PhysicalOperator
) -> PhysicalOperator:
"""Get the corresponding DAG of physical operators for MapBatches."""
class MapBatches(AbstractMap):
"""Logical operator for map_batches."""

def __init__(
self,
input_op: LogicalOperator,
block_fn: BlockTransform,
fn: BatchUDF,
batch_size: Optional[Union[int, Literal["default"]]] = "default",
compute: Optional[Union[str, ComputeStrategy]] = None,
batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
zero_copy_batch: bool = False,
target_block_size: Optional[int] = None,
fn_args: Optional[Iterable[Any]] = None,
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
super().__init__(
"MapBatches",
input_op,
block_fn,
compute=compute,
target_block_size=target_block_size,
fn=fn,
fn_args=fn_args,
fn_kwargs=fn_kwargs,
fn_constructor_args=fn_constructor_args,
fn_constructor_kwargs=fn_constructor_kwargs,
ray_remote_args=ray_remote_args,
)
self._batch_size = batch_size
self._batch_format = batch_format
self._zero_copy_batch = zero_copy_batch


class MapRows(AbstractMap):
"""Logical operator for map."""

def __init__(
self,
input_op: LogicalOperator,
block_fn: BlockTransform,
fn: RowUDF,
compute: Optional[Union[str, ComputeStrategy]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
super().__init__(
"MapRows",
input_op,
block_fn,
compute=compute,
fn=fn,
ray_remote_args=ray_remote_args,
)


def plan_map_op(op: AbstractMap, input_physical_dag: PhysicalOperator) -> MapOperator:
"""Get the corresponding physical operators DAG for AbstractMap operators."""
compute = get_compute(op._compute)
block_fn = op._block_fn

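With this refactor, `MapBatches` and the new `MapRows` both derive from `AbstractMap`, which is what lets the single `plan_map_op` function convert any map-style logical operator into a physical `MapOperator`. A small sketch of that relationship, assuming only the internal module path shown in the file header above:

```python
from ray.data._internal.logical.operators.map_operator import (
    AbstractMap,
    MapBatches,
    MapRows,
)

# Both concrete logical operators share the AbstractMap base, so the planner
# can hand either of them to plan_map_op and get back a physical MapOperator.
assert issubclass(MapBatches, AbstractMap)
assert issubclass(MapRows, AbstractMap)
```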
