[Datasets] Add logical operator for randomize_block_order() #31977
@@ -62,6 +45,24 @@ def __init__(
        fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        """
This docstring was only moved; the wording is unchanged.
    op: AbstractAllToAll,
    input_physical_dag: PhysicalOperator
) -> AllToAllOperator:
    """Get the corresponding physical operators DAG for AbstractAllToAll operators."""
This method is mostly copied from legacy_compat.py:_stage_to_operator()
        fn: Callable[[BlockList, bool, Callable, dict], Tuple[BlockList, dict]],
        num_blocks: Optional[int] = None,
        supports_block_udf: bool = False,
        block_udf: Optional[BlockTransform] = None,
Should we remove supports_block_udf / block_udf from the logical plan (afaik it's only used to represent fused stages, and we can instead represent it as Map->AllToAll)?
I think the planner can generate the code implementing the block udf fusion.
@ericl - yeah removed.
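The Map->AllToAll idea raised above can be illustrated with a small sketch in which the planner, rather than the logical plan, composes the block UDF with the all-to-all function. This is not Ray's implementation: `Block`, `fuse_map_into_all_to_all`, `double_rows`, and `reverse_block_order` are hypothetical stand-ins, and blocks are modeled as plain Python lists.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical stand-ins: a "block" is just a list of ints here.
Block = List[int]
AllToAllFn = Callable[[List[Block]], Tuple[List[Block], Dict[str, int]]]


def fuse_map_into_all_to_all(
    block_udf: Callable[[Block], Block], all_to_all_fn: AllToAllFn
) -> AllToAllFn:
    """Return one function that applies the map UDF, then the all-to-all step."""

    def fused(blocks: List[Block]) -> Tuple[List[Block], Dict[str, int]]:
        mapped = [block_udf(b) for b in blocks]  # upstream Map, block by block
        return all_to_all_fn(mapped)  # downstream AllToAll over all blocks

    return fused


def double_rows(block: Block) -> Block:
    return [x * 2 for x in block]


def reverse_block_order(blocks: List[Block]) -> Tuple[List[Block], Dict[str, int]]:
    return list(reversed(blocks)), {"num_blocks": len(blocks)}


fused_fn = fuse_map_into_all_to_all(double_rows, reverse_block_order)
out_blocks, stats = fused_fn([[1, 2], [3]])
```

With this shape the logical plan only needs to record that a Map feeds an AllToAll; whether the two are fused is decided at planning time.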
        self,
        name: str,
        input_op: LogicalOperator,
        fn: Callable[[BlockList, bool, Callable, dict], Tuple[BlockList, dict]],
We shouldn't reference the legacy BlockList abstractions in the new packages.
@ericl - removed.
from ray.data._internal.stats import StatsDict


class AbstractAllToAll(LogicalOperator):
Should logical operators not specify a fn at all? That should be generated during planning time.
@ericl - yeah agreed. Created a separate planner directory to put the generated functions for each operator.
def plan_all_to_all_op(
    op: AbstractAllToAll, input_physical_dag: PhysicalOperator
Can we make it a bit more clear (by naming and/or comments) that this is translating a single operator node (instead of entire dag)?
@jianoaix - sure, updated.
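The distinction discussed above, translating one operator node versus planning the whole DAG, can be sketched as two separate functions. This is only an illustration under stated assumptions: `LogicalNode`, `PhysicalNode`, `plan_single_node`, and `plan_dag` are hypothetical names and do not reflect the actual classes or planner in this PR.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LogicalNode:
    """Hypothetical logical operator: a name plus its single upstream input."""
    name: str
    input_op: Optional["LogicalNode"] = None


@dataclass
class PhysicalNode:
    """Hypothetical physical operator produced by planning."""
    name: str
    input_op: Optional["PhysicalNode"] = None


def plan_single_node(
    op: LogicalNode, planned_input: Optional[PhysicalNode]
) -> PhysicalNode:
    """Translate exactly one logical node; its input is already planned."""
    if op.name == "Read":
        return PhysicalNode("ReadPhysical")
    if op.name == "RandomizeBlocks":
        return PhysicalNode("AllToAllPhysical(RandomizeBlocks)", planned_input)
    raise ValueError(f"Unknown logical operator: {op.name}")


def plan_dag(op: LogicalNode) -> PhysicalNode:
    """Plan the whole DAG: recurse into the input first, then this node."""
    planned_input = plan_dag(op.input_op) if op.input_op is not None else None
    return plan_single_node(op, planned_input)


logical = LogicalNode("RandomizeBlocks", input_op=LogicalNode("Read"))
physical = plan_dag(logical)
```

Keeping the per-node function free of recursion makes its name and signature say what it does: it receives the already-planned input and returns one new node on top of it.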
        inspecting the logical plan of a Dataset.
    input_op: The operator preceding this operator in the plan DAG. The outputs
        of `input_op` will be the inputs to this operator.
    block_fn: The transform function to apply to each input block to produce
Shall we name this as transform_fn? It's consistent (and IMO more clear) with the naming in new backend.
Yeah, actually, on second thought this argument should not be part of the logical operator. A logical operator should only care about the user-defined function; this is an internal function we generate for execution. I plan to remove it and put it into the planner directory in a separate PR.
c669ff6 to 4ca2252
from ray.data.block import Block, CallableClass


def _plan_map_op(op: AbstractMap, input_physical_dag: PhysicalOperator) -> MapOperator:
This is moved from map_operator.py.
from ray.data.datasource.datasource import ReadTask


def _plan_read_op(op: Read) -> PhysicalOperator:
This is moved from read_operator.py
) -> Callable[[List[RefBundle]], Tuple[List[RefBundle], StatsDict]]:
    """Generate function to randomize order of blocks."""

    def fn(refs: List[RefBundle]) -> Tuple[List[RefBundle], StatsDict]:
    See Planner.plan() for more details.
    """
    compute = get_compute(op._compute)
    block_fn = op._block_fn
Should we also remove block_fn from the op?
@ericl - yes, plan to do in a separate PR - #31977 (comment).
Signed-off-by: Cheng Su <scnju13@gmail.com>
…ect#31977)
This PR adds logical operator for randomize_block_order(). The change includes:
- Introduce AbstractAllToAll for all logical operators converted to AllToAllOperator
- RandomizeBlocks logical operator for randomize_block_order().
- _internal/planner to move logic for Planner here and have generated function for randomize_blocks. This can be used later to create MapOperator/AllToAllOperator.
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: Cheng Su scnju13@gmail.com
Why are these changes needed?
This PR adds a logical operator for randomize_block_order(). The change includes:
- Introduce AbstractAllToAll for all logical operators converted to AllToAllOperator.
- Add the RandomizeBlocks logical operator for randomize_block_order().
- Add _internal/planner to hold the Planner logic and the generated function for randomize_blocks. This can be used later to create MapOperator/AllToAllOperator.
Related issue number
Checks
- Every commit in this PR is signed off (git commit -s).
- scripts/format.sh was run to lint the changes in this PR.