Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions python/ray/data/_internal/planner/plan_download_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,23 @@ def download_bytes_threaded(
def load_uri_bytes(uri_path_iterator):
"""Function that takes an iterator of URI paths and yields downloaded bytes for each."""
for uri_path in uri_path_iterator:
read_bytes = None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Malformed URIs Crash Download Operation

The call to _resolve_paths_and_filesystem(uris) isn't wrapped in exception handling, so malformed URIs (like "foo://bar" or "://no-scheme") will raise exceptions and crash the entire download operation instead of gracefully returning None for those URIs. The error handling in load_uri_bytes only catches exceptions during file reading, not during path resolution. This contradicts the test expectations in test_download_expression_with_malformed_uris which expects all malformed URIs to return None.

Fix in Cursor Fix in Web

try:
with fs.open_input_file(uri_path) as f:
yield f.read()
# Use open_input_stream to handle the rare scenario where the data source is not seekable.
with fs.open_input_stream(uri_path) as f:
read_bytes = f.read()
except OSError as e:
logger.debug(
f"Failed to download URI '{uri_path}' from column '{uri_column_name}' with error: {e}"
f"OSError reading uri '{uri_path}' for column '{uri_column_name}': {e}"
)
yield None
except Exception as e:
# Catch unexpected errors like pyarrow.lib.ArrowInvalid caused by an invalid uri like
# `foo://bar` to avoid failing because of one invalid uri.
logger.warning(
f"Unexpected error reading uri '{uri_path}' for column '{uri_column_name}': {e}"
)
finally:
yield read_bytes

# Use make_async_gen to download URI bytes concurrently
# This preserves the order of results to match the input URIs
Expand Down
54 changes: 23 additions & 31 deletions python/ray/data/tests/test_download_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,43 +294,35 @@ def test_download_expression_with_null_uris(self):
# If it fails, should be a reasonable error (not a crash)
assert isinstance(e, (ValueError, KeyError, RuntimeError))

def test_download_expression_with_invalid_uris(self, tmp_path):
"""Test download expression with URIs that fail to download.
def test_download_expression_with_malformed_uris(self, tmp_path):
"""Test download expression with malformed URIs.

This tests the exception handling in load_uri_bytes
where OSError is caught and None is returned for failed downloads.
This tests that various malformed URIs are caught and return None
instead of crashing.
"""
# Create one valid file
valid_file = tmp_path / "valid.txt"
valid_file.write_bytes(b"valid content")

# Create URIs: one valid, one non-existent file, one invalid path
table = pa.Table.from_arrays(
[
pa.array(
[
f"local://{valid_file}",
f"local://{tmp_path}/nonexistent.txt", # File doesn't exist
"local:///this/path/does/not/exist/file.txt", # Invalid path
]
),
],
names=["uri"],
)
malformed_uris = [
f"local://{tmp_path}/nonexistent.txt", # File doesn't exist
"local:///this/path/does/not/exist/file.txt", # Invalid path
"", # Empty URI
"foobar", # Random string
# TODO(xyuzh): Currently, using the below URIs raises an exception
# in _resolve_paths_and_filesystem. We need to fix that issue and
# add the tests in.
# "file:///\x00/null/byte", # Null byte
# "http://host/path\n\r", # Line breaks
# "foo://bar", # Invalid scheme
# "://no-scheme", # Missing scheme
# "http://host/path?query=<script>", # Injection attempts
]

ds = ray.data.from_arrow(table)
ds = ray.data.from_items([{"uri": uri} for uri in malformed_uris])
ds_with_downloads = ds.with_column("bytes", download("uri"))

# Should not crash - failed downloads return None
results = ds_with_downloads.take_all()
assert len(results) == 3

# First URI should succeed
assert results[0]["bytes"] == b"valid content"

# Second and third URIs should fail gracefully (return None)
assert results[1]["bytes"] is None
assert results[2]["bytes"] is None
# All malformed URIs should return None
assert len(results) == len(malformed_uris)
for result in results:
assert result["bytes"] is None

def test_download_expression_all_size_estimations_fail(self):
"""Test download expression when all URI size estimations fail.
Expand Down