[data] New executor [10/n]: Plumbing for locality_with_output and setting execution options #31908

Merged
merged 95 commits on Jan 26, 2023
95 commits
761a53a
streaming stuff only
ericl Jan 10, 2023
8404b37
add basic streaming
ericl Jan 11, 2023
124b9b5
integrate with feature flag
ericl Jan 11, 2023
78a89c3
fix stats
ericl Jan 11, 2023
34990e8
stats todo
ericl Jan 11, 2023
e0c3e04
wip refactor
ericl Jan 11, 2023
9117a85
fix it
ericl Jan 12, 2023
3cf4a6e
fix finalization
ericl Jan 12, 2023
b4147b5
add sanity test
ericl Jan 12, 2023
6c5f732
test completed flag
ericl Jan 12, 2023
11d0a7d
remove demo
ericl Jan 12, 2023
008eb13
disable by default
ericl Jan 12, 2023
7c4117b
fix legacy call
ericl Jan 12, 2023
75ed42d
off
ericl Jan 12, 2023
8901571
comments 1
ericl Jan 12, 2023
02a6043
Merge remote-tracking branch 'upstream/master' into streaming-executor
ericl Jan 12, 2023
528f87d
remove index from inputs done
ericl Jan 12, 2023
4a36f66
fix tests
ericl Jan 12, 2023
fe86e26
limit 1
ericl Jan 13, 2023
15e382f
wip
ericl Jan 13, 2023
e34273d
add lim
ericl Jan 13, 2023
ee2d105
validation code
ericl Jan 13, 2023
97ed2a0
Merge remote-tracking branch 'upstream/master' into streaming-executor
ericl Jan 13, 2023
4d56c68
Merge branch 'streaming-executor' into resource-limits
ericl Jan 13, 2023
9e36935
add resources
ericl Jan 13, 2023
ea64f22
canonicalize remote args
ericl Jan 13, 2023
d81c7c0
valid code
ericl Jan 13, 2023
481ce6a
todos
ericl Jan 13, 2023
67ae881
fix regression
ericl Jan 13, 2023
c16d265
Merge remote-tracking branch 'upstream/master' into streaming-executor
ericl Jan 13, 2023
159c2fa
Merge branch 'streaming-executor' into resource-limits
ericl Jan 13, 2023
3e954e9
remove stale docs
ericl Jan 13, 2023
ba8e530
comments 2
ericl Jan 15, 2023
31cfd08
remove unnecessary reverse
ericl Jan 15, 2023
7858cc2
Merge remote-tracking branch 'upstream/master' into streaming-executor
ericl Jan 17, 2023
1a66c03
Merge branch 'streaming-executor' into resource-limits
ericl Jan 17, 2023
3facdb3
add int math
ericl Jan 17, 2023
057db68
fix resource limits
ericl Jan 17, 2023
9d2e46a
add limits reporting
ericl Jan 18, 2023
0dd2459
add todo
ericl Jan 18, 2023
96b7202
update
ericl Jan 18, 2023
647ae29
test actor pool
ericl Jan 18, 2023
64909b5
add some todos
ericl Jan 18, 2023
8123f90
fix error message
ericl Jan 18, 2023
9fbd192
update limits
ericl Jan 18, 2023
6081848
1/4
ericl Jan 18, 2023
3c042b9
add tracing
ericl Jan 18, 2023
06e6f8d
trace step
ericl Jan 18, 2023
ed821d3
test unordered map
ericl Jan 18, 2023
75c688f
improve progress bar
ericl Jan 19, 2023
a9e4ed4
Merge remote-tracking branch 'upstream/master' into resource-limits
ericl Jan 19, 2023
3863a9c
refresh less
ericl Jan 19, 2023
5f7bb0d
todo
ericl Jan 19, 2023
9b8c453
restore limits
ericl Jan 19, 2023
257d6c5
refactor
ericl Jan 19, 2023
b9cec56
wip
ericl Jan 19, 2023
e0b550b
wip
ericl Jan 19, 2023
00c1cea
shutdown
ericl Jan 19, 2023
d37a0dc
wip
ericl Jan 19, 2023
bd5d2dc
wip
ericl Jan 19, 2023
29970f7
fix test operator
ericl Jan 19, 2023
ea18db1
fix exec test
ericl Jan 19, 2023
464bad3
test order preservation
ericl Jan 19, 2023
273ddb8
Merge branch 'streaming-interfaces' into resource-limits
ericl Jan 19, 2023
cdc7f5b
fix
ericl Jan 19, 2023
32377ac
wip
ericl Jan 19, 2023
5802753
fix test
ericl Jan 19, 2023
75a46e9
update test
ericl Jan 19, 2023
61ba793
Merge remote-tracking branch 'upstream/master' into streaming-interfaces
ericl Jan 20, 2023
dd2c073
deflake
ericl Jan 20, 2023
9c6cf2c
add progress str
ericl Jan 20, 2023
ed80f9a
remove s
ericl Jan 20, 2023
3871a0a
comments 3
ericl Jan 20, 2023
2416de0
Merge remote-tracking branch 'upstream/master' into streaming-interfaces
ericl Jan 21, 2023
4206fab
Merge branch 'streaming-interfaces' into resource-limits
ericl Jan 21, 2023
21311ce
cleanup
ericl Jan 23, 2023
2f2d4e3
Merge remote-tracking branch 'upstream/master' into resource-limits
ericl Jan 23, 2023
1434a19
update 2
ericl Jan 23, 2023
03ced27
fix tests
ericl Jan 23, 2023
82c740c
add basic tests
ericl Jan 23, 2023
9194c8f
wip
ericl Jan 23, 2023
5a92b13
update
ericl Jan 23, 2023
59f6695
Merge remote-tracking branch 'upstream/master' into resource-limits
ericl Jan 24, 2023
96e0c20
plumbing
ericl Jan 24, 2023
d2ccc49
check scheduling strat
ericl Jan 24, 2023
5f7ecbf
add spread strategy test
ericl Jan 24, 2023
9aed814
update
ericl Jan 24, 2023
c257365
Merge remote-tracking branch 'upstream/master' into resource-limits
ericl Jan 24, 2023
34911bd
remove broken test
ericl Jan 25, 2023
295af5d
lint
ericl Jan 25, 2023
ca933bc
fix
ericl Jan 25, 2023
5f6ed2c
Merge remote-tracking branch 'upstream/master' into resource-limits
ericl Jan 25, 2023
b77d577
Merge branch 'resource-limits' into plumbing2
ericl Jan 25, 2023
aa0b4bd
fix test
ericl Jan 25, 2023
67ff6c9
fix lint
ericl Jan 25, 2023
@@ -3,6 +3,9 @@
from ray.data.block import Block, BlockMetadata
from ray.data.context import DatasetContext
from ray.data.context import DEFAULT_SCHEDULING_STRATEGY
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
)
from ray.data._internal.execution.operators.map_task_submitter import (
MapTaskSubmitter,
_map_task,
@@ -28,8 +31,7 @@ def __init__(
ray_remote_args: Remote arguments for the Ray actors to be created.
pool_size: The size of the actor pool.
"""
self._transform_fn_ref = transform_fn_ref
self._ray_remote_args = ray_remote_args
super().__init__(transform_fn_ref, ray_remote_args)
self._pool_size = pool_size
# A map from task output futures to the actors on which they are running.
self._active_actors: Dict[ObjectRef[Block], ray.actor.ActorHandle] = {}
@@ -39,7 +41,8 @@ def __init__(
def progress_str(self) -> str:
return f"{self._actor_pool.size()} actors"

def start(self):
def start(self, options: ExecutionOptions):
super().start(options)
# Create the actor workers and add them to the pool.
ray_remote_args = self._apply_default_remote_args(self._ray_remote_args)
cls_ = ray.remote(**ray_remote_args)(MapWorker)
@@ -76,7 +76,7 @@ def __init__(
self._output_queue: Optional[_OutputQueue] = None

def start(self, options: ExecutionOptions) -> None:
self._task_submitter.start()
self._task_submitter.start(options)
if options.preserve_order:
self._output_queue = _OrderedOutputQueue()
else:
@@ -1,7 +1,13 @@
from abc import ABC, abstractmethod
from typing import List, Union, Tuple, Callable, Iterator
from typing import Dict, Any, List, Union, Tuple, Callable, Iterator

import ray
from ray.data.block import Block, BlockAccessor, BlockMetadata, BlockExecStats
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
)
from ray.types import ObjectRef
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from ray._raylet import ObjectRefGenerator


@@ -13,14 +19,35 @@ class MapTaskSubmitter(ABC):
submission is done.
"""

def start(self):
def __init__(
self,
transform_fn_ref: ObjectRef[Callable[[Iterator[Block]], Iterator[Block]]],
ray_remote_args: Dict[str, Any],
):
"""Create a TaskPoolSubmitter instance.

Args:
transform_fn_ref: The function to apply to a block bundle in the submitted
map task.
ray_remote_args: Remote arguments for the Ray tasks to be launched.
"""
self._transform_fn_ref = transform_fn_ref
self._ray_remote_args = ray_remote_args

def start(self, options: ExecutionOptions):
"""Start the task submitter so it's ready to submit tasks.

This is called when execution of the map operator actually starts, and is where
the submitter can initialize expensive state, reserve resources, start workers,
etc.
"""
pass
if options.locality_with_output:
self._ray_remote_args[
"scheduling_strategy"
] = NodeAffinitySchedulingStrategy(
ray.get_runtime_context().get_node_id(),
soft=True,
)

@abstractmethod
def submit(
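The `start()` hook above rewrites the submitter's remote args only when `locality_with_output` is enabled. Below is a minimal, ray-free sketch of that behavior; `ExecutionOptions` and `NodeAffinitySchedulingStrategy` here are simplified stand-ins for the real Ray classes, and `current_node_id` is passed in explicitly instead of being read from `ray.get_runtime_context()`.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class ExecutionOptions:
    # Mirrors the option plumbed through in this PR.
    locality_with_output: bool = False


@dataclass
class NodeAffinitySchedulingStrategy:
    # Stand-in for ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy.
    node_id: str
    soft: bool = True


class MapTaskSubmitter:
    def __init__(self, ray_remote_args: Dict[str, Any]):
        self._ray_remote_args = ray_remote_args

    def start(self, options: ExecutionOptions, current_node_id: str) -> None:
        # When locality_with_output is requested, add a soft affinity for
        # the driver's node, so tasks prefer (but are not forced) to run
        # where their output will be consumed.
        if options.locality_with_output:
            self._ray_remote_args["scheduling_strategy"] = (
                NodeAffinitySchedulingStrategy(current_node_id, soft=True)
            )


submitter = MapTaskSubmitter({"num_cpus": 1})
submitter.start(ExecutionOptions(locality_with_output=True), "node-1")
```

Using a soft affinity keeps the scheduler free to place tasks elsewhere when the preferred node is saturated.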
@@ -1,4 +1,4 @@
from typing import Dict, Any, Iterator, Callable, Union, List
from typing import Union, List

import ray
from ray.data.block import Block
@@ -14,21 +14,6 @@
class TaskPoolSubmitter(MapTaskSubmitter):
"""A task submitter for MapOperator that uses normal Ray tasks."""

def __init__(
self,
transform_fn_ref: ObjectRef[Callable[[Iterator[Block]], Iterator[Block]]],
ray_remote_args: Dict[str, Any],
):
"""Create a TaskPoolSubmitter instance.

Args:
transform_fn_ref: The function to apply to a block bundle in the submitted
map task.
ray_remote_args: Remote arguments for the Ray tasks to be launched.
"""
self._transform_fn_ref = transform_fn_ref
self._ray_remote_args = ray_remote_args

def submit(
self, input_blocks: List[ObjectRef[Block]]
) -> ObjectRef[ObjectRefGenerator]:
6 changes: 2 additions & 4 deletions python/ray/data/_internal/plan.py
@@ -449,12 +449,11 @@ def execute_to_iterator(
)

from ray.data._internal.execution.streaming_executor import StreamingExecutor
from ray.data._internal.execution.interfaces import ExecutionOptions
from ray.data._internal.execution.legacy_compat import (
execute_to_legacy_block_iterator,
)

executor = StreamingExecutor(ExecutionOptions(preserve_order=False))
executor = StreamingExecutor(copy.deepcopy(ctx.execution_options))
Contributor:
Given execution_options could be mutated in StreamingExecutor/BulkExecutor, shall we just provide a method DatasetContext.get_execution_options() that returns a deep copy of ctx.execution_options?

Contributor (author):
Missed this comment; I'll file a followup.
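The helper the reviewer suggests is not part of this PR. A sketch of what a hypothetical `DatasetContext.get_execution_options()` could look like, with simplified stand-ins for the real `DatasetContext`, `ExecutionOptions`, and `ExecutionResources` classes:

```python
import copy
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ExecutionResources:
    cpu: Optional[float] = None
    gpu: Optional[float] = None


@dataclass
class ExecutionOptions:
    preserve_order: bool = False
    locality_with_output: bool = False
    resource_limits: ExecutionResources = field(default_factory=ExecutionResources)


class DatasetContext:
    """Simplified stand-in for ray.data.context.DatasetContext."""

    _instance = None

    def __init__(self) -> None:
        self.execution_options = ExecutionOptions()

    @classmethod
    def get_current(cls) -> "DatasetContext":
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def get_execution_options(self) -> ExecutionOptions:
        # Hand each executor its own copy so in-executor mutation cannot
        # leak back into the shared context singleton.
        return copy.deepcopy(self.execution_options)


ctx = DatasetContext.get_current()
opts = ctx.get_execution_options()
opts.resource_limits.cpu = 1  # mutates only the executor's copy
```

Centralizing the deep copy in one method would replace the `copy.deepcopy(...)` calls scattered at each executor construction site in plan.py.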

block_iter = execute_to_legacy_block_iterator(
executor,
self,
@@ -500,12 +499,11 @@ def execute(
if not self.has_computed_output():
if self._run_with_new_execution_backend():
from ray.data._internal.execution.bulk_executor import BulkExecutor
from ray.data._internal.execution.interfaces import ExecutionOptions
from ray.data._internal.execution.legacy_compat import (
execute_to_legacy_block_list,
)

executor = BulkExecutor(ExecutionOptions())
executor = BulkExecutor(copy.deepcopy(context.execution_options))
blocks = execute_to_legacy_block_list(
executor,
self,
11 changes: 10 additions & 1 deletion python/ray/data/context.py
@@ -1,10 +1,13 @@
import os
import threading
from typing import Optional
from typing import Optional, TYPE_CHECKING

from ray.util.annotations import DeveloperAPI
from ray.util.scheduling_strategies import SchedulingStrategyT

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces import ExecutionOptions

# The context singleton on this process.
_default_context: "Optional[DatasetContext]" = None
_context_lock = threading.Lock()
@@ -144,6 +147,7 @@ def __init__(
enable_auto_log_stats: bool,
trace_allocations: bool,
optimizer_enabled: bool,
execution_options: "ExecutionOptions",
):
"""Private constructor (use get_current() instead)."""
self.block_splitting_enabled = block_splitting_enabled
@@ -171,6 +175,8 @@ def __init__(
self.enable_auto_log_stats = enable_auto_log_stats
self.trace_allocations = trace_allocations
self.optimizer_enabled = optimizer_enabled
# TODO: expose execution options in Dataset public APIs.
Contributor:
It'd be better to create a tracking issue.

Contributor (author):
Added to #31797

self.execution_options = execution_options

@staticmethod
def get_current() -> "DatasetContext":
@@ -179,6 +185,8 @@ def get_current() -> "DatasetContext":
If the context has not yet been created in this process, it will be
initialized with default settings.
"""
from ray.data._internal.execution.interfaces import ExecutionOptions

global _default_context

with _context_lock:
@@ -213,6 +221,7 @@ def get_current() -> "DatasetContext":
enable_auto_log_stats=DEFAULT_AUTO_LOG_STATS,
trace_allocations=DEFAULT_TRACE_ALLOCATIONS,
optimizer_enabled=DEFAULT_OPTIMIZER_ENABLED,
execution_options=ExecutionOptions(),
)

return _default_context
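`get_current()` above imports `ExecutionOptions` inside the function body rather than at module scope, avoiding a circular import between context.py and the execution interfaces, and constructs the context lazily under a lock. A stripped-down model of that lazy-singleton pattern (class and field names simplified):

```python
import threading
from typing import Any, Dict, Optional

# Module-level singleton slot and lock, mirroring _default_context and
# _context_lock in context.py.
_default_context: Optional["Context"] = None
_context_lock = threading.Lock()


class Context:
    """Simplified stand-in for DatasetContext."""

    def __init__(self, execution_options: Dict[str, Any]):
        self.execution_options = execution_options

    @staticmethod
    def get_current() -> "Context":
        global _default_context
        # Create the process-wide singleton on first use; the lock makes
        # concurrent first calls from multiple threads safe.
        with _context_lock:
            if _default_context is None:
                _default_context = Context(execution_options={})
            return _default_context
```

Every caller of `Context.get_current()` sees the same object, which is why the PR deep-copies `execution_options` before handing it to an executor.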
71 changes: 71 additions & 0 deletions python/ray/data/tests/test_streaming_executor.py
@@ -5,6 +5,7 @@
from typing import List, Any

import ray
from ray.data.context import DatasetContext
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
@@ -27,6 +28,7 @@
from ray.data._internal.execution.operators.map_operator import MapOperator
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.execution.util import make_ref_bundles
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@ray.remote
@@ -291,6 +293,75 @@ def reverse_sort(inputs: List[RefBundle]):
assert output == expected, (output, expected)


def test_e2e_option_propagation():
DatasetContext.get_current().new_execution_backend = True
DatasetContext.get_current().use_streaming_executor = True

def run():
ray.data.range(5, parallelism=5).map(
lambda x: x, compute=ray.data.ActorPoolStrategy(2, 2)
).take_all()

DatasetContext.get_current().execution_options.resource_limits = (
ExecutionResources()
)
run()

DatasetContext.get_current().execution_options.resource_limits.cpu = 1
with pytest.raises(ValueError):
run()


def test_configure_spread_e2e():
from ray import remote_function

tasks = []

def _test_hook(fn, args, strategy):
if "map_task" in str(fn):
tasks.append(strategy)

remote_function._task_launch_hook = _test_hook
DatasetContext.get_current().use_streaming_executor = True
DatasetContext.get_current().execution_options.preserve_order = True

# Simple 2-stage pipeline.
ray.data.range(2, parallelism=2).map(lambda x: x, num_cpus=2).take_all()

# Read tasks get SPREAD by default, subsequent ones use default policy.
tasks = sorted(tasks)
assert tasks == ["DEFAULT", "DEFAULT", "SPREAD", "SPREAD"]


def test_configure_output_locality():
inputs = make_ref_bundles([[x] for x in range(20)])
o1 = InputDataBuffer(inputs)
o2 = MapOperator(make_transform(lambda block: [b * -1 for b in block]), o1)
o3 = MapOperator(
make_transform(lambda block: [b * 2 for b in block]),
o2,
compute_strategy=ray.data.ActorPoolStrategy(1, 1),
)
topo, _ = build_streaming_topology(o3, ExecutionOptions(locality_with_output=False))
assert (
o2._execution_state._task_submitter._ray_remote_args.get("scheduling_strategy")
is None
)
assert (
o3._execution_state._task_submitter._ray_remote_args.get("scheduling_strategy")
is None
)
topo, _ = build_streaming_topology(o3, ExecutionOptions(locality_with_output=True))
assert isinstance(
o2._execution_state._task_submitter._ray_remote_args["scheduling_strategy"],
NodeAffinitySchedulingStrategy,
)
assert isinstance(
o3._execution_state._task_submitter._ray_remote_args["scheduling_strategy"],
NodeAffinitySchedulingStrategy,
)


if __name__ == "__main__":
import sys
