Skip to content

Commit da83f82

Browse files
authored
perf(runtime): Use all available parallelism (#1858)
1 parent e1ae0f1 commit da83f82

File tree

3 files changed

+50
-32
lines changed

3 files changed

+50
-32
lines changed

launch/dynamo-run/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ fn main() -> anyhow::Result<()> {
6262

6363
// max_worker_threads and max_blocking_threads from env vars or config file.
6464
let rt_config = dynamo_runtime::RuntimeConfig::from_settings()?;
65+
tracing::debug!("Runtime config: {rt_config}");
6566

66-
// One per process. Wraps a Runtime with holds two tokio runtimes.
67+
// One per process. Wraps a Runtime which holds one or two tokio runtimes.
6768
let worker = dynamo_runtime::Worker::from_config(rt_config)?;
6869

6970
worker.execute(wrapper)

lib/runtime/src/config.rs

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,5 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
3-
//
4-
// Licensed under the Apache License, Version 2.0 (the "License");
5-
// you may not use this file except in compliance with the License.
6-
// You may obtain a copy of the License at
7-
//
8-
// http://www.apache.org/licenses/LICENSE-2.0
9-
//
10-
// Unless required by applicable law or agreed to in writing, software
11-
// distributed under the License is distributed on an "AS IS" BASIS,
12-
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
// See the License for the specific language governing permissions and
14-
// limitations under the License.
153

164
use super::Result;
175
use derive_builder::Builder;
@@ -20,6 +8,7 @@ use figment::{
208
Figment,
219
};
2210
use serde::{Deserialize, Serialize};
11+
use std::fmt;
2312
use validator::Validate;
2413

2514
/// Default HTTP server host
@@ -66,35 +55,58 @@ impl Default for WorkerConfig {
6655
pub struct RuntimeConfig {
6756
/// Number of async worker threads
6857
/// If set to 1, the runtime will run in single-threaded mode
58+
/// Set this at runtime with environment variable DYN_RUNTIME_NUM_WORKER_THREADS. Defaults to
59+
/// number of cores.
6960
#[validate(range(min = 1))]
70-
#[builder(default = "16")]
7161
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
72-
pub num_worker_threads: usize,
62+
pub num_worker_threads: Option<usize>,
7363

7464
/// Maximum number of blocking threads
7565
/// Blocking threads are used for blocking operations; this value must be greater than 0.
66+
/// Set this at runtime with environment variable DYN_RUNTIME_MAX_BLOCKING_THREADS. Defaults to
67+
/// 512.
7668
#[validate(range(min = 1))]
7769
#[builder(default = "512")]
7870
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
7971
pub max_blocking_threads: usize,
8072

8173
/// HTTP server host for health and metrics endpoints
74+
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_SERVER_HOST
8275
#[builder(default = "DEFAULT_HTTP_SERVER_HOST.to_string()")]
8376
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
8477
pub http_server_host: String,
8578

8679
/// HTTP server port for health and metrics endpoints
8780
/// If set to 0, the system will assign a random available port
81+
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_SERVER_PORT
8882
#[builder(default = "DEFAULT_HTTP_SERVER_PORT")]
8983
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
9084
pub http_server_port: u16,
9185

9286
/// Health and metrics HTTP server enabled
87+
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_ENABLED
9388
#[builder(default = "false")]
9489
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
9590
pub http_enabled: bool,
9691
}
9792

93+
impl fmt::Display for RuntimeConfig {
94+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95+
// If None, it defaults to "number of cores", so we indicate that.
96+
match self.num_worker_threads {
97+
Some(val) => write!(f, "num_worker_threads={val}, ")?,
98+
None => write!(f, "num_worker_threads=default (num_cores), ")?,
99+
}
100+
101+
write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
102+
write!(f, "http_server_host={}, ", self.http_server_host)?;
103+
write!(f, "http_server_port={}, ", self.http_server_port)?;
104+
write!(f, "http_enabled={}", self.http_enabled)?;
105+
106+
Ok(())
107+
}
108+
}
109+
98110
impl RuntimeConfig {
99111
pub fn builder() -> RuntimeConfigBuilder {
100112
RuntimeConfigBuilder::default()
@@ -138,7 +150,7 @@ impl RuntimeConfig {
138150

139151
pub fn single_threaded() -> Self {
140152
RuntimeConfig {
141-
num_worker_threads: 1,
153+
num_worker_threads: Some(1),
142154
max_blocking_threads: 1,
143155
http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(),
144156
http_server_port: DEFAULT_HTTP_SERVER_PORT,
@@ -147,20 +159,24 @@ impl RuntimeConfig {
147159
}
148160

149161
/// Build a multi-threaded tokio runtime from this configuration
150-
pub(crate) fn create_runtime(&self) -> Result<tokio::runtime::Runtime> {
151-
Ok(tokio::runtime::Builder::new_multi_thread()
152-
.worker_threads(self.num_worker_threads)
162+
pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
163+
tokio::runtime::Builder::new_multi_thread()
164+
.worker_threads(
165+
self.num_worker_threads
166+
.unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
167+
)
153168
.max_blocking_threads(self.max_blocking_threads)
154169
.enable_all()
155-
.build()?)
170+
.build()
156171
}
157172
}
158173

159174
impl Default for RuntimeConfig {
160175
fn default() -> Self {
176+
let num_cores = std::thread::available_parallelism().unwrap().get();
161177
Self {
162-
num_worker_threads: 16,
163-
max_blocking_threads: 16,
178+
num_worker_threads: Some(num_cores),
179+
max_blocking_threads: num_cores,
164180
http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(),
165181
http_server_port: DEFAULT_HTTP_SERVER_PORT,
166182
http_enabled: false,
@@ -240,7 +256,7 @@ mod tests {
240256
],
241257
|| {
242258
let config = RuntimeConfig::from_settings()?;
243-
assert_eq!(config.num_worker_threads, 24);
259+
assert_eq!(config.num_worker_threads, Some(24));
244260
assert_eq!(config.max_blocking_threads, 32);
245261
Ok(())
246262
},

lib/runtime/src/runtime.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ impl Runtime {
4747
let secondary = match secondary {
4848
Some(secondary) => secondary,
4949
None => {
50+
tracing::debug!("Created secondary runtime with single thread");
5051
RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
5152
}
5253
};
@@ -60,26 +61,26 @@ impl Runtime {
6061
}
6162

6263
pub fn from_current() -> Result<Runtime> {
63-
let handle = tokio::runtime::Handle::current();
64-
let primary = RuntimeType::External(handle.clone());
65-
let secondary = RuntimeType::External(handle);
66-
Runtime::new(primary, Some(secondary))
64+
Runtime::from_handle(tokio::runtime::Handle::current())
6765
}
6866

6967
pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
70-
let runtime = RuntimeType::External(handle);
71-
Runtime::new(runtime, None)
68+
let primary = RuntimeType::External(handle.clone());
69+
let secondary = RuntimeType::External(handle);
70+
Runtime::new(primary, Some(secondary))
7271
}
7372

7473
/// Create a [`Runtime`] instance from the settings
7574
/// See [`config::RuntimeConfig::from_settings`]
7675
pub fn from_settings() -> Result<Runtime> {
7776
let config = config::RuntimeConfig::from_settings()?;
78-
let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
79-
Runtime::new(owned, None)
77+
let runtime = Arc::new(config.create_runtime()?);
78+
let primary = RuntimeType::Shared(runtime.clone());
79+
let secondary = RuntimeType::External(runtime.handle().clone());
80+
Runtime::new(primary, Some(secondary))
8081
}
8182

82-
/// Create a [`Runtime`] with a single-threaded primary async tokio runtime
83+
/// Create a [`Runtime`] with two single-threaded async tokio runtimes
8384
pub fn single_threaded() -> Result<Runtime> {
8485
let config = config::RuntimeConfig::single_threaded();
8586
let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));

0 commit comments

Comments
 (0)