Skip to content

Commit

Permalink
Add informative progress bar names to map_batches (#31526)
Browse files Browse the repository at this point in the history
Signed-off-by: pdmurray <peynmurray@gmail.com>

Signed-off-by: pdmurray <peynmurray@gmail.com>
  • Loading branch information
peytondmurray authored Jan 27, 2023
1 parent b60f887 commit 3343c76
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 65 deletions.
1 change: 1 addition & 0 deletions .bazeliskrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
USE_BAZEL_VERSION=5.x
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,5 @@ workflow_data/

# Auto-generated tag mapping
tag-mapping.json

.bazeliskrc
6 changes: 3 additions & 3 deletions python/ray/air/tests/test_dataset_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def checker(shard, results):
# applying the preprocessor on each epoch.
assert results[0] == results[1], results
stats = shard.stats()
assert "Stage 1 read->map_batches: 1/1 blocks executed " in stats, stats
assert "Stage 1 read->BatchMapper: 1/1 blocks executed " in stats, stats

def rand(x):
x["value"] = [random.random() for _ in range(len(x))]
Expand Down Expand Up @@ -284,8 +284,8 @@ def checker(shard, results):
assert results[0] != results[1], results
stats = shard.stats()
assert (
"Stage 1 read->randomize_block_order->map_batches: 1/1 blocks executed "
in stats
"Stage 1 read->randomize_block_order->"
"BatchMapper: 1/1 blocks executed " in stats
), stats

test = TestStream(
Expand Down
26 changes: 25 additions & 1 deletion python/ray/data/_internal/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,30 @@
logger = DatasetLogger(__name__)


def capfirst(s: str):
"""Capitalize the first letter of a string
Args:
s: String to capitalize
Returns:
Capitalized string
"""
return s[0].upper() + s[1:]


def capitalize(s: str):
"""Capitalize a string, removing '_' and keeping camelcase.
Args:
s: String to capitalize
Returns:
Capitalized string with no underscores.
"""
return "".join(capfirst(x) for x in s.split("_"))


class Stage:
"""Represents a Dataset transform stage (e.g., map or shuffle)."""

Expand Down Expand Up @@ -157,7 +181,7 @@ def get_plan_as_string(self) -> str:
# Get string representation of each stage in reverse order.
for stage in self._stages_after_snapshot[::-1]:
# Get name of each stage in camel case.
stage_name = stage.name.title().replace("_", "")
stage_name = capitalize(stage.name)
if num_stages == 0:
plan_str += f"{stage_name}\n"
else:
Expand Down
10 changes: 9 additions & 1 deletion python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,16 @@ def process_next_batch(batch: DataBatch) -> Iterator[Block]:
if output_buffer.has_next():
yield output_buffer.next()

# breakpoint()
if hasattr(fn, "__self__") and isinstance(
fn.__self__, ray.data.preprocessor.Preprocessor
):
stage_name = fn.__self__.__class__.__name__
else:
stage_name = f'MapBatches({getattr(fn, "__name__", type(fn))})'

stage = OneToOneStage(
"map_batches",
stage_name,
transform,
compute,
ray_remote_args,
Expand Down
10 changes: 6 additions & 4 deletions python/ray/data/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1517,26 +1517,28 @@ def test_dataset_repr(ray_start_regular_shared):
assert repr(ds) == "Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
ds = ds.map_batches(lambda x: x)
assert repr(ds) == (
"MapBatches\n" "+- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
"MapBatches(<lambda>)\n"
"+- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
)
ds = ds.filter(lambda x: x > 0)
assert repr(ds) == (
"Filter\n"
"+- MapBatches\n"
"+- MapBatches(<lambda>)\n"
" +- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
)
ds = ds.random_shuffle()
assert repr(ds) == (
"RandomShuffle\n"
"+- Filter\n"
" +- MapBatches\n"
" +- MapBatches(<lambda>)\n"
" +- Dataset(num_blocks=10, num_rows=10, schema=<class 'int'>)"
)
ds.fully_executed()
assert repr(ds) == "Dataset(num_blocks=10, num_rows=9, schema=<class 'int'>)"
ds = ds.map_batches(lambda x: x)
assert repr(ds) == (
"MapBatches\n" "+- Dataset(num_blocks=10, num_rows=9, schema=<class 'int'>)"
"MapBatches(<lambda>)\n"
"+- Dataset(num_blocks=10, num_rows=9, schema=<class 'int'>)"
)
ds1, ds2 = ds.split(2)
assert (
Expand Down
64 changes: 36 additions & 28 deletions python/ray/data/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def expect_stages(pipe, num_stages_expected, stage_names):
), pipe._optimized_stages


def dummy_map(x):
"""Dummy function used in calls to map_batches in these tests."""
return x


def test_memory_sanity(shutdown_only):
info = ray.init(num_cpus=1, object_store_memory=500e6)
ds = ray.data.range(10)
Expand Down Expand Up @@ -312,23 +317,23 @@ def test_optimize_reorder(ray_start_regular_shared):
context.optimize_fuse_read_stages = True
context.optimize_reorder_stages = True

ds = ray.data.range(10).randomize_block_order().map_batches(lambda x: x)
ds = ray.data.range(10).randomize_block_order().map_batches(dummy_map)
expect_stages(
ds,
2,
["read->map_batches", "randomize_block_order"],
["read->MapBatches(dummy_map)", "randomize_block_order"],
)

ds2 = (
ray.data.range(10)
.randomize_block_order()
.repartition(10)
.map_batches(lambda x: x)
.map_batches(dummy_map)
)
expect_stages(
ds2,
3,
["read->randomize_block_order", "repartition", "map_batches"],
["read->randomize_block_order", "repartition", "MapBatches(dummy_map)"],
)


Expand All @@ -338,19 +343,19 @@ def test_window_randomize_fusion(ray_start_regular_shared):
context.optimize_fuse_read_stages = True
context.optimize_reorder_stages = True

pipe = ray.data.range(100).randomize_block_order().window().map_batches(lambda x: x)
pipe = ray.data.range(100).randomize_block_order().window().map_batches(dummy_map)
pipe.take()
stats = pipe.stats()
assert "read->randomize_block_order->map_batches" in stats, stats
assert "read->randomize_block_order->MapBatches(dummy_map)" in stats, stats


def test_optimize_fuse(ray_start_regular_shared):
context = DatasetContext.get_current()

def build_pipe():
pipe = ray.data.range(3).window(blocks_per_window=1).repeat(2)
pipe = pipe.map_batches(lambda x: x)
pipe = pipe.map_batches(lambda x: x)
pipe = pipe.map_batches(dummy_map)
pipe = pipe.map_batches(dummy_map)
pipe = pipe.random_shuffle_each_window()
results = [sorted(p.take()) for p in pipe.iter_epochs()]
assert results == [[0, 1, 2], [0, 1, 2]], results
Expand All @@ -362,7 +367,10 @@ def build_pipe():
expect_stages(
build_pipe(),
1,
["read->map_batches->map_batches->random_shuffle_map", "random_shuffle_reduce"],
[
"read->MapBatches(dummy_map)->MapBatches(dummy_map)->random_shuffle_map",
"random_shuffle_reduce",
],
)

context.optimize_fuse_stages = True
Expand All @@ -373,7 +381,7 @@ def build_pipe():
1,
[
"read",
"map_batches->map_batches->random_shuffle_map",
"MapBatches(dummy_map)->MapBatches(dummy_map)->random_shuffle_map",
"random_shuffle_reduce",
],
)
Expand All @@ -386,7 +394,7 @@ def build_pipe():
2,
[
"read",
"map_batches->map_batches",
"MapBatches(dummy_map)->MapBatches(dummy_map)",
"random_shuffle_map",
"random_shuffle_reduce",
],
Expand All @@ -400,8 +408,8 @@ def build_pipe():
3,
[
"read",
"map_batches",
"map_batches",
"MapBatches(dummy_map)",
"MapBatches(dummy_map)",
"random_shuffle_map",
"random_shuffle_reduce",
],
Expand All @@ -428,29 +436,29 @@ def test_optimize_equivalent_remote_args(ray_start_regular_shared):
for kwb in equivalent_kwargs:
print("CHECKING", kwa, kwb)
pipe = ray.data.range(3).repeat(2)
pipe = pipe.map_batches(lambda x: x, compute="tasks", **kwa)
pipe = pipe.map_batches(lambda x: x, compute="tasks", **kwb)
pipe = pipe.map_batches(dummy_map, compute="tasks", **kwa)
pipe = pipe.map_batches(dummy_map, compute="tasks", **kwb)
pipe.take()
expect_stages(
pipe,
1,
[
"read->map_batches->map_batches",
"read->MapBatches(dummy_map)->MapBatches(dummy_map)",
],
)

for kwa in equivalent_kwargs:
for kwb in equivalent_kwargs:
print("CHECKING", kwa, kwb)
pipe = ray.data.range(3).repeat(2)
pipe = pipe.map_batches(lambda x: x, compute="tasks", **kwa)
pipe = pipe.map_batches(dummy_map, compute="tasks", **kwa)
pipe = pipe.random_shuffle_each_window(**kwb)
pipe.take()
expect_stages(
pipe,
1,
[
"read->map_batches->random_shuffle_map",
"read->MapBatches(dummy_map)->random_shuffle_map",
"random_shuffle_reduce",
],
)
Expand All @@ -464,32 +472,32 @@ def test_optimize_incompatible_stages(ray_start_regular_shared):

pipe = ray.data.range(3).repeat(2)
# Should get fused as long as their resource types are compatible.
pipe = pipe.map_batches(lambda x: x, compute="actors")
pipe = pipe.map_batches(dummy_map, compute="actors")
# Cannot fuse actors->tasks.
pipe = pipe.map_batches(lambda x: x, compute="tasks")
pipe = pipe.map_batches(dummy_map, compute="tasks")
pipe = pipe.random_shuffle_each_window()
pipe.take()
expect_stages(
pipe,
2,
[
"read->map_batches",
"map_batches->random_shuffle_map",
"read->MapBatches(dummy_map)",
"MapBatches(dummy_map)->random_shuffle_map",
"random_shuffle_reduce",
],
)

pipe = ray.data.range(3).repeat(2)
pipe = pipe.map_batches(lambda x: x, compute="tasks")
pipe = pipe.map_batches(lambda x: x, num_cpus=0.75)
pipe = pipe.map_batches(dummy_map, compute="tasks")
pipe = pipe.map_batches(dummy_map, num_cpus=0.75)
pipe = pipe.random_shuffle_each_window()
pipe.take()
expect_stages(
pipe,
3,
[
"read->map_batches",
"map_batches",
"read->MapBatches(dummy_map)",
"MapBatches(dummy_map)",
"random_shuffle_map",
"random_shuffle_reduce",
],
Expand Down Expand Up @@ -556,7 +564,7 @@ def __call__(self, x):
pipe,
1,
[
"read->map_batches->map_batches",
"read->MapBatches(CallableFn)->MapBatches(CallableFn)",
],
)

Expand Down Expand Up @@ -592,7 +600,7 @@ def __call__(self, x):
pipe,
1,
[
"read->map_batches->map_batches",
"read->MapBatches(<lambda>)->MapBatches(CallableFn)",
],
)

Expand Down
Loading

0 comments on commit 3343c76

Please sign in to comment.