[Data] Improve warning message when read task is large (#46942)
When a read task is large, Ray Data prints a message like this for each
read task:

> WARNING: the read task size (17157648427 bytes) is larger than the
reported output size of the task (None bytes). This may be a size
reporting bug in the datasource being read from.

The message isn't actionable, is hard to understand, and is spammy (it can be
printed thousands of times). To address these issues, this PR makes Ray
Data emit a warning like this once:

> UserWarning: The serialized size of your read function named 'read_fn'
is 128.0MB. This size is relatively large. As a result, Ray might
excessively spill objects during execution. To fix this issue, avoid
accessing `self` or other large objects in 'read_fn'.
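
For context, here is a minimal sketch of the failure mode the warning targets (the names are illustrative, not from this PR): any object a read function references from its enclosing scope gets baked into the serialized task, which you can see by comparing serialized sizes:

```python
import numpy as np

from ray import cloudpickle  # the same serializer Ray Data uses for read tasks


def make_capturing_read_fn():
    large_lookup = np.zeros((16, 1024, 1024), dtype=np.uint8)  # ~16 MiB

    def read_fn():
        large_lookup  # referencing it captures the whole array in the closure
        yield {"column": [0]}

    return read_fn


def make_lazy_read_fn():
    def read_fn():
        # Load or build the large data inside the task instead of capturing it.
        large_lookup = np.zeros((16, 1024, 1024), dtype=np.uint8)
        yield {"column": [0]}

    return read_fn


print(len(cloudpickle.dumps(make_capturing_read_fn())))  # ~16 MiB
print(len(cloudpickle.dumps(make_lazy_read_fn())))       # a few hundred bytes
```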

---------

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
bveeramani authored Aug 3, 2024
1 parent 1621b4c commit b29bb0a
Showing 6 changed files with 46 additions and 18 deletions.
@@ -64,7 +64,7 @@ def aggregate_output_metadata(self) -> BlockMetadata:
             return BlockMetadata(None, None, None, None, None)

         # `get_read_tasks` isn't guaranteed to return exactly one read task.
-        metadata = [read_task.get_metadata() for read_task in read_tasks]
+        metadata = [read_task.metadata for read_task in read_tasks]

         if all(meta.num_rows is not None for meta in metadata):
             num_rows = sum(meta.num_rows for meta in metadata)
28 changes: 15 additions & 13 deletions python/ray/data/_internal/planner/plan_read_op.py
@@ -1,4 +1,5 @@
 import logging
+import warnings
 from typing import Iterable, List

 import ray
@@ -14,12 +15,14 @@
     MapTransformer,
     MapTransformFn,
 )
+from ray.data._internal.execution.util import memory_string
 from ray.data._internal.logical.operators.read_operator import Read
 from ray.data._internal.util import _warn_on_high_parallelism, call_with_retry
-from ray.data.block import Block
+from ray.data.block import Block, BlockMetadata
 from ray.data.datasource.datasource import ReadTask
+from ray.util.debug import log_once

-TASK_SIZE_WARN_THRESHOLD_BYTES = 100000
+TASK_SIZE_WARN_THRESHOLD_BYTES = 1024 * 1024  # 1 MiB

 # Transient errors that can occur during longer reads. Trigger retry when these occur.
 READ_FILE_RETRY_ON_ERRORS = ["AWS Error NETWORK_CONNECTION", "AWS Error ACCESS_DENIED"]
@@ -29,24 +32,23 @@
 logger = logging.getLogger(__name__)


-def cleaned_metadata(read_task: ReadTask):
-    block_meta = read_task.get_metadata()
+def cleaned_metadata(read_task: ReadTask) -> BlockMetadata:
     task_size = len(cloudpickle.dumps(read_task))
-    if (
-        block_meta.size_bytes is not None
-        and task_size > block_meta.size_bytes
-        and task_size > TASK_SIZE_WARN_THRESHOLD_BYTES
+    if task_size > TASK_SIZE_WARN_THRESHOLD_BYTES and log_once(
+        f"large_read_task_{read_task.read_fn.__name__}"
     ):
-        logger.warning(
-            f"The read task size ({task_size} bytes) is larger "
-            "than the reported output size of the task "
-            f"({block_meta.size_bytes} bytes). This may be a size "
-            "reporting bug in the datasource being read from."
+        warnings.warn(
+            "The serialized size of your read function named "
+            f"'{read_task.read_fn.__name__}' is {memory_string(task_size)}. This size "
+            "is relatively large. As a result, Ray might excessively "
+            "spill objects during execution. To fix this issue, avoid accessing "
+            f"`self` or other large objects in '{read_task.read_fn.__name__}'."
         )

     # Defensively compute the size of the block as the max size reported by the
     # datasource and the actual read task size. This is to guard against issues
     # with bad metadata reporting.
+    block_meta = read_task.metadata
     if block_meta.size_bytes is None or task_size > block_meta.size_bytes:
         block_meta.size_bytes = task_size

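The gate above pairs the size threshold with `log_once`, keyed by the read function's name, so the warning fires at most once per function instead of once per task. A rough standalone sketch of the same pattern, with a hypothetical `warn_if_large` helper standing in for the check inside `cleaned_metadata`:

```python
import warnings

from ray import cloudpickle
from ray.util.debug import log_once

TASK_SIZE_WARN_THRESHOLD_BYTES = 1024 * 1024  # 1 MiB, matching the new constant


def warn_if_large(read_task) -> None:
    # Hypothetical helper mirroring the check in `cleaned_metadata`.
    task_size = len(cloudpickle.dumps(read_task))
    # `log_once` returns True only the first time it sees a given key, so
    # thousands of oversized tasks for the same read_fn yield one warning.
    if task_size > TASK_SIZE_WARN_THRESHOLD_BYTES and log_once(
        f"large_read_task_{read_task.read_fn.__name__}"
    ):
        warnings.warn(f"Serialized read task is {task_size} bytes.")
```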
9 changes: 7 additions & 2 deletions python/ray/data/datasource/datasource.py
@@ -129,7 +129,7 @@ class ReadTask(Callable[[], Iterable[Block]]):
     Read tasks are generated by :meth:`~ray.data.Datasource.get_read_tasks`,
     and return a list of ``ray.data.Block`` when called. Initial metadata about the read
-    operation can be retrieved via ``get_metadata()`` prior to executing the
+    operation can be retrieved via the ``metadata`` attribute prior to executing the
     read. Final metadata is returned after the read along with the blocks.

     Ray will execute read tasks in remote functions to parallelize execution.
@@ -149,9 +149,14 @@ def __init__(self, read_fn: Callable[[], Iterable[Block]], metadata: BlockMetadata):
         self._metadata = metadata
         self._read_fn = read_fn

-    def get_metadata(self) -> BlockMetadata:
+    @property
+    def metadata(self) -> BlockMetadata:
         return self._metadata

+    @property
+    def read_fn(self) -> Callable[[], Iterable[Block]]:
+        return self._read_fn
+
     def __call__(self) -> Iterable[Block]:
         result = self._read_fn()
         if not hasattr(result, "__iter__"):
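
With `metadata` and `read_fn` now exposed as read-only properties, callers access them as attributes rather than calling `get_metadata()`. A small usage sketch:

```python
import pandas as pd

from ray.data.block import BlockMetadata
from ray.data.datasource.datasource import ReadTask


def read_fn():
    yield pd.DataFrame({"column": [0]})


task = ReadTask(read_fn, BlockMetadata(1, None, None, None, None))
print(task.metadata.num_rows)  # 1 -- previously task.get_metadata().num_rows
print(task.read_fn.__name__)   # 'read_fn', used to key the once-only warning
```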
2 changes: 1 addition & 1 deletion python/ray/data/read_api.py
@@ -418,7 +418,7 @@ def read_datasource(
     import uuid

     stats = DatasetStats(
-        metadata={"Read": [read_task.get_metadata() for read_task in read_tasks]},
+        metadata={"Read": [read_task.metadata for read_task in read_tasks]},
         parent=None,
         needs_stats_actor=True,
         stats_uuid=uuid.uuid4(),
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_delta_sharing.py
@@ -92,7 +92,7 @@ def test_get_read_tasks(self, mock_setup_delta_sharing_connections):
         self.assertTrue(all(isinstance(task, ReadTask) for task in read_tasks))

         for task in read_tasks:
-            metadata = task.get_metadata()
+            metadata = task.metadata
             self.assertIsInstance(metadata, BlockMetadata)
             self.assertEqual(len(metadata.input_files), 1)
             self.assertTrue(metadata.input_files[0]["url"] in ["file1", "file2"])
21 changes: 21 additions & 0 deletions python/ray/data/tests/test_execution_optimizer.py
@@ -57,7 +57,10 @@
 from ray.data._internal.planner.exchange.sort_task_spec import SortKey
 from ray.data._internal.planner.planner import Planner
 from ray.data._internal.stats import DatasetStats
+from ray.data.block import BlockMetadata
 from ray.data.context import DataContext
+from ray.data.datasource import Datasource
+from ray.data.datasource.datasource import ReadTask
 from ray.data.tests.conftest import *  # noqa
 from ray.data.tests.test_util import get_parquet_read_logical_op
 from ray.data.tests.util import column_udf, extract_values, named_values
@@ -110,6 +113,24 @@ def test_read_operator(ray_start_regular_shared):
     )


+def test_read_operator_emits_warning_for_large_read_tasks():
+    class StubDatasource(Datasource):
+        def estimate_inmemory_data_size(self) -> Optional[int]:
+            return None
+
+        def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
+            large_object = np.zeros((128, 1024, 1024), dtype=np.uint8)  # 128 MiB
+
+            def read_fn():
+                large_object
+                yield pd.DataFrame({"column": [0]})
+
+            return [ReadTask(read_fn, BlockMetadata(1, None, None, None, None))]
+
+    with pytest.warns(UserWarning):
+        ray.data.read_datasource(StubDatasource()).materialize()
+
+
 def test_split_blocks_operator(ray_start_regular_shared):
     planner = Planner()
     op = get_parquet_read_logical_op(parallelism=10)
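
Note how the new test provokes the warning: the bare `large_object` statement inside `read_fn` makes the 128 MiB array a free variable of the closure, so `cloudpickle.dumps(read_task)` far exceeds the 1 MiB threshold and `pytest.warns(UserWarning)` observes the once-only message.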
