diff --git a/python/ray/data/_internal/planner/plan_download_op.py b/python/ray/data/_internal/planner/plan_download_op.py index 5f626ed54a0a..a6cdf8e4ef55 100644 --- a/python/ray/data/_internal/planner/plan_download_op.py +++ b/python/ray/data/_internal/planner/plan_download_op.py @@ -189,8 +189,14 @@ def download_bytes_threaded( def load_uri_bytes(uri_path_iterator): """Function that takes an iterator of URI paths and yields downloaded bytes for each.""" for uri_path in uri_path_iterator: - with fs.open_input_file(uri_path) as f: - yield f.read() + try: + with fs.open_input_file(uri_path) as f: + yield f.read() + except OSError as e: + logger.debug( + f"Failed to download URI '{uri_path}' from column '{uri_column_name}' with error: {e}" + ) + yield None # Use make_async_gen to download URI bytes concurrently # This preserves the order of results to match the input URIs @@ -322,9 +328,9 @@ def get_file_size(uri_path, fs): for future in as_completed(futures): try: size = future.result() - if size is not None: - file_sizes.append(size) + file_sizes.append(size if size is not None else 0) except Exception as e: logger.warning(f"Error fetching file size for download: {e}") + file_sizes.append(0) return file_sizes diff --git a/python/ray/data/tests/test_download_expression.py b/python/ray/data/tests/test_download_expression.py index 72533655f841..9854dc263864 100644 --- a/python/ray/data/tests/test_download_expression.py +++ b/python/ray/data/tests/test_download_expression.py @@ -294,6 +294,117 @@ def test_download_expression_with_null_uris(self): # If it fails, should be a reasonable error (not a crash) assert isinstance(e, (ValueError, KeyError, RuntimeError)) + def test_download_expression_with_invalid_uris(self, tmp_path): + """Test download expression with URIs that fail to download. + + This tests the exception handling in load_uri_bytes + where OSError is caught and None is returned for failed downloads. + """ + # Create one valid file + valid_file = tmp_path / "valid.txt" + valid_file.write_bytes(b"valid content") + + # Create URIs: one valid, one non-existent file, one invalid path + table = pa.Table.from_arrays( + [ + pa.array( + [ + f"local://{valid_file}", + f"local://{tmp_path}/nonexistent.txt", # File doesn't exist + "local:///this/path/does/not/exist/file.txt", # Invalid path + ] + ), + ], + names=["uri"], + ) + + ds = ray.data.from_arrow(table) + ds_with_downloads = ds.with_column("bytes", download("uri")) + + # Should not crash - failed downloads return None + results = ds_with_downloads.take_all() + assert len(results) == 3 + + # First URI should succeed + assert results[0]["bytes"] == b"valid content" + + # Second and third URIs should fail gracefully (return None) + assert results[1]["bytes"] is None + assert results[2]["bytes"] is None + + def test_download_expression_all_size_estimations_fail(self): + """Test download expression when all URI size estimations fail. + + This tests the failed download does not cause division by zero error. + """ + # Create URIs that will fail size estimation (non-existent files) + # Using enough URIs to trigger size estimation sampling + invalid_uris = [ + f"local:///nonexistent/path/file_{i}.txt" + for i in range(30) # More than INIT_SAMPLE_BATCH_SIZE (25) + ] + + table = pa.Table.from_arrays( + [pa.array(invalid_uris)], + names=["uri"], + ) + + ds = ray.data.from_arrow(table) + ds_with_downloads = ds.with_column("bytes", download("uri")) + + # Should not crash with divide-by-zero error + # The PartitionActor should handle all failed size estimations gracefully + # and fall back to using the number of rows in the block as partition size + results = ds_with_downloads.take_all() + + # All downloads should fail gracefully (return None) + assert len(results) == 30 + for result in results: + assert result["bytes"] is None + + def test_download_expression_mixed_valid_and_invalid_size_estimation( + self, tmp_path + ): + """Test download expression with mix of valid and invalid URIs for size estimation. + + This tests that size estimation handles partial failures correctly. + """ + # Create some valid files + valid_files = [] + for i in range(10): + file_path = tmp_path / f"valid_{i}.txt" + file_path.write_bytes(b"x" * 100) # 100 bytes each + valid_files.append(str(file_path)) + + # Mix valid and invalid URIs + mixed_uris = [] + for i in range(30): + if i % 3 == 0 and i // 3 < len(valid_files): + # Every 3rd URI is valid (for first 10) + mixed_uris.append(f"local://{valid_files[i // 3]}") + else: + # Others are invalid + mixed_uris.append(f"local:///nonexistent/file_{i}.txt") + + table = pa.Table.from_arrays( + [pa.array(mixed_uris)], + names=["uri"], + ) + + ds = ray.data.from_arrow(table) + ds_with_downloads = ds.with_column("bytes", download("uri")) + + # Should not crash - should handle mixed valid/invalid gracefully + results = ds_with_downloads.take_all() + assert len(results) == 30 + + # Verify valid URIs downloaded successfully + for i, result in enumerate(results): + if i % 3 == 0 and i // 3 < len(valid_files): + assert result["bytes"] == b"x" * 100 + else: + assert result["bytes"] is None + class TestDownloadExpressionIntegration: """Integration tests combining download expressions with other Ray Data operations."""