Skip to content

Commit dde70e7

Browse files
[Data] Fix HTTP streaming file download by using open_input_stream (#58542)
## What does this PR do? Fixes HTTP streaming file downloads in Ray Data's download operation. Some URIs (especially HTTP streams) require `open_input_stream` instead of `open_input_file`. ## Changes - Modified `download_bytes_threaded` in `plan_download_op.py` to try both `open_input_file` and `open_input_stream` for each URI - Improved error handling to distinguish between different error types - Failed downloads now return `None` gracefully instead of crashing ## Testing ``` import pyarrow as pa from ray.data.context import DataContext from ray.data._internal.planner.plan_download_op import download_bytes_threaded # Test URLs: one valid, one 404 urls = [ "https://static-assets.tesla.com/configurator/compositor?context=design_studio_2?&bkba_opt=1&view=STUD_3QTR&size=600&model=my&options=$APBS,$IPB7,$PPSW,$SC04,$MDLY,$WY19P,$MTY46,$STY5S,$CPF0,$DRRH&crop=1150,647,390,180&", ] # Create PyArrow table and call download function table = pa.table({"url": urls}) ctx = DataContext.get_current() results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx)) # Check results result_table = results[0] for i in range(result_table.num_rows): url = result_table['url'][i].as_py() bytes_data = result_table['bytes'][i].as_py() if bytes_data is None: print(f"Row {i}: FAILED (None) - try-catch worked ✓") else: print(f"Row {i}: SUCCESS ({len(bytes_data)} bytes)") print(f" URL: {url[:60]}...") print("\n✅ Test passed: Failed downloads return None instead of crashing.") ``` Before the fix: ``` TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/ray/default/test_streaming_fallback.py", line 110, in <module> test_download_expression_with_streaming_fallback() File "/home/ray/default/test_streaming_fallback.py", line 67, in test_download_expression_with_streaming_fallback with patch.object(pafs.FileSystem, "open_input_file", mock_open_input_file): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1594, in __enter__ if not self.__exit__(*sys.exc_info()): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/unittest/mock.py", line 1603, in __exit__ setattr(self.target, self.attribute, self.temp_original) TypeError: cannot set 'open_input_file' attribute of immutable type 'pyarrow._fs.FileSystem' (base) ray@ip-10-0-39-21:~/default$ python test.py 2025-11-11 18:32:23,510 WARNING util.py:1059 -- Caught exception in transforming worker! Traceback (most recent call last): File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker for result in fn(input_queue_iter): ^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes yield f.read() ^^^^^^^^ File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek raise ValueError("Cannot seek streaming HTTP file") ValueError: Cannot seek streaming HTTP file Traceback (most recent call last): File "/home/ray/default/test.py", line 16, in <module> results = list(download_bytes_threaded(table, ["url"], ["bytes"], ctx)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 207, in download_bytes_threaded uri_bytes = list( ^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1113, in make_async_gen raise item File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/util.py", line 1048, in _run_transforming_worker for result in fn(input_queue_iter): ^^^^^^^^^^^^^^^^^^^^ File "/home/ray/anaconda3/lib/python3.12/site-packages/ray/data/_internal/planner/plan_download_op.py", line 197, in load_uri_bytes yield f.read() ^^^^^^^^ File "pyarrow/io.pxi", line 411, in pyarrow.lib.NativeFile.read File "pyarrow/io.pxi", line 263, in pyarrow.lib.NativeFile.size File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status File "/home/ray/anaconda3/lib/python3.12/site-packages/fsspec/implementations/http.py", line 743, in seek raise ValueError("Cannot seek streaming HTTP file") ValueError: Cannot seek streaming HTTP file ``` After the fix: ``` Row 0: SUCCESS (189370 bytes) URL: https://static-assets.tesla.com/configurator/compositor?cont... ``` Tested with HTTP streaming URLs (e.g., Tesla configurator images) that previously failed: - ✅ Successfully downloads HTTP stream files - ✅ Gracefully handles failed downloads (returns None) - ✅ Maintains backward compatibility with existing file downloads --------- Signed-off-by: xyuzh <xinyzng@gmail.com> Signed-off-by: Robert Nishihara <robertnishihara@gmail.com> Co-authored-by: Robert Nishihara <robertnishihara@gmail.com>
1 parent 438d6dc commit dde70e7

File tree

2 files changed

+36
-35
lines changed

2 files changed

+36
-35
lines changed

python/ray/data/_internal/planner/plan_download_op.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,14 +189,23 @@ def download_bytes_threaded(
189189
def load_uri_bytes(uri_path_iterator):
190190
"""Function that takes an iterator of URI paths and yields downloaded bytes for each."""
191191
for uri_path in uri_path_iterator:
192+
read_bytes = None
192193
try:
193-
with fs.open_input_file(uri_path) as f:
194-
yield f.read()
194+
# Use open_input_stream to handle the rare scenario where the data source is not seekable.
195+
with fs.open_input_stream(uri_path) as f:
196+
read_bytes = f.read()
195197
except OSError as e:
196198
logger.debug(
197-
f"Failed to download URI '{uri_path}' from column '{uri_column_name}' with error: {e}"
199+
f"OSError reading uri '{uri_path}' for column '{uri_column_name}': {e}"
198200
)
199-
yield None
201+
except Exception as e:
202+
# Catch unexpected errors like pyarrow.lib.ArrowInvalid caused by an invalid uri like
203+
# `foo://bar` to avoid failing because of one invalid uri.
204+
logger.warning(
205+
f"Unexpected error reading uri '{uri_path}' for column '{uri_column_name}': {e}"
206+
)
207+
finally:
208+
yield read_bytes
200209

201210
# Use make_async_gen to download URI bytes concurrently
202211
# This preserves the order of results to match the input URIs

python/ray/data/tests/test_download_expression.py

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -294,43 +294,35 @@ def test_download_expression_with_null_uris(self):
294294
# If it fails, should be a reasonable error (not a crash)
295295
assert isinstance(e, (ValueError, KeyError, RuntimeError))
296296

297-
def test_download_expression_with_invalid_uris(self, tmp_path):
298-
"""Test download expression with URIs that fail to download.
297+
def test_download_expression_with_malformed_uris(self, tmp_path):
298+
"""Test download expression with malformed URIs.
299299
300-
This tests the exception handling in load_uri_bytes
301-
where OSError is caught and None is returned for failed downloads.
300+
This tests that various malformed URIs are caught and return None
301+
instead of crashing.
302302
"""
303-
# Create one valid file
304-
valid_file = tmp_path / "valid.txt"
305-
valid_file.write_bytes(b"valid content")
306-
307-
# Create URIs: one valid, one non-existent file, one invalid path
308-
table = pa.Table.from_arrays(
309-
[
310-
pa.array(
311-
[
312-
f"local://{valid_file}",
313-
f"local://{tmp_path}/nonexistent.txt", # File doesn't exist
314-
"local:///this/path/does/not/exist/file.txt", # Invalid path
315-
]
316-
),
317-
],
318-
names=["uri"],
319-
)
303+
malformed_uris = [
304+
f"local://{tmp_path}/nonexistent.txt", # File doesn't exist
305+
"local:///this/path/does/not/exist/file.txt", # Invalid path
306+
"", # Empty URI
307+
"foobar", # Random string
308+
# TODO(xyuzh): Currently, using the below URIs raises an exception
309+
# in _resolve_paths_and_filesystem. We need to fix that issue and
310+
# add the tests in.
311+
# "file:///\x00/null/byte", # Null byte
312+
# "http://host/path\n\r", # Line breaks
313+
# "foo://bar", # Invalid scheme
314+
# "://no-scheme", # Missing scheme
315+
# "http://host/path?query=<script>", # Injection attempts
316+
]
320317

321-
ds = ray.data.from_arrow(table)
318+
ds = ray.data.from_items([{"uri": uri} for uri in malformed_uris])
322319
ds_with_downloads = ds.with_column("bytes", download("uri"))
323-
324-
# Should not crash - failed downloads return None
325320
results = ds_with_downloads.take_all()
326-
assert len(results) == 3
327321

328-
# First URI should succeed
329-
assert results[0]["bytes"] == b"valid content"
330-
331-
# Second and third URIs should fail gracefully (return None)
332-
assert results[1]["bytes"] is None
333-
assert results[2]["bytes"] is None
322+
# All malformed URIs should return None
323+
assert len(results) == len(malformed_uris)
324+
for result in results:
325+
assert result["bytes"] is None
334326

335327
def test_download_expression_all_size_estimations_fail(self):
336328
"""Test download expression when all URI size estimations fail.

0 commit comments

Comments
 (0)