Skip to content

Commit 4c70984

Browse files
[Data] Fix file size ordering in download partitioning with multiple URI columns (#58517)
The `_sample_sizes` method was using `as_completed()` to collect file sizes, which returns results in completion order rather than submission order. This scrambled the file sizes list so it no longer corresponded to the input URI order. When multiple URI columns are used, `_estimate_nrows_per_partition` calls `zip(*sampled_file_sizes_by_column.values())` on line 284, which assumes file sizes from different columns align by row index. The scrambled ordering caused file sizes from different rows to be incorrectly combined, producing incorrect partition size estimates. ## Changes - Pre-allocate the `file_sizes` list with the correct size - Use a `future_to_file_index` mapping to track the original submission order - Place results at their correct positions regardless of completion order - Add assertion to verify list length matches expected size ## Related issues #58464 (comment) --------- Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 7498739 commit 4c70984

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

python/ray/data/_internal/planner/plan_download_op.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -326,20 +326,25 @@ def get_file_size(uri_path, fs):
326326
)
327327

328328
# Use ThreadPoolExecutor for concurrent size fetching
329-
file_sizes = []
329+
file_sizes = [None] * len(paths)
330330
with ThreadPoolExecutor(max_workers=URI_DOWNLOAD_MAX_WORKERS) as executor:
331331
# Submit all size fetch tasks
332-
futures = [
333-
executor.submit(get_file_size, uri_path, fs) for uri_path in paths
334-
]
332+
future_to_file_index = {
333+
executor.submit(get_file_size, uri_path, fs): file_index
334+
for file_index, uri_path in enumerate(paths)
335+
}
335336

336337
# Collect results as they complete (order doesn't matter)
337-
for future in as_completed(futures):
338+
for future in as_completed(future_to_file_index):
339+
file_index = future_to_file_index[future]
338340
try:
339341
size = future.result()
340-
file_sizes.append(size if size is not None else 0)
342+
file_sizes[file_index] = size if size is not None else 0
341343
except Exception as e:
342344
logger.warning(f"Error fetching file size for download: {e}")
343-
file_sizes.append(0)
345+
file_sizes[file_index] = 0
344346

347+
assert all(
348+
fs is not None for fs in file_sizes
349+
), "File size sampling did not complete for all paths"
345350
return file_sizes

0 commit comments

Comments
 (0)