From c9d91080fba732d89bf6a50e45293b85daad2b6c Mon Sep 17 00:00:00 2001
From: xyuzh
Date: Fri, 7 Nov 2025 14:38:32 -0800
Subject: [PATCH 1/4] Add exception handling for URI download failures

- Added try-except block in load_uri_bytes function to handle invalid URIs
- Catch OSError specifically for better error handling
- Failed downloads now return None instead of crashing the pipeline

Signed-off-by: xyuzh
---
 python/ray/data/_internal/planner/plan_download_op.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/python/ray/data/_internal/planner/plan_download_op.py b/python/ray/data/_internal/planner/plan_download_op.py
index 5f626ed54a0a..bfcced3a97d8 100644
--- a/python/ray/data/_internal/planner/plan_download_op.py
+++ b/python/ray/data/_internal/planner/plan_download_op.py
@@ -189,8 +189,11 @@ def download_bytes_threaded(
     def load_uri_bytes(uri_path_iterator):
         """Function that takes an iterator of URI paths and yields downloaded bytes for each."""
         for uri_path in uri_path_iterator:
-            with fs.open_input_file(uri_path) as f:
-                yield f.read()
+            try:
+                with fs.open_input_file(uri_path) as f:
+                    yield f.read()
+            except OSError as _:
+                yield None
 
     # Use make_async_gen to download URI bytes concurrently
     # This preserves the order of results to match the input URIs

From 095973428f86c88eee42a70179223a3d31fec5b1 Mon Sep 17 00:00:00 2001
From: xyuzh
Date: Sat, 8 Nov 2025 12:29:07 -0800
Subject: [PATCH 2/4] Fix divide by zero error when all URI size estimations fail

- Modified _sample_sizes to append 0 for failed URLs instead of discarding them
- Failed URLs (returning None or raising exceptions) now contribute 0 to size estimates
- Ensures file_sizes list length matches number of sampled URIs
- Allows avg_nbytes_per_row == 0 check to catch the all-failures case
- Maintains empty row_sizes check for edge case when no URIs are sampled

Signed-off-by: xyuzh
---
 python/ray/data/_internal/planner/plan_download_op.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/python/ray/data/_internal/planner/plan_download_op.py b/python/ray/data/_internal/planner/plan_download_op.py
index bfcced3a97d8..c49adc5f3d59 100644
--- a/python/ray/data/_internal/planner/plan_download_op.py
+++ b/python/ray/data/_internal/planner/plan_download_op.py
@@ -281,6 +281,7 @@ def _estimate_nrows_per_partition(self, block: pa.Table) -> int:
         ]
 
         target_nbytes_per_partition = self._data_context.target_max_block_size
+
         avg_nbytes_per_row = sum(row_sizes) / len(row_sizes)
         if avg_nbytes_per_row == 0:
             logger.warning(
@@ -325,9 +326,9 @@ def get_file_size(uri_path, fs):
         for future in as_completed(futures):
             try:
                 size = future.result()
-                if size is not None:
-                    file_sizes.append(size)
+                file_sizes.append(size if size is not None else 0)
             except Exception as e:
                 logger.warning(f"Error fetching file size for download: {e}")
+                file_sizes.append(0)
 
         return file_sizes

From 8fdb894ee63054da1a4bea91b7b9ff564ee0cb93 Mon Sep 17 00:00:00 2001
From: xyuzh
Date: Sat, 8 Nov 2025 15:45:17 -0800
Subject: [PATCH 3/4] Add comprehensive edge case tests for URI download failures

- Add test_download_expression_with_invalid_uris: Tests exception handling
  for URIs that fail to download (OSError), verifying None is returned
  instead of crashing (covers commit c9d91080fb)
- Add test_download_expression_all_size_estimations_fail: Tests
  divide-by-zero fix when all URI size estimations fail during
  partitioning, ensuring fallback to row count works
  correctly (covers commit 095973428f)
- Add test_download_expression_mixed_valid_and_invalid_size_estimation:
  Tests partial failures during size estimation with mix of valid/invalid
  URIs

These tests properly cover the code paths that were previously untested:
- load_uri_bytes exception handling (lines 192-196)
- avg_nbytes_per_row == 0 check (lines 285-291)
- Failed size estimation handling (lines 326-332)

Signed-off-by: xyuzh
---
 .../data/tests/test_download_expression.py | 109 ++++++++++++++++++
 1 file changed, 109 insertions(+)

diff --git a/python/ray/data/tests/test_download_expression.py b/python/ray/data/tests/test_download_expression.py
index 72533655f841..4fc04aba87ce 100644
--- a/python/ray/data/tests/test_download_expression.py
+++ b/python/ray/data/tests/test_download_expression.py
@@ -294,6 +294,115 @@ def test_download_expression_with_null_uris(self):
             # If it fails, should be a reasonable error (not a crash)
             assert isinstance(e, (ValueError, KeyError, RuntimeError))
 
+    def test_download_expression_with_invalid_uris(self, tmp_path):
+        """Test download expression with URIs that fail to download.
+
+        This tests the exception handling in load_uri_bytes (commit c9d91080fb)
+        where OSError is caught and None is returned for failed downloads.
+        """
+        # Create one valid file
+        valid_file = tmp_path / "valid.txt"
+        valid_file.write_bytes(b"valid content")
+
+        # Create URIs: one valid, one non-existent file, one invalid path
+        table = pa.Table.from_arrays(
+            [
+                pa.array([
+                    f"local://{valid_file}",
+                    f"local://{tmp_path}/nonexistent.txt",  # File doesn't exist
+                    "local:///this/path/does/not/exist/file.txt",  # Invalid path
+                ]),
+            ],
+            names=["uri"],
+        )
+
+        ds = ray.data.from_arrow(table)
+        ds_with_downloads = ds.with_column("bytes", download("uri"))
+
+        # Should not crash - failed downloads return None
+        results = ds_with_downloads.take_all()
+        assert len(results) == 3
+
+        # First URI should succeed
+        assert results[0]["bytes"] == b"valid content"
+
+        # Second and third URIs should fail gracefully (return None)
+        assert results[1]["bytes"] is None
+        assert results[2]["bytes"] is None
+
+    def test_download_expression_all_size_estimations_fail(self):
+        """Test download expression when all URI size estimations fail.
+
+        This tests the divide-by-zero fix (commit 095973428f) where failed
+        size estimations append 0 instead of being skipped, and avg_nbytes_per_row == 0
+        is checked to prevent division by zero.
+        """
+        # Create URIs that will fail size estimation (non-existent files)
+        # Using enough URIs to trigger size estimation sampling
+        invalid_uris = [
+            f"local:///nonexistent/path/file_{i}.txt"
+            for i in range(30)  # More than INIT_SAMPLE_BATCH_SIZE (25)
+        ]
+
+        table = pa.Table.from_arrays(
+            [pa.array(invalid_uris)],
+            names=["uri"],
+        )
+
+        ds = ray.data.from_arrow(table)
+        ds_with_downloads = ds.with_column("bytes", download("uri"))
+
+        # Should not crash with divide-by-zero error
+        # The PartitionActor should handle all failed size estimations gracefully
+        # and fall back to using the number of rows in the block as partition size
+        results = ds_with_downloads.take_all()
+
+        # All downloads should fail gracefully (return None)
+        assert len(results) == 30
+        for result in results:
+            assert result["bytes"] is None
+
+    def test_download_expression_mixed_valid_and_invalid_size_estimation(self, tmp_path):
+        """Test download expression with mix of valid and invalid URIs for size estimation.
+
+        This tests that size estimation handles partial failures correctly.
+        """
+        # Create some valid files
+        valid_files = []
+        for i in range(10):
+            file_path = tmp_path / f"valid_{i}.txt"
+            file_path.write_bytes(b"x" * 100)  # 100 bytes each
+            valid_files.append(str(file_path))
+
+        # Mix valid and invalid URIs
+        mixed_uris = []
+        for i in range(30):
+            if i % 3 == 0 and i // 3 < len(valid_files):
+                # Every 3rd URI is valid (for first 10)
+                mixed_uris.append(f"local://{valid_files[i // 3]}")
+            else:
+                # Others are invalid
+                mixed_uris.append(f"local:///nonexistent/file_{i}.txt")
+
+        table = pa.Table.from_arrays(
+            [pa.array(mixed_uris)],
+            names=["uri"],
+        )
+
+        ds = ray.data.from_arrow(table)
+        ds_with_downloads = ds.with_column("bytes", download("uri"))
+
+        # Should not crash - should handle mixed valid/invalid gracefully
+        results = ds_with_downloads.take_all()
+        assert len(results) == 30
+
+        # Verify valid URIs downloaded successfully
+        for i, result in enumerate(results):
+            if i % 3 == 0 and i // 3 < len(valid_files):
+                assert result["bytes"] == b"x" * 100
+            else:
+                assert result["bytes"] is None
+
 
 class TestDownloadExpressionIntegration:
     """Integration tests combining download expressions with other Ray Data operations."""

From e14abd1b733ad1b81ecd486e09784e1f6f0c68c7 Mon Sep 17 00:00:00 2001
From: xyuzh
Date: Sun, 9 Nov 2025 01:12:46 -0800
Subject: [PATCH 4/4] Log failed URI downloads and fix trailing whitespace in tests

Signed-off-by: xyuzh
---
 .../_internal/planner/plan_download_op.py  |  6 ++-
 .../data/tests/test_download_expression.py | 46 ++++++++++---------
 2 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/python/ray/data/_internal/planner/plan_download_op.py b/python/ray/data/_internal/planner/plan_download_op.py
index c49adc5f3d59..a6cdf8e4ef55 100644
--- a/python/ray/data/_internal/planner/plan_download_op.py
+++ b/python/ray/data/_internal/planner/plan_download_op.py
@@ -192,7 +192,10 @@ def load_uri_bytes(uri_path_iterator):
             try:
                 with fs.open_input_file(uri_path) as f:
                     yield f.read()
-            except OSError as _:
+            except OSError as e:
+                logger.debug(
+                    f"Failed to download URI '{uri_path}' from column '{uri_column_name}' with error: {e}"
+                )
                 yield None
 
     # Use make_async_gen to download URI bytes concurrently
@@ -281,7 +284,6 @@ def _estimate_nrows_per_partition(self, block: pa.Table) -> int:
         ]
 
         target_nbytes_per_partition = self._data_context.target_max_block_size
-
         avg_nbytes_per_row = sum(row_sizes) / len(row_sizes)
         if avg_nbytes_per_row == 0:
             logger.warning(
diff --git a/python/ray/data/tests/test_download_expression.py b/python/ray/data/tests/test_download_expression.py
index 4fc04aba87ce..9854dc263864 100644
--- a/python/ray/data/tests/test_download_expression.py
+++ b/python/ray/data/tests/test_download_expression.py
@@ -296,22 +296,24 @@ def test_download_expression_with_null_uris(self):
 
     def test_download_expression_with_invalid_uris(self, tmp_path):
         """Test download expression with URIs that fail to download.
-
-        This tests the exception handling in load_uri_bytes (commit c9d91080fb)
+
+        This tests the exception handling in load_uri_bytes
         where OSError is caught and None is returned for failed downloads.
""" # Create one valid file valid_file = tmp_path / "valid.txt" valid_file.write_bytes(b"valid content") - + # Create URIs: one valid, one non-existent file, one invalid path table = pa.Table.from_arrays( [ - pa.array([ - f"local://{valid_file}", - f"local://{tmp_path}/nonexistent.txt", # File doesn't exist - "local:///this/path/does/not/exist/file.txt", # Invalid path - ]), + pa.array( + [ + f"local://{valid_file}", + f"local://{tmp_path}/nonexistent.txt", # File doesn't exist + "local:///this/path/does/not/exist/file.txt", # Invalid path + ] + ), ], names=["uri"], ) @@ -322,28 +324,26 @@ def test_download_expression_with_invalid_uris(self, tmp_path): # Should not crash - failed downloads return None results = ds_with_downloads.take_all() assert len(results) == 3 - + # First URI should succeed assert results[0]["bytes"] == b"valid content" - + # Second and third URIs should fail gracefully (return None) assert results[1]["bytes"] is None assert results[2]["bytes"] is None def test_download_expression_all_size_estimations_fail(self): """Test download expression when all URI size estimations fail. - - This tests the divide-by-zero fix (commit 095973428f) where failed - size estimations append 0 instead of being skipped, and avg_nbytes_per_row == 0 - is checked to prevent division by zero. + + This tests the failed download does not cause division by zero error. """ # Create URIs that will fail size estimation (non-existent files) # Using enough URIs to trigger size estimation sampling invalid_uris = [ - f"local:///nonexistent/path/file_{i}.txt" + f"local:///nonexistent/path/file_{i}.txt" for i in range(30) # More than INIT_SAMPLE_BATCH_SIZE (25) ] - + table = pa.Table.from_arrays( [pa.array(invalid_uris)], names=["uri"], @@ -356,15 +356,17 @@ def test_download_expression_all_size_estimations_fail(self): # The PartitionActor should handle all failed size estimations gracefully # and fall back to using the number of rows in the block as partition size results = ds_with_downloads.take_all() - + # All downloads should fail gracefully (return None) assert len(results) == 30 for result in results: assert result["bytes"] is None - def test_download_expression_mixed_valid_and_invalid_size_estimation(self, tmp_path): + def test_download_expression_mixed_valid_and_invalid_size_estimation( + self, tmp_path + ): """Test download expression with mix of valid and invalid URIs for size estimation. - + This tests that size estimation handles partial failures correctly. """ # Create some valid files @@ -373,7 +375,7 @@ def test_download_expression_mixed_valid_and_invalid_size_estimation(self, tmp_p file_path = tmp_path / f"valid_{i}.txt" file_path.write_bytes(b"x" * 100) # 100 bytes each valid_files.append(str(file_path)) - + # Mix valid and invalid URIs mixed_uris = [] for i in range(30): @@ -383,7 +385,7 @@ def test_download_expression_mixed_valid_and_invalid_size_estimation(self, tmp_p else: # Others are invalid mixed_uris.append(f"local:///nonexistent/file_{i}.txt") - + table = pa.Table.from_arrays( [pa.array(mixed_uris)], names=["uri"], @@ -395,7 +397,7 @@ def test_download_expression_mixed_valid_and_invalid_size_estimation(self, tmp_p # Should not crash - should handle mixed valid/invalid gracefully results = ds_with_downloads.take_all() assert len(results) == 30 - + # Verify valid URIs downloaded successfully for i, result in enumerate(results): if i % 3 == 0 and i // 3 < len(valid_files):