Skip to content

Commit

Permalink
[Data] Fix bug with inserting custom optimization rule at index 0 (#4…
Browse files Browse the repository at this point in the history
…8051)

## Why are these changes needed?
Fix the bug reported
#48039 (comment)

## Related issue number

## Checks

- [x] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: Scott Lee <sjl@anyscale.com>
  • Loading branch information
scottjlee authored Oct 16, 2024
1 parent 0056097 commit dc73531
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 2 deletions.
4 changes: 2 additions & 2 deletions python/ray/data/_internal/logical/optimizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@

@DeveloperAPI
def register_logical_rule(cls: Type[Rule], insert_index: Optional[int] = None):
if not insert_index:
if insert_index is None:
_LOGICAL_RULES.append(cls)
else:
_LOGICAL_RULES.insert(insert_index, cls)


@DeveloperAPI
def register_physical_rule(cls: Type[Rule], insert_index: Optional[int] = None):
if not insert_index:
if insert_index is None:
_PHYSICAL_RULES.append(cls)
else:
_PHYSICAL_RULES.insert(insert_index, cls)
Expand Down
83 changes: 83 additions & 0 deletions python/ray/data/tests/test_execution_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1649,5 +1649,88 @@ def test_zero_copy_fusion_eliminate_build_output_blocks(ray_start_regular_shared
)


def test_insert_logical_optimization_rules():
class FakeRule1:
pass

class FakeRule2:
pass

from ray.data._internal.logical.optimizers import (
_LOGICAL_RULES,
register_logical_rule,
)
from ray.data._internal.logical.rules.randomize_blocks import (
ReorderRandomizeBlocksRule,
)

register_logical_rule(FakeRule1)
assert _LOGICAL_RULES == [ReorderRandomizeBlocksRule, FakeRule1]

register_logical_rule(FakeRule2, 1)
assert _LOGICAL_RULES == [ReorderRandomizeBlocksRule, FakeRule2, FakeRule1]

register_logical_rule(FakeRule1, 0)
assert _LOGICAL_RULES == [
FakeRule1,
ReorderRandomizeBlocksRule,
FakeRule2,
FakeRule1,
]


def test_insert_physical_optimization_rules():
class FakeRule1:
pass

class FakeRule2:
pass

from ray.data._internal.logical.optimizers import (
_PHYSICAL_RULES,
register_physical_rule,
)
from ray.data._internal.logical.rules.inherit_target_max_block_size import (
InheritTargetMaxBlockSizeRule,
)
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule
from ray.data._internal.logical.rules.set_read_parallelism import (
SetReadParallelismRule,
)
from ray.data._internal.logical.rules.zero_copy_map_fusion import (
EliminateBuildOutputBlocks,
)

register_physical_rule(FakeRule1)
assert _PHYSICAL_RULES == [
InheritTargetMaxBlockSizeRule,
SetReadParallelismRule,
OperatorFusionRule,
EliminateBuildOutputBlocks,
FakeRule1,
]

register_physical_rule(FakeRule2, 2)
assert _PHYSICAL_RULES == [
InheritTargetMaxBlockSizeRule,
SetReadParallelismRule,
FakeRule2,
OperatorFusionRule,
EliminateBuildOutputBlocks,
FakeRule1,
]

register_physical_rule(FakeRule1, 0)
assert _PHYSICAL_RULES == [
FakeRule1,
InheritTargetMaxBlockSizeRule,
SetReadParallelismRule,
FakeRule2,
OperatorFusionRule,
EliminateBuildOutputBlocks,
FakeRule1,
]


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

0 comments on commit dc73531

Please sign in to comment.