feat: add Rayon compute pool for CPU-intensive operations #2969
Conversation
Integrate Rayon with Tokio to provide a dedicated thread pool for CPU-bound work. Multiple async tasks can concurrently use different parallelization patterns (scope, par_iter, join) on the shared pool.
- Add ComputePool with tokio-rayon for async-to-sync bridging
- Support concurrent access from multiple async tasks
- Configure via DYN_COMPUTE_* environment variables
- Include comprehensive Rayon/Tokio strategy documentation
- Add examples for tokenization and mixed workloads
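A minimal sketch of the intended call pattern, assuming the `ComputePool::execute` API and `Result` return shown in the sequence diagrams below (the workloads and counts are illustrative only):

```rust
use rayon::prelude::*;

// Two async callers share the same dedicated Rayon pool concurrently;
// each offloaded closure can use parallel iterators internally.
async fn demo(pool: &ComputePool) -> anyhow::Result<()> {
    let (sum_of_squares, even_count) = tokio::join!(
        pool.execute(|| (0..1_000_000u64).into_par_iter().map(|x| x * x).sum::<u64>()),
        pool.execute(|| (0..1_000_000u64).into_par_iter().filter(|x| x % 2 == 0).count()),
    );
    println!("sum of squares = {}, even count = {}", sum_of_squares?, even_count?);
    Ok(())
}
```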
@CodeRabbit review

✅ Actions performed: Review triggered.
Walkthrough

Adds a Rayon-backed compute pool integrated with Tokio (tokio-rayon), runtime wiring and config, metrics collection, public compute APIs and helpers, documentation, examples, and two new Cargo dependencies.
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
actor Dev as Tokio Task
participant RT as Runtime
participant CP as ComputePool
participant BR as tokio_rayon
participant RP as Rayon ThreadPool
Dev->>RT: runtime.compute_pool()
alt Pool available
RT-->>Dev: &ComputePool
Dev->>CP: execute / execute_scoped / join / install(f)
CP->>CP: metrics.record_task_start()
CP->>BR: tokio_rayon::spawn onto Rayon pool
BR->>RP: run closure(s)
RP-->>BR: result
BR-->>CP: result
CP->>CP: metrics.record_task_completion()
CP-->>Dev: Result<R>
else No pool
RT-->>Dev: None
Note over Dev: Caller may fall back (spawn_blocking)
end
```

```mermaid
sequenceDiagram
autonumber
actor Dev as Tokio Task
participant CP as ComputePool
participant RP as Rayon ThreadPool
rect rgb(235,245,255)
note right of CP: parallel_join
Dev->>CP: join(f1, f2)
CP->>RP: run f1 || f2
RP-->>CP: (r1, r2)
CP-->>Dev: (r1, r2)
end
rect rgb(245,235,255)
note right of CP: parallel_map
Dev->>CP: parallel_map(items, f)
CP->>RP: items.into_par_iter().map(f).collect()
RP-->>CP: Vec<R>
CP-->>Dev: Vec<R>
end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks: 2 passed, 1 warning
Actionable comments posted: 12
🧹 Nitpick comments (13)
lib/runtime/docs/rayon-tokio-strategy.md (5)
26-28: Align diagram with API surface (use ComputePool labels, not tokio_rayon::spawn). The doc shows `tokio_rayon::spawn` while the examples use `ComputePool`. Rename for consistency.
```diff
-| tokio_rayon::spawn
+| ComputePool (install/scope/join)
```
197-205: Unify error handling in examples (unwrap vs `?`). Mixed `unwrap()` and `?` in similar contexts. Prefer `?` in docs to model best practices.
```diff
- }).await.unwrap();
+ }).await?;
```
Also applies to: 341-343
49-49: Fix MD026: remove trailing punctuation in headings. Drop the trailing colon for markdownlint conformance.
```diff
-### Use Tokio (async/await) when:
+### Use Tokio (async/await) when
-### Use Rayon (compute pool) when:
+### Use Rayon (compute pool) when
```
Also applies to: 56-56
63-68: Clarify thresholds relative to the compute pool. State that `spawn_blocking` is a fallback when the compute pool is disabled/unavailable; otherwise prefer the pool for >1ms CPU work.
```diff
-- Use `spawn_blocking` when CPU work takes **>1ms**
+- If the compute pool is disabled/unavailable, use `spawn_blocking` when CPU work takes **>1ms**; otherwise use the compute pool.
```
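For the doc, a hedged sketch of that fallback rule could look like the following (assuming `Runtime::compute_pool()` returns an `Option` and `execute` returns a `Result`, as elsewhere in this PR; the checksum closure is just a stand-in workload):

```rust
async fn checksum(runtime: &Runtime, data: Vec<u8>) -> anyhow::Result<u64> {
    let work = move || data.iter().map(|b| *b as u64).sum::<u64>();
    match runtime.compute_pool() {
        // Preferred path: dedicated Rayon pool for CPU work over ~1ms.
        Some(pool) => Ok(pool.execute(work).await?),
        // Fallback when the pool is disabled or unavailable.
        None => Ok(tokio::task::spawn_blocking(work).await?),
    }
}
```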
335-343: Add missing imports in Rust snippets for completeness. Examples use Rayon parallel iterators; add `use rayon::prelude::*;`.
```diff
 // ✅ Good: Collect async items, then process in parallel
+use rayon::prelude::*;
```
Also applies to: 384-391

lib/runtime/Cargo.toml (1)
66-66: Consider minimizing features for faster builds. If you don’t need Rayon’s default features (e.g., `std` is required, others may not be), consider explicit features to trim compile time.
```diff
-rayon = { version = "1.10" }
+rayon = { version = "1.10", default-features = true }
```
(Keep default-features = true if you rely on them; otherwise tighten explicitly.)
lib/runtime/src/lib.rs (1)
37-37: Publicly exposing `compute` is a crate-level API addition; confirm stability intent. If the surface is experimental, gate behind a feature or add `#[doc(cfg(feature = "compute"))]` to set expectations.
```diff
- pub mod compute;
+ #[cfg(feature = "compute")]
+ pub mod compute;
```
(And add a `compute` feature in Cargo.toml.)

lib/runtime/src/runtime.rs (1)
59-62: Initializing the compute pool only via config is OK. Consider parity for other constructors: `from_settings` wires the pool, but `from_current`, `from_handle`, and `single_threaded` never do, leading to a surprising `None`.
```diff
 pub fn from_current() -> Result<Runtime> {
     Runtime::from_handle(tokio::runtime::Handle::current())
 }
@@
 pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
     let primary = RuntimeType::External(handle.clone());
     let secondary = RuntimeType::External(handle);
-    Runtime::new(primary, Some(secondary))
+    // Load config and attempt to attach compute pool for parity with from_settings
+    let cfg = crate::config::RuntimeConfig::from_settings()
+        .unwrap_or_else(|_| crate::config::RuntimeConfig::default());
+    Runtime::new_with_config(primary, Some(secondary), &cfg)
 }
@@
 pub fn single_threaded() -> Result<Runtime> {
     let config = config::RuntimeConfig::single_threaded();
     let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
-    Runtime::new(owned, None)
+    Runtime::new_with_config(owned, None, &config)
 }
```
Also applies to: 70-71
lib/runtime/examples/tokenizer_integration.rs (1)
201-201: Avoid setting environment variables in examples. Setting environment variables at runtime can cause issues if multiple examples or tests run concurrently, and it may not reflect the actual deployment configuration pattern.
Consider removing this line and documenting the environment variable in a comment instead:
```diff
- std::env::set_var("DYN_COMPUTE_THREADS", "4");
+ // To configure the compute pool thread count, set DYN_COMPUTE_THREADS=4
+ // before running this example
```

lib/runtime/src/compute/metrics.rs (1)
84-92: Add overflow protection for average computation. While unlikely in practice, the division could theoretically produce infinity or NaN if the total time approaches u64::MAX. Consider saturating arithmetic or checked operations.
```diff
 pub fn avg_task_duration_us(&self) -> f64 {
     let total = self.tasks_total.load(Ordering::Relaxed);
     if total == 0 {
         return 0.0;
     }
     let total_time = self.total_compute_time_us.load(Ordering::Relaxed);
-    total_time as f64 / total as f64
+    (total_time as f64) / (total as f64)
 }
```
Also consider adding a comment about the theoretical overflow scenario for future maintainers.
lib/runtime/src/compute/mod.rs (2)
62-67: Consider making the default thread count calculation more robust. The current calculation `n.get() / 2` might result in suboptimal performance on systems with few cores. Consider using a minimum of 2 threads to ensure some parallelism even on dual-core systems.
```diff
 let num_threads = self.num_threads.unwrap_or_else(|| {
     std::thread::available_parallelism()
         .map(|n| n.get() / 2)
         .unwrap_or(2)
-        .max(1)
+        .max(2) // Ensure at least 2 threads for parallelism
 });
```
83-88: Consider implementing CPU pinning or documenting why it's deferred. The TODO comment indicates CPU pinning is planned but not implemented. This could impact performance in NUMA systems or when strict CPU isolation is needed.
Would you like me to help implement CPU pinning using platform-specific APIs (e.g., `libc::sched_setaffinity` on Linux)? I can open an issue to track this enhancement.
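As one possible direction (not part of this PR), pinning could be wired through Rayon's `start_handler`; the sketch below assumes the `core_affinity` crate rather than raw `libc`, and the core-selection logic is illustrative:

```rust
// Pin each Rayon worker to a distinct core as it starts up.
fn build_pinned_pool(num_threads: usize) -> rayon::ThreadPool {
    let cores = core_affinity::get_core_ids().unwrap_or_default();
    rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .thread_name(|i| format!("compute-{i}"))
        .start_handler(move |idx| {
            // Wrap around if there are more workers than cores.
            if let Some(core) = cores.get(idx % cores.len().max(1)) {
                core_affinity::set_for_current(*core);
            }
        })
        .build()
        .expect("failed to build pinned Rayon pool")
}
```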
lib/runtime/src/compute/pool.rs (1)

251-273: Document or remove unused ComputeHandle. `ComputeHandle` is only referenced via `pub use pool::{ComputePool, ComputeHandle}` in lib/runtime/src/compute/mod.rs and has no internal usages; either remove it to eliminate dead code or document its intended public-API role.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by `!**/*.lock`
📒 Files selected for processing (10)
- lib/runtime/Cargo.toml (1 hunks)
- lib/runtime/docs/rayon-tokio-strategy.md (1 hunks)
- lib/runtime/examples/compute_pool_example.rs (1 hunks)
- lib/runtime/examples/tokenizer_integration.rs (1 hunks)
- lib/runtime/src/compute/metrics.rs (1 hunks)
- lib/runtime/src/compute/mod.rs (1 hunks)
- lib/runtime/src/compute/pool.rs (1 hunks)
- lib/runtime/src/config.rs (4 hunks)
- lib/runtime/src/lib.rs (2 hunks)
- lib/runtime/src/runtime.rs (3 hunks)
🧰 Additional context used
🪛 markdownlint-cli2 (0.17.2)
lib/runtime/docs/rayon-tokio-strategy.md
49-49: Trailing punctuation in heading
Punctuation: ':'
(MD026, no-trailing-punctuation)
56-56: Trailing punctuation in heading
Punctuation: ':'
(MD026, no-trailing-punctuation)
🔇 Additional comments (8)
lib/runtime/Cargo.toml (1)
66-66: Verify Rayon and Tokio compatibility
Ensure `tokio-rayon = "2.1"` aligns with your Tokio 1.x version and that `rayon = "1.10"` doesn’t introduce a duplicate `rayon-core` in the workspace. Manually confirm these versions and your dependency tree in lib/runtime/Cargo.toml.

lib/runtime/src/lib.rs (1)
87-88: Optional: consider hiding `ComputePool` behind an accessor only. Keeping the field private is good. If you plan to evolve internals, ensure callers only use `Runtime::compute_pool()`.

lib/runtime/src/runtime.rs (2)
78-83: Validate `ComputeConfig` fields vs docs (pinning, prefix). Docs mention env-driven DYN_COMPUTE_*; verify that `pin_threads` is supported/configured or remove it to avoid dead params.
156-162: Accessor looks good; returning `Option<&Arc<_>>` avoids cloning. No change requested.
lib/runtime/src/config.rs (2)
128-146: LGTM! Well-documented configuration fields for the compute pool. The new compute pool configuration fields are properly integrated with sensible defaults (2MB stack size) and clear documentation. The environment variable mapping follows the established pattern.
217-233: LGTM! Consistent environment variable handling. The DYN_COMPUTE_* environment variable processing follows the same pattern as DYN_SYSTEM_* and properly filters out empty values.
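As an illustration of the convention this check describes (the exact parsing in config.rs may differ; `DYN_COMPUTE_THREADS` is the variable named elsewhere in this PR):

```rust
// Read DYN_COMPUTE_THREADS, treating unset or empty values as "not configured".
fn compute_threads_from_env() -> Option<usize> {
    std::env::var("DYN_COMPUTE_THREADS")
        .ok()
        .map(|v| v.trim().to_string())
        .filter(|v| !v.is_empty())
        .and_then(|v| v.parse().ok())
}
```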
lib/runtime/src/compute/metrics.rs (1)
53-65: LGTM! Correct implementation of lock-free max update. The compare-exchange loop properly handles concurrent updates to the maximum task duration using a weak CAS operation, which is the standard pattern for lock-free atomic max updates.
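The pattern being praised is, in essence (a standalone sketch, not the exact source):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Lock-free "store the maximum" using a compare-exchange-weak retry loop.
fn record_max_duration(max_us: &AtomicU64, observed_us: u64) {
    let mut current = max_us.load(Ordering::Relaxed);
    while observed_us > current {
        match max_us.compare_exchange_weak(current, observed_us, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => break,                  // we won the race; max updated
            Err(actual) => current = actual, // someone else updated it; re-check
        }
    }
}
```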
lib/runtime/src/compute/pool.rs (1)
238-246: Good implementation of pool-specific installation. The `install` method correctly ensures that parallel iterators run on this specific pool rather than the global Rayon pool. This is essential for proper pool isolation.
- Use custom Rayon pool instead of global pool in tokio_rayon::spawn
- Fix parallel_map to use into_par_iter() for actual parallelism
- Handle compute_threads=0 as disabled with info logging
- Fix test_compute_pool_scoped to use channels for result collection
- Mark documentation examples as no_run to ensure compilation
@CodeRabbit review

✅ Actions performed: Review triggered.
Actionable comments posted: 0
♻️ Duplicate comments (1)
lib/runtime/src/runtime.rs (1)
74-111: Consider handling `compute_threads == 0` more gracefully. The current implementation treats `compute_threads == Some(0)` as disabled but still attempts to create a pool when `None`. Consider making the disable check more robust. Apply this diff to improve the logic:
```diff
-// Check if compute pool is explicitly disabled
-if config.compute_threads == Some(0) {
-    tracing::info!("Compute pool disabled (compute_threads = 0)");
-} else {
-    match crate::compute::ComputePool::new(compute_config) {
-        Ok(pool) => {
-            rt.compute_pool = Some(Arc::new(pool));
-            tracing::debug!(
-                "Initialized compute pool with {} threads",
-                rt.compute_pool.as_ref().unwrap().num_threads()
-            );
-        }
-        Err(e) => {
-            tracing::warn!(
-                "Failed to create compute pool: {}. CPU-intensive operations will use spawn_blocking",
-                e
-            );
-        }
-    }
-}
+// Check if compute pool is explicitly disabled or should be created
+match config.compute_threads {
+    Some(0) => {
+        tracing::info!("Compute pool disabled (compute_threads = 0)");
+    }
+    _ => {
+        match crate::compute::ComputePool::new(compute_config) {
+            Ok(pool) => {
+                let num_threads = pool.num_threads();
+                rt.compute_pool = Some(Arc::new(pool));
+                tracing::debug!("Initialized compute pool with {} threads", num_threads);
+            }
+            Err(e) => {
+                tracing::warn!(
+                    "Failed to create compute pool: {}. CPU-intensive operations will use spawn_blocking",
+                    e
+                );
+            }
+        }
+    }
+}
```
This refactor:
- Avoids the `unwrap()` call on Line 98 by getting the thread count before wrapping in Arc
- Makes the disabled vs enabled logic clearer with pattern matching
- Maintains the same functionality while being more idiomatic
🧹 Nitpick comments (1)
lib/runtime/src/compute/mod.rs (1)
56-94: Consider implementing CPU pinning when requested. The TODO comment on Lines 83-88 indicates that CPU pinning is not yet implemented. This could be important for performance-critical applications that need deterministic thread-to-core mapping.
Would you like me to help implement CPU pinning using platform-specific APIs (e.g., `libc::sched_setaffinity` on Linux)? I can open an issue to track this enhancement.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- lib/runtime/docs/rayon-tokio-strategy.md (1 hunks)
- lib/runtime/examples/compute_pool_example.rs (1 hunks)
- lib/runtime/examples/tokenizer_integration.rs (1 hunks)
- lib/runtime/src/compute/metrics.rs (1 hunks)
- lib/runtime/src/compute/mod.rs (1 hunks)
- lib/runtime/src/compute/pool.rs (1 hunks)
- lib/runtime/src/runtime.rs (3 hunks)
✅ Files skipped from review due to trivial changes (1)
- lib/runtime/docs/rayon-tokio-strategy.md
🚧 Files skipped from review as they are similar to previous changes (2)
- lib/runtime/examples/tokenizer_integration.rs
- lib/runtime/src/compute/metrics.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-16T12:41:12.543Z
Learnt from: grahamking
PR: ai-dynamo/dynamo#1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.
Applied to files:
lib/runtime/examples/compute_pool_example.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Build and Test - vllm
- GitHub Check: Build and Test - sglang
- GitHub Check: Build and Test - dynamo
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: pre-merge-rust (.)
🔇 Additional comments (10)
lib/runtime/src/runtime.rs (2)
59-61: LGTM! The initialization of `compute_pool` to `None` is correct and follows proper Rust conventions.
170-175: LGTM! The public API for accessing the compute pool is well-designed with proper documentation explaining when `None` is returned.

lib/runtime/examples/compute_pool_example.rs (3)
71-85: Good implementation with proper synchronization. The code correctly uses `Arc<Mutex<Vec<_>>>` to safely share and update the results vector across multiple threads, avoiding data races.
133-152: Good implementation with proper synchronization. The tokenization example correctly uses `Arc<Mutex<Vec<_>>>` for thread-safe updates. The implementation properly handles concurrent writes without data races.
171-200: Good implementation of hierarchical computation. The nested scope implementation correctly uses `Arc<Mutex<Vec<_>>>` at both levels to ensure thread-safe updates. The hierarchical pattern is well-structured with proper synchronization.
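For reference, the pattern in question boils down to something like this sketch (values and task count are illustrative):

```rust
use std::sync::{Arc, Mutex};

// Scoped Rayon tasks push results into a shared Vec; the scope guarantees
// all tasks have finished before the Vec is unwrapped and returned.
fn squares_scoped(n: u64) -> Vec<u64> {
    let results = Arc::new(Mutex::new(Vec::new()));
    rayon::scope(|s| {
        for i in 0..n {
            let results = Arc::clone(&results);
            s.spawn(move |_| {
                results.lock().expect("mutex poisoned").push(i * i);
            });
        }
    });
    Arc::try_unwrap(results)
        .expect("no outstanding references after scope")
        .into_inner()
        .expect("mutex poisoned")
}
```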
lib/runtime/src/compute/mod.rs (1)

125-134: LGTM! Fixed parallel_map implementation. The `parallel_map` function now correctly uses `into_par_iter()`, which provides true parallelism, addressing the previous performance concern. The implementation efficiently leverages Rayon's parallel iterator infrastructure.
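The shape of that fix, roughly (a sketch only; the real signature lives in mod.rs):

```rust
use rayon::prelude::*;

// Consuming the Vec lets items move to worker threads; into_par_iter()
// splits the work across the pool instead of iterating sequentially.
fn parallel_map<T, R, F>(pool: &rayon::ThreadPool, items: Vec<T>, f: F) -> Vec<R>
where
    T: Send,
    R: Send,
    F: Fn(T) -> R + Send + Sync,
{
    pool.install(|| items.into_par_iter().map(f).collect())
}
```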
lib/runtime/src/compute/pool.rs (4)

119-133: LGTM! Correctly uses the configured pool. The `execute` method properly uses `pool.install(f)` within `tokio_rayon::spawn`, ensuring that the computation runs on the configured Rayon pool rather than the global pool. This addresses the previous concern about pool configuration.
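In other words, the bridging looks roughly like this (a simplified sketch; the real method also records metrics and wraps the result):

```rust
use std::sync::Arc;

// Simplified form of execute(): offload the closure via tokio_rayon, re-enter
// the dedicated pool with install(), and await the result from async code.
async fn execute<F, R>(pool: Arc<rayon::ThreadPool>, f: F) -> R
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    tokio_rayon::spawn(move || pool.install(f)).await
}
```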
139-160: LGTM! Proper pool installation for scoped execution. The `execute_scoped` method correctly wraps the scope execution with `pool.install()`, ensuring all spawned tasks within the scope run on the configured pool.
167-189: LGTM! Consistent pool installation for FIFO scope. The `execute_scoped_fifo` method follows the same pattern as `execute_scoped`, properly installing the pool before executing the FIFO scope.
372-403: Good test implementation with proper synchronization. The test correctly uses `mpsc::channel()` to collect results from parallel tasks, avoiding the ownership issues from the previous implementation. The approach properly handles concurrent execution and result collection.
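The channel-based collection pattern used by the test, in isolation (a self-contained sketch, not the test itself):

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    rayon::scope(|s| {
        for i in 0..4u64 {
            let tx = tx.clone();
            // Each parallel task owns its own Sender, so no shared mutable state is needed.
            s.spawn(move |_| {
                tx.send(i * i).expect("receiver alive");
            });
        }
    });
    drop(tx); // close the original sender so the receiver iterator terminates
    let mut results: Vec<u64> = rx.iter().collect();
    results.sort_unstable();
    assert_eq!(results, vec![0, 1, 4, 9]);
}
```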
- Add criterion benchmark for overhead measurement
- Add interactive demo showing crossover points
- Add async throughput impact demonstration
- Demonstrate overhead at n=10 vs benefits at n=100k+

Signed-off-by: Ryan Olson <rolson@nvidia.com>
- Add compute_small!, compute_medium!, compute_large! macros for size-aware execution
- Implement semaphore-controlled block_in_place for medium tasks (100μs-1ms)
- Add thread-local storage for compute context on Tokio worker threads
- Include optional validation feature for detecting misclassified tasks
- Simplify benchmarks to use 3 representative values (10, 1_000, 100_000)
- Add block_in_place overhead benchmarking

The semaphore is properly tied to the Tokio worker thread count to ensure async threads remain available when using block_in_place. Medium tasks use block_in_place when permits are available, falling back to Rayon offload when permits are exhausted.

Signed-off-by: Ryan Olson <rolson@nvidia.com>
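A hedged sketch of that medium-task decision (the function name, permit pool, and fallback shape below are assumptions based on this description, not the actual macro expansion):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Medium tasks (~100µs-1ms): run inline with block_in_place while permits
// (sized to the Tokio worker count) are available; otherwise offload to Rayon
// so async worker threads are never exhausted by compute.
async fn run_medium<R, F>(permits: Arc<Semaphore>, pool: Arc<rayon::ThreadPool>, f: F) -> R
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    match permits.try_acquire() {
        // Permit held for the duration of the blocking call, released on drop.
        Ok(_permit) => tokio::task::block_in_place(f),
        Err(_) => tokio_rayon::spawn(move || pool.install(f)).await,
    }
}
```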
…teraction benchmark
- Add has_compute_context() to check if thread-local compute context is available
- Add assert_compute_context() to panic if context not initialized
- Create async_vs_compute_interaction.rs benchmark example
- Demonstrate interference patterns with 4 total threads in two configurations:
  - All-Async: 4 Tokio threads (compute blocks async work)
  - Hybrid: 2 Tokio + 2 Rayon threads (compute offloaded)
- Show how compute workloads affect async I/O latency
- Validate that offloading compute >100μs is essential for async responsiveness

Signed-off-by: Ryan Olson <rolson@nvidia.com>
- Replace flawed initialize_all_thread_locals that used random task spawning
- Implement barrier-based approach ensuring ALL worker threads get initialized
- Add detect_worker_thread_count() to accurately count runtime worker threads
- Update async_vs_compute_interaction example to demonstrate thread-local macros
- Show proper initialization using barrier synchronization
- Add dynamic compute function calibration for accurate benchmarks
- Demonstrate that thread-local context enables macro usage without explicit pools

Signed-off-by: Ryan Olson <rolson@nvidia.com>
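A sketch of the barrier idea described here (illustrative names; the real helper lives in thread_local.rs): spawn exactly one task per worker thread and block each on a synchronous barrier, so no thread can pick up a second initialization task.

```rust
use std::sync::{Arc, Barrier};

// One-shot startup step: briefly blocks every Tokio worker on a std Barrier so
// that each of the `worker_threads` tasks is guaranteed to land on a distinct
// thread and run `init` exactly once on it.
async fn init_all_workers<F>(worker_threads: usize, init: F)
where
    F: Fn() + Send + Sync + 'static,
{
    let init = Arc::new(init);
    let barrier = Arc::new(Barrier::new(worker_threads));
    let tasks: Vec<_> = (0..worker_threads)
        .map(|_| {
            let (init, barrier) = (Arc::clone(&init), Arc::clone(&barrier));
            tokio::spawn(async move {
                init();          // runs on whichever worker picked this task up
                barrier.wait();  // hold this thread until all workers have run init
            })
        })
        .collect();
    for t in tasks {
        t.await.expect("init task panicked");
    }
}
```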
Co-authored-by: Olga Andreeva <124622579+oandreeva-nv@users.noreply.github.com> Signed-off-by: Ryan Olson <ryanolson@users.noreply.github.com>
DYN-1083 has been resolved, re-enabling the test that was previously skipped due to Qwen3ForCausalLM model loading failures in vLLM. Signed-off-by: Ryan Olson <rolson@nvidia.com>
Resolves conflicts by rebuilding lockfile with updated dependencies from main branch (anyhow 1.0.100, aws-lc-rs 1.14.0, etc.) and our rayon/tokio-rayon additions. Signed-off-by: Ryan Olson <rolson@nvidia.com>
grahamking
left a comment
Approving unreviewed to unblock. I believe several other people internally have reviewed.
Overview:
Integrate Rayon with Tokio to provide a dedicated thread pool for CPU-bound work. This PR introduces a comprehensive compute pool system with size-aware task classification macros, thread-local context, and robust async/compute interaction handling.
Details:
This implementation adds several key components:
- `compute_small!`, `compute_medium!`, `compute_large!` with automatic fallback

Key features:
Where should the reviewer start?
- lib/runtime/src/compute/mod.rs - Main configuration and pool setup
- lib/runtime/src/compute/macros.rs - Smart task classification system
- lib/runtime/examples/async_vs_compute_interaction.rs - Demonstrates interference patterns
- lib/runtime/src/compute/thread_local.rs - Context initialization
- lib/runtime/src/compute/README.md - Usage patterns and strategies

Related Issues:
Addresses the need for efficient CPU-bound task handling in async Rust applications without blocking the async runtime.