
Conversation

@srinathk10
Contributor

@srinathk10 srinathk10 commented Nov 15, 2025

Description

[Data] Handle prefetches buffering in iter_batches

Requirements

  • Consumers of iter_batches need predictable per-batch latency, while the overall Data pipeline is optimized for maximum throughput and utilization.
  • Prefetching is how the consumer impedance-matches its latency requirement against the throughput-optimized pipeline.

Issue

  • _iter_batches set up no queuing/buffering to match the requested prefetching.
  • _format_in_threadpool sized its worker pool as a function of the prefetch count, which adds latency variance to _iter_batches.

Fix

  • In _iter_batches, set the buffering queue depth in make_async_gen so that it honors the prefetch count (see the sketch below).
  • In _format_in_threadpool, cap the worker pool at 4 workers by default, to optimize for iter_batches latency.
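
To illustrate the buffering change, here is a minimal sketch of bounded-queue prefetching, i.e. the idea behind wiring the prefetch count into make_async_gen's queue depth. Names and structure are illustrative only, not Ray's actual internals:

import queue
import threading

_SENTINEL = object()  # marks end-of-stream on the queue

def prefetched(gen, queue_depth):
    """Run `gen` on a background thread, buffering at most `queue_depth` items."""
    buf = queue.Queue(maxsize=max(1, queue_depth))

    def producer():
        try:
            for item in gen:
                buf.put(item)  # blocks once `queue_depth` items are buffered
        finally:
            buf.put(_SENTINEL)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buf.get()
        if item is _SENTINEL:
            return
        yield item

Because the producer blocks on a full queue, the buffer never grows past what the consumer asked to prefetch, so each next() sees a bounded, predictable amount of queued work. The worker-cap half of the fix is simpler: the formatting threadpool defaults to at most 4 workers instead of scaling with the prefetch count.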


@srinathk10 srinathk10 force-pushed the srinathk10/format_batches branch from d3bfaf0 to 35dba95 Compare November 16, 2025 02:36
@srinathk10 srinathk10 changed the title [Data] Use 1-threadpool for format + collate in iter_baches [Data] Handle prefetches buffering in iter_baches Nov 16, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 force-pushed the srinathk10/format_batches branch from 71ef8c7 to 62ada1d Compare November 16, 2025 02:42
@srinathk10
Copy link
Contributor Author

srinathk10 commented Nov 16, 2025

Microbenchmark for prefetching in iter_batches

  • Before: per-batch latency variance grows with prefetch size, with a standard deviation as high as ~60 ms at prefetch_batches=8.
  • After: variance drops significantly, to ~10 ms at prefetch_batches=8.
  • Standard deviation of timing_ms by setting (from the summaries below):
    prefetch_batches=0: 1.43 ms (before) → 0.43 ms (after)
    prefetch_batches=2: 9.30 ms (before) → 7.80 ms (after)
    prefetch_batches=4: 33.26 ms (before) → 9.79 ms (after)
    prefetch_batches=8: 59.82 ms (before) → 10.92 ms (after)

Before

/home/ray/anaconda3/lib/python3.12/site-packages/ray/anyscale/data/api/read_api.py:95: UserWarning: Parameter 'override_num_blocks' isn't supported on Ray runtime. Falling back to OSS implementation.
  warnings.warn(
2025-11-15 18:56:08,346 INFO worker.py:1832 -- Connecting to existing Ray cluster at address: 10.0.146.216:6379...
2025-11-15 18:56:08,357 INFO worker.py:2003 -- Connected to Ray cluster. View the dashboard at https://session-tb5w5gjacurgjbkib32skq4gji.i.anyscaleuserdata-staging.com 
2025-11-15 18:56:08,358 INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_4ed8c9b3766f084f16b2ab7ae712545ef25eb2af.zip' (0.02MiB) to Ray cluster...
2025-11-15 18:56:08,359 INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_4ed8c9b3766f084f16b2ab7ae712545ef25eb2af.zip'.
/home/ray/anaconda3/lib/python3.12/site-packages/ray/_private/worker.py:2051: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
Parquet dataset sampling 0: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1.00/1.00 [00:02<00:00, 2.04s/ file]
2025-11-15 18:56:10,481 INFO parquet_datasource.py:787 -- Estimated parquet encoding ratio is 1.026.
2025-11-15 18:56:10,481 INFO parquet_datasource.py:847 -- Estimated parquet reader batch size at 2251 rows
2025-11-15 18:56:10,527 INFO streaming_executor.py:85 -- A new progress UI is available. To enable, set `ray.data.DataContext.get_current().enable_rich_progress_bars = True`.
2025-11-15 18:56:10,527 INFO logging.py:397 -- Registered dataset logger for dataset dataset_22_0
2025-11-15 18:56:10,542 INFO streaming_executor.py:170 -- Starting execution of Dataset dataset_22_0. Full logs are in /tmp/ray/session_2025-11-15_18-27-18_175587_2558/logs/ray-data
2025-11-15 18:56:10,542 INFO streaming_executor.py:171 -- Execution plan of Dataset dataset_22_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet]
Running 0: 0.00 row [00:00, ? row/s]                       2025-11-15 18:56:10,547      WARNING resource_manager.py:134 -- ⚠️  Ray's object store is configured to use only 27.7% of available memory (17.7GiB out of 64.0GiB total). For optimal Ray Data performance, we recommend setting the object store to at least 50% of available memory. You can do this by setting the 'object_store_memory' parameter when calling ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable.
Running Dataset: dataset_22_0. Active & requested resources: 1/16 CPU, 256.0MB/13.3GB object store: : 0.00 row [00:01, ? row/s]2025-11-15 18:56:12,285  INFO streaming_executor.py:298 -- ✔️  Dataset dataset_22_0 execution finished in 1.74 seconds
✔️  Dataset dataset_22_0 execution finished in 1.74 seconds: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5.00k/5.00k [00:01<00:00, 2.87k row/s] 
- ReadParquet->SplitBlocks(20): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 7.1MB object store: : 5.00k row [00:01, 2.87k row/s]
prefetch_batches=0: 9.28ms                                                                                                                     
prefetch_batches=0: 6.89ms
prefetch_batches=0: 6.67ms
prefetch_batches=0: 6.99ms
prefetch_batches=0: 4.44ms
prefetch_batches=0: 4.50ms
prefetch_batches=0: 4.40ms
prefetch_batches=0: 4.38ms
prefetch_batches=0: 4.35ms
prefetch_batches=0: 4.18ms
prefetch_batches=0: 4.04ms
prefetch_batches=0: 4.12ms
prefetch_batches=0: 4.09ms
prefetch_batches=0: 4.10ms
prefetch_batches=0: 3.96ms
prefetch_batches=0: 4.06ms
prefetch_batches=0: 4.06ms
prefetch_batches=0: 4.04ms
prefetch_batches=0: 4.16ms
prefetch_batches=0: 4.22ms
prefetch_batches=2: 37.32ms
prefetch_batches=2: 0.04ms
prefetch_batches=2: 13.71ms
prefetch_batches=2: 0.04ms
prefetch_batches=2: 11.51ms
prefetch_batches=2: 0.03ms
prefetch_batches=2: 11.87ms
prefetch_batches=2: 0.06ms
prefetch_batches=2: 11.25ms
prefetch_batches=2: 0.03ms
prefetch_batches=2: 14.57ms
prefetch_batches=2: 0.03ms
prefetch_batches=2: 12.78ms
prefetch_batches=2: 0.03ms
prefetch_batches=2: 10.38ms
prefetch_batches=2: 0.03ms
prefetch_batches=2: 10.83ms
prefetch_batches=2: 0.03ms
prefetch_batches=2: 11.22ms
prefetch_batches=2: 0.03ms
prefetch_batches=4: 123.74ms
prefetch_batches=4: 4.90ms
prefetch_batches=4: 4.96ms
prefetch_batches=4: 1.19ms
prefetch_batches=4: 69.96ms
prefetch_batches=4: 8.24ms
prefetch_batches=4: 7.52ms
prefetch_batches=4: 0.16ms
prefetch_batches=4: 46.63ms
prefetch_batches=4: 9.43ms
prefetch_batches=4: 0.27ms
prefetch_batches=4: 0.05ms
prefetch_batches=4: 54.92ms
prefetch_batches=4: 0.05ms
prefetch_batches=4: 0.21ms
prefetch_batches=4: 0.02ms
prefetch_batches=4: 52.82ms
prefetch_batches=4: 0.05ms
prefetch_batches=4: 0.20ms
prefetch_batches=4: 0.15ms
prefetch_batches=8: 237.99ms
prefetch_batches=8: 19.37ms
prefetch_batches=8: 1.17ms
prefetch_batches=8: 2.45ms
prefetch_batches=8: 6.13ms
prefetch_batches=8: 8.04ms
prefetch_batches=8: 4.93ms
prefetch_batches=8: 16.04ms
prefetch_batches=8: 132.75ms
prefetch_batches=8: 34.61ms
prefetch_batches=8: 1.12ms
prefetch_batches=8: 0.03ms
prefetch_batches=8: 2.52ms
prefetch_batches=8: 0.86ms
prefetch_batches=8: 1.58ms
prefetch_batches=8: 4.64ms
2025-11-15 18:56:15,236 INFO util.py:257 -- Exiting prefetcher's background thread
prefetch_batches=8: 95.34ms
prefetch_batches=8: 25.30ms
prefetch_batches=8: 3.38ms
prefetch_batches=8: 2.09ms

============================================================
Timing Distribution Summary by prefetch_batches:
============================================================

prefetch_batches=0:
count    20.000000
mean      4.846439
std       1.432037
min       3.960929
25%       4.082277
50%       4.199040
75%       4.457732
max       9.276609
Name: timing_ms, dtype: float64

prefetch_batches=2:
count    20.000000
mean      7.290564
std       9.302255
min       0.030040
25%       0.033021
50%       5.222020
75%      11.600974
max      37.317027
Name: timing_ms, dtype: float64

prefetch_batches=4:
count     20.000000
mean      19.272903
std       33.261985
min        0.020769
25%        0.157488
50%        3.043183
75%       18.726464
max      123.735527
Name: timing_ms, dtype: float64

prefetch_batches=8:
count     20.000000
mean      30.015595
std       59.816607
min        0.025537
25%        1.965611
50%        4.780599
75%       20.852626
max      237.987233
Name: timing_ms, dtype: float64

real    0m10.006s
user    0m5.465s
sys     0m3.032s

After

/home/ray/anaconda3/lib/python3.12/site-packages/ray/anyscale/data/api/read_api.py:95: UserWarning: Parameter 'override_num_blocks' isn't supported on Ray runtime. Falling back to OSS implementation.
  warnings.warn(
2025-11-15 19:03:46,960 INFO worker.py:1832 -- Connecting to existing Ray cluster at address: 10.0.146.216:6379...
2025-11-15 19:03:46,971 INFO worker.py:2003 -- Connected to Ray cluster. View the dashboard at https://session-tb5w5gjacurgjbkib32skq4gji.i.anyscaleuserdata-staging.com 
2025-11-15 19:03:46,972 INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_4ed8c9b3766f084f16b2ab7ae712545ef25eb2af.zip' (0.02MiB) to Ray cluster...
2025-11-15 19:03:46,973 INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_4ed8c9b3766f084f16b2ab7ae712545ef25eb2af.zip'.
/home/ray/anaconda3/lib/python3.12/site-packages/ray/_private/worker.py:2051: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
Parquet dataset sampling 0: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1.00/1.00 [00:02<00:00, 2.02s/ file]
2025-11-15 19:03:49,072 INFO parquet_datasource.py:787 -- Estimated parquet encoding ratio is 1.026.
2025-11-15 19:03:49,072 INFO parquet_datasource.py:847 -- Estimated parquet reader batch size at 2251 rows
2025-11-15 19:03:49,117 INFO streaming_executor.py:85 -- A new progress UI is available. To enable, set `ray.data.DataContext.get_current().enable_rich_progress_bars = True`.
2025-11-15 19:03:49,117 INFO logging.py:397 -- Registered dataset logger for dataset dataset_34_0
2025-11-15 19:03:49,131 INFO streaming_executor.py:170 -- Starting execution of Dataset dataset_34_0. Full logs are in /tmp/ray/session_2025-11-15_18-27-18_175587_2558/logs/ray-data
2025-11-15 19:03:49,131 INFO streaming_executor.py:171 -- Execution plan of Dataset dataset_34_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet]
Running 0: 0.00 row [00:00, ? row/s]                       2025-11-15 19:03:49,135      WARNING resource_manager.py:134 -- ⚠️  Ray's object store is configured to use only 27.7% of available memory (17.7GiB out of 64.0GiB total). For optimal Ray Data performance, we recommend setting the object store to at least 50% of available memory. You can do this by setting the 'object_store_memory' parameter when calling ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable.
Running Dataset: dataset_34_0. Active & requested resources: 1/16 CPU, 256.0MB/13.3GB object store: : 0.00 row [00:01, ? row/s]2025-11-15 19:03:50,807  INFO streaming_executor.py:298 -- ✔️  Dataset dataset_34_0 execution finished in 1.68 seconds
✔️  Dataset dataset_34_0 execution finished in 1.68 seconds: 5.00k row [00:01, 2.99k row/s]
- ReadParquet->SplitBlocks(20): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store: : 5.00k row [00:01, 2.99k row/s]
prefetch_batches=0: 13.22ms                                                                                                                   
prefetch_batches=0: 11.78ms
prefetch_batches=0: 11.94ms
prefetch_batches=0: 11.40ms
prefetch_batches=0: 11.90ms
prefetch_batches=0: 11.91ms
prefetch_batches=0: 11.53ms
prefetch_batches=0: 11.21ms
prefetch_batches=0: 11.37ms
prefetch_batches=0: 12.06ms
prefetch_batches=0: 11.58ms
prefetch_batches=0: 11.53ms
prefetch_batches=0: 11.43ms
prefetch_batches=0: 11.91ms
prefetch_batches=0: 11.73ms
prefetch_batches=0: 12.03ms
prefetch_batches=0: 11.77ms
prefetch_batches=0: 11.17ms
prefetch_batches=0: 11.71ms
prefetch_batches=0: 11.77ms
prefetch_batches=2: 47.63ms
prefetch_batches=2: 13.74ms
prefetch_batches=2: 12.59ms
prefetch_batches=2: 12.54ms
prefetch_batches=2: 12.95ms
prefetch_batches=2: 13.55ms
prefetch_batches=2: 13.22ms
prefetch_batches=2: 13.35ms
prefetch_batches=2: 12.17ms
prefetch_batches=2: 13.43ms
prefetch_batches=2: 12.13ms
prefetch_batches=2: 12.13ms
prefetch_batches=2: 12.98ms
prefetch_batches=2: 13.07ms
prefetch_batches=2: 11.20ms
prefetch_batches=2: 14.86ms
prefetch_batches=2: 13.25ms
prefetch_batches=2: 12.18ms
prefetch_batches=2: 12.63ms
prefetch_batches=2: 13.51ms
prefetch_batches=4: 54.66ms
prefetch_batches=4: 29.28ms
prefetch_batches=4: 15.71ms
prefetch_batches=4: 12.95ms
prefetch_batches=4: 13.62ms
prefetch_batches=4: 10.77ms
prefetch_batches=4: 17.96ms
prefetch_batches=4: 12.18ms
prefetch_batches=4: 14.44ms
prefetch_batches=4: 13.49ms
prefetch_batches=4: 13.39ms
prefetch_batches=4: 13.58ms
prefetch_batches=4: 13.63ms
prefetch_batches=4: 12.47ms
prefetch_batches=4: 13.40ms
prefetch_batches=4: 13.29ms
prefetch_batches=4: 13.20ms
prefetch_batches=4: 12.88ms
prefetch_batches=4: 12.34ms
prefetch_batches=4: 13.22ms
prefetch_batches=8: 60.16ms
prefetch_batches=8: 29.96ms
prefetch_batches=8: 13.67ms
prefetch_batches=8: 11.87ms
prefetch_batches=8: 15.57ms
prefetch_batches=8: 12.09ms
prefetch_batches=8: 13.57ms
prefetch_batches=8: 15.19ms
prefetch_batches=8: 12.83ms
prefetch_batches=8: 14.75ms
prefetch_batches=8: 13.58ms
prefetch_batches=8: 14.05ms
prefetch_batches=8: 13.52ms
prefetch_batches=8: 12.99ms
prefetch_batches=8: 15.36ms
prefetch_batches=8: 10.97ms
prefetch_batches=8: 15.46ms
prefetch_batches=8: 12.56ms
prefetch_batches=8: 12.92ms
prefetch_batches=8: 14.00ms

============================================================
Timing Distribution Summary by prefetch_batches:
============================================================

prefetch_batches=0:
count    20.000000
mean     11.747286
std       0.433806
min      11.168863
25%      11.504993
50%      11.746555
75%      11.909046
max      13.221121
Name: timing_ms, dtype: float64

prefetch_batches=2:
count    20.000000
mean     14.656546
std       7.800214
min      11.203704
25%      12.446366
50%      13.025708
75%      13.451453
max      47.630631
Name: timing_ms, dtype: float64

prefetch_batches=4:
count    20.000000
mean     16.322730
std       9.792941
min      10.768381
25%      12.928675
50%      13.397154
75%      13.829613
max      54.659799
Name: timing_ms, dtype: float64

prefetch_batches=8:
count    20.000000
mean     16.753312
std      10.920150
min      10.967700
25%      12.899461
50%      13.621491
75%      15.230637
max      60.155892
Name: timing_ms, dtype: float64
2025-11-15 19:03:53,988 INFO util.py:257 -- Exiting prefetcher's background thread

real    0m9.859s
user    0m5.576s
sys     0m2.430s

Benchmark script

import ray
import time
import pandas as pd

# Materialize the dataset up front so per-batch timings measure the
# iteration path rather than query execution.
ds = ray.data.read_parquet(
    "s3://ps-airbnb/benchmarking/training/data/synthetic_parquet/rgs_500/d=2024-02-12/",
    override_num_blocks=40,
    columns=[
        "pk",
        "lf",
        "li",
        "qf",
        "s",
        "qi",
        "v",
        "m",
        "uca",
        "lk",
        "lb",
        "trsct",
        "qn",
        "ftbl",
        "sr",
        "pf",
        "ei",
        "kw",
        "cb",
    ],
).materialize()
prefetch_values = [0, 2, 4, 8]
results = []
for prefetch_batches in prefetch_values:
    iterator = iter(ds.iter_torch_batches(batch_size=None, prefetch_batches=prefetch_batches))

    # Time 20 consecutive next() calls per setting. The first call also pays
    # for iterator startup and prefetch-buffer fill, so it runs longest.
    timings = []
    for _ in range(20):
        start = time.perf_counter()
        batch = next(iterator)
        end = time.perf_counter()
        timing_ms = (end - start) * 1000
        timings.append(timing_ms)
        print(f"prefetch_batches={prefetch_batches}: {timing_ms:.2f}ms")
    
    results.append({
        'prefetch_batches': prefetch_batches,
        'timings': timings
    })
df = pd.DataFrame([
    {'prefetch_batches': r['prefetch_batches'], 'timing_ms': timing}
    for r in results
    for timing in r['timings']
])
print("\n" + "="*60)
print("Timing Distribution Summary by prefetch_batches:")
print("="*60)
for prefetch_batches in prefetch_values:
    subset = df[df['prefetch_batches'] == prefetch_batches]['timing_ms']
    print(f"\nprefetch_batches={prefetch_batches}:")
    print(subset.describe())
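
For a side-by-side view of two runs, a small helper like the one below (hypothetical, not part of the PR) can merge the per-setting statistics, given two DataFrames shaped like df above:

def compare_spread(df_before, df_after):
    # Aggregate timing stats per prefetch setting; suffixes mark the run.
    def agg(d):
        return d.groupby("prefetch_batches")["timing_ms"].agg(["mean", "std", "max"])
    return agg(df_before).join(agg(df_after), lsuffix="_before", rsuffix="_after")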

srinathk10 and others added 3 commits November 16, 2025 03:03
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 added the go add ONLY when ready to merge, run all tests label Nov 16, 2025
srinathk10 and others added 2 commits November 15, 2025 22:06
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 marked this pull request as ready for review November 17, 2025 18:44
@srinathk10 srinathk10 requested review from a team as code owners November 17, 2025 18:44
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Nov 17, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 changed the title [Data] Handle prefetches buffering in iter_baches [Data] Handle prefetches buffering in iter_batches Nov 19, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Co-authored-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request addresses an issue with buffering in iter_batches to better align with prefetching, aiming to provide more predictable latency. The changes introduce buffering in _iter_batches and _format_in_threadpool by setting the queue depth in make_async_gen based on the prefetch count. Additionally, the number of workers in the formatting threadpool is now capped to a default of 4 to optimize for latency.

The implementation looks solid and correctly applies the described fixes. I've found a minor but important typo in a newly introduced constant and its corresponding environment variable, which should be corrected before merging. My review includes suggestions to fix this.

srinathk10 and others added 2 commits November 19, 2025 16:59
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@raulchen raulchen merged commit d1d84ba into master Nov 20, 2025
6 checks passed
@raulchen raulchen deleted the srinathk10/format_batches branch November 20, 2025 20:05
400Ping pushed a commit to 400Ping/ray that referenced this pull request Nov 21, 2025
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025