Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions python/ray/data/_internal/planner/plan_download_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,14 @@ def download_bytes_threaded(
def load_uri_bytes(uri_path_iterator):
"""Function that takes an iterator of URI paths and yields downloaded bytes for each."""
for uri_path in uri_path_iterator:
with fs.open_input_file(uri_path) as f:
yield f.read()
try:
with fs.open_input_file(uri_path) as f:
yield f.read()
except OSError as e:
logger.debug(
f"Failed to download URI '{uri_path}' from column '{uri_column_name}' with error: {e}"
)
yield None

# Use make_async_gen to download URI bytes concurrently
# This preserves the order of results to match the input URIs
Expand Down Expand Up @@ -322,9 +328,9 @@ def get_file_size(uri_path, fs):
for future in as_completed(futures):
try:
size = future.result()
if size is not None:
file_sizes.append(size)
file_sizes.append(size if size is not None else 0)
except Exception as e:
logger.warning(f"Error fetching file size for download: {e}")
file_sizes.append(0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Scrambled Data Causes Wrong Partition Estimates

The _sample_sizes method uses as_completed(futures) which returns results in completion order rather than submission order. This scrambles the file sizes list so it no longer corresponds to the input URI order. When multiple URI columns are used, _estimate_nrows_per_partition calls zip(*sampled_file_sizes_by_column.values()) on line 280, which assumes file sizes from different columns align by row index. The scrambled ordering causes file sizes from different rows to be incorrectly combined, producing wrong partition size estimates.

Fix in Cursor Fix in Web

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really interesting catch. However, I think we should solve this separately by removing support for multiple columns.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by #58517


return file_sizes
111 changes: 111 additions & 0 deletions python/ray/data/tests/test_download_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,117 @@ def test_download_expression_with_null_uris(self):
# If it fails, should be a reasonable error (not a crash)
assert isinstance(e, (ValueError, KeyError, RuntimeError))

def test_download_expression_with_invalid_uris(self, tmp_path):
    """Test download expression with URIs that fail to download.

    This tests the exception handling in load_uri_bytes
    where OSError is caught and None is returned for failed downloads.

    Each output row is matched to its own ``uri`` value instead of its
    position, so the test does not depend on the order in which the
    dataset emits rows.
    """
    # Create one valid file
    valid_file = tmp_path / "valid.txt"
    valid_file.write_bytes(b"valid content")

    # Create URIs: one valid, one non-existent file, one invalid path
    valid_uri = f"local://{valid_file}"
    missing_file_uri = f"local://{tmp_path}/nonexistent.txt"  # File doesn't exist
    invalid_path_uri = "local:///this/path/does/not/exist/file.txt"  # Invalid path

    table = pa.Table.from_arrays(
        [pa.array([valid_uri, missing_file_uri, invalid_path_uri])],
        names=["uri"],
    )

    ds = ray.data.from_arrow(table)
    ds_with_downloads = ds.with_column("bytes", download("uri"))

    # Should not crash - failed downloads return None
    results = ds_with_downloads.take_all()
    assert len(results) == 3

    # Key the downloaded payloads by their source URI; the three URIs are
    # distinct, so no row can be lost by the dict construction.
    bytes_by_uri = {row["uri"]: row["bytes"] for row in results}

    # The valid URI should succeed; the other two should fail gracefully
    # (return None).
    assert bytes_by_uri == {
        valid_uri: b"valid content",
        missing_file_uri: None,
        invalid_path_uri: None,
    }
Comment on lines +328 to +333
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These assertions assume that the dataset produces outputs in order. If tasks complete out of order, this test will fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by #58518


def test_download_expression_all_size_estimations_fail(self):
    """Test download expression when every URI size estimation fails.

    Verifies that a dataset made up solely of non-existent files does not
    trigger a division-by-zero error and that each row's download comes
    back as None.
    """
    # 30 URIs pointing at files that do not exist, so size estimation fails
    # for all of them. 30 is more than INIT_SAMPLE_BATCH_SIZE (25), which is
    # enough to trigger size estimation sampling.
    num_uris = 30
    invalid_uris = []
    for idx in range(num_uris):
        invalid_uris.append(f"local:///nonexistent/path/file_{idx}.txt")

    uri_column = pa.array(invalid_uris)
    table = pa.Table.from_arrays([uri_column], names=["uri"])

    ds_with_downloads = ray.data.from_arrow(table).with_column(
        "bytes", download("uri")
    )

    # Should not crash with divide-by-zero error.
    # The PartitionActor should handle all failed size estimations gracefully
    # and fall back to using the number of rows in the block as partition size.
    rows = ds_with_downloads.take_all()

    # Every download should fail gracefully (return None)
    assert len(rows) == 30
    assert all(row["bytes"] is None for row in rows)

def test_download_expression_mixed_valid_and_invalid_size_estimation(
self, tmp_path
):
Comment on lines +365 to +367
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test seems to make some assumptions about the download implementation (specifically, that it performs size estimation).

test_download_expression_with_invalid_uris also covers a similar case with a mix of invalid and valid URIs.

Given these points, it might be worth reconsidering whether we need to keep this test.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Probably best to remove this test @xyuzh

"""Test download expression with mix of valid and invalid URIs for size estimation.

This tests that size estimation handles partial failures correctly.
"""
# Create some valid files
valid_files = []
for i in range(10):
file_path = tmp_path / f"valid_{i}.txt"
file_path.write_bytes(b"x" * 100) # 100 bytes each
valid_files.append(str(file_path))

# Mix valid and invalid URIs
mixed_uris = []
for i in range(30):
if i % 3 == 0 and i // 3 < len(valid_files):
# Every 3rd URI is valid (for first 10)
mixed_uris.append(f"local://{valid_files[i // 3]}")
else:
# Others are invalid
mixed_uris.append(f"local:///nonexistent/file_{i}.txt")

table = pa.Table.from_arrays(
[pa.array(mixed_uris)],
names=["uri"],
)

ds = ray.data.from_arrow(table)
ds_with_downloads = ds.with_column("bytes", download("uri"))

# Should not crash - should handle mixed valid/invalid gracefully
results = ds_with_downloads.take_all()
assert len(results) == 30

# Verify valid URIs downloaded successfully
for i, result in enumerate(results):
if i % 3 == 0 and i // 3 < len(valid_files):
assert result["bytes"] == b"x" * 100
else:
assert result["bytes"] is None


class TestDownloadExpressionIntegration:
"""Integration tests combining download expressions with other Ray Data operations."""
Expand Down