def load_uri_bytes(uri_path_iterator):
    """Yield the downloaded bytes for each URI path, or ``None`` on failure.

    Exactly one value is yielded per input path, in iteration order. The
    names ``fs``, ``logger`` and ``uri_column_name`` are read from the
    enclosing scope.

    Args:
        uri_path_iterator: Iterator of URI paths to read through ``fs``.

    Yields:
        The bytes read from the URI, or ``None`` when reading raised an
        ``Exception`` (the error is logged instead of propagated).
    """
    for uri_path in uri_path_iterator:
        read_bytes = None
        try:
            # Use open_input_stream to handle the rare scenario where the
            # data source is not seekable.
            with fs.open_input_stream(uri_path) as f:
                read_bytes = f.read()
        except OSError as e:
            logger.debug(
                f"OSError reading uri '{uri_path}' for column '{uri_column_name}': {e}"
            )
        except Exception as e:
            # Catch unexpected errors like pyarrow.lib.ArrowInvalid caused by
            # an invalid uri like `foo://bar` to avoid failing because of one
            # invalid uri.
            logger.warning(
                f"Unexpected error reading uri '{uri_path}' for column '{uri_column_name}': {e}"
            )
        # Yield after the try statement rather than from a `finally` clause:
        # a yield inside `finally` would suspend the generator while a
        # non-Exception error (e.g. KeyboardInterrupt) is still pending, and
        # closing the generator at that point would discard that error.
        yield read_bytes

# Use make_async_gen to download URI bytes concurrently
# This preserves the order of results to match the input URIs
+ def test_download_expression_with_malformed_uris(self, tmp_path): + """Test download expression with malformed URIs. - This tests the exception handling in load_uri_bytes - where OSError is caught and None is returned for failed downloads. + This tests that various malformed URIs are caught and return None + instead of crashing. """ - # Create one valid file - valid_file = tmp_path / "valid.txt" - valid_file.write_bytes(b"valid content") - - # Create URIs: one valid, one non-existent file, one invalid path - table = pa.Table.from_arrays( - [ - pa.array( - [ - f"local://{valid_file}", - f"local://{tmp_path}/nonexistent.txt", # File doesn't exist - "local:///this/path/does/not/exist/file.txt", # Invalid path - ] - ), - ], - names=["uri"], - ) + malformed_uris = [ + f"local://{tmp_path}/nonexistent.txt", # File doesn't exist + "local:///this/path/does/not/exist/file.txt", # Invalid path + "", # Empty URI + "foobar", # Random string + # TODO(xyuzh): Currently, using the below URIs raises an exception + # in _resolve_paths_and_filesystem. We need to fix that issue and + # add the tests in. + # "file:///\x00/null/byte", # Null byte + # "http://host/path\n\r", # Line breaks + # "foo://bar", # Invalid scheme + # "://no-scheme", # Missing scheme + # "http://host/path?query=