Skip to content

Commit ba422cc

Browse files
committed
perf(runtime): Use all available parallelism
Performance work showed a drop-off in performance scaling when concurrency exceeds 16. Which is the number of threads we were giving tokio. Now we use all the cores on the machine which is the tokio default (and also what Go does). Also ensure in most cases we only use a single tokio runtime. I still contend the secondary runtime is an unnecessary complication, but this way we preserve optionality (in case we choose to pin the secondary runtime to a single core), but also simplify the performance characteristics.
1 parent e1ae0f1 commit ba422cc

File tree

3 files changed

+50
-32
lines changed

3 files changed

+50
-32
lines changed

launch/dynamo-run/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ fn main() -> anyhow::Result<()> {
6262

6363
// max_worker_threads and max_blocking_threads from env vars or config file.
6464
let rt_config = dynamo_runtime::RuntimeConfig::from_settings()?;
65+
tracing::debug!("Runtime config: {rt_config}");
6566

66-
// One per process. Wraps a Runtime with holds two tokio runtimes.
67+
// One per process. Wraps a Runtime with holds one or two tokio runtimes.
6768
let worker = dynamo_runtime::Worker::from_config(rt_config)?;
6869

6970
worker.execute(wrapper)

lib/runtime/src/config.rs

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,5 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
3-
//
4-
// Licensed under the Apache License, Version 2.0 (the "License");
5-
// you may not use this file except in compliance with the License.
6-
// You may obtain a copy of the License at
7-
//
8-
// http://www.apache.org/licenses/LICENSE-2.0
9-
//
10-
// Unless required by applicable law or agreed to in writing, software
11-
// distributed under the License is distributed on an "AS IS" BASIS,
12-
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
// See the License for the specific language governing permissions and
14-
// limitations under the License.
153

164
use super::Result;
175
use derive_builder::Builder;
@@ -20,6 +8,7 @@ use figment::{
208
Figment,
219
};
2210
use serde::{Deserialize, Serialize};
11+
use std::fmt;
2312
use validator::Validate;
2413

2514
/// Default HTTP server host
@@ -66,35 +55,58 @@ impl Default for WorkerConfig {
6655
pub struct RuntimeConfig {
6756
/// Number of async worker threads
6857
/// If set to 1, the runtime will run in single-threaded mode
58+
/// Set this at runtime with environment variable DYN_RUNTIME_NUM_WORKER_THREADS. Defaults to
59+
/// number of cores.
6960
#[validate(range(min = 1))]
70-
#[builder(default = "16")]
7161
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
72-
pub num_worker_threads: usize,
62+
pub num_worker_threads: Option<usize>,
7363

7464
/// Maximum number of blocking threads
7565
/// Blocking threads are used for blocking operations, this value must be greater than 0.
66+
/// Set this at runtime with environment variable DYN_RUNTIME_MAX_BLOCKING_THREADS. Defaults to
67+
/// 512.
7668
#[validate(range(min = 1))]
7769
#[builder(default = "512")]
7870
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
7971
pub max_blocking_threads: usize,
8072

8173
/// HTTP server host for health and metrics endpoints
74+
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_SERVER_HOST
8275
#[builder(default = "DEFAULT_HTTP_SERVER_HOST.to_string()")]
8376
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
8477
pub http_server_host: String,
8578

8679
/// HTTP server port for health and metrics endpoints
8780
/// If set to 0, the system will assign a random available port
81+
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_SERVER_PORT
8882
#[builder(default = "DEFAULT_HTTP_SERVER_PORT")]
8983
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
9084
pub http_server_port: u16,
9185

9286
/// Health and metrics HTTP server enabled
87+
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_ENABLED
9388
#[builder(default = "false")]
9489
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
9590
pub http_enabled: bool,
9691
}
9792

93+
impl fmt::Display for RuntimeConfig {
94+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95+
// If None, it defaults to "number of cores", so we indicate that.
96+
match self.num_worker_threads {
97+
Some(val) => write!(f, "num_worker_threads={val}, ")?,
98+
None => write!(f, "num_worker_threads=default (num_cores), ")?,
99+
}
100+
101+
write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
102+
write!(f, "http_server_host={}, ", self.http_server_host)?;
103+
write!(f, "http_server_port={}, ", self.http_server_port)?;
104+
write!(f, "http_enabled={}", self.http_enabled)?;
105+
106+
Ok(())
107+
}
108+
}
109+
98110
impl RuntimeConfig {
99111
pub fn builder() -> RuntimeConfigBuilder {
100112
RuntimeConfigBuilder::default()
@@ -138,7 +150,7 @@ impl RuntimeConfig {
138150

139151
pub fn single_threaded() -> Self {
140152
RuntimeConfig {
141-
num_worker_threads: 1,
153+
num_worker_threads: Some(1),
142154
max_blocking_threads: 1,
143155
http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(),
144156
http_server_port: DEFAULT_HTTP_SERVER_PORT,
@@ -147,20 +159,24 @@ impl RuntimeConfig {
147159
}
148160

149161
/// Create a new default runtime configuration
150-
pub(crate) fn create_runtime(&self) -> Result<tokio::runtime::Runtime> {
151-
Ok(tokio::runtime::Builder::new_multi_thread()
152-
.worker_threads(self.num_worker_threads)
162+
pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
163+
tokio::runtime::Builder::new_multi_thread()
164+
.worker_threads(
165+
self.num_worker_threads
166+
.unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
167+
)
153168
.max_blocking_threads(self.max_blocking_threads)
154169
.enable_all()
155-
.build()?)
170+
.build()
156171
}
157172
}
158173

159174
impl Default for RuntimeConfig {
160175
fn default() -> Self {
176+
let num_cores = std::thread::available_parallelism().unwrap().get();
161177
Self {
162-
num_worker_threads: 16,
163-
max_blocking_threads: 16,
178+
num_worker_threads: Some(num_cores),
179+
max_blocking_threads: num_cores,
164180
http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(),
165181
http_server_port: DEFAULT_HTTP_SERVER_PORT,
166182
http_enabled: false,
@@ -240,7 +256,7 @@ mod tests {
240256
],
241257
|| {
242258
let config = RuntimeConfig::from_settings()?;
243-
assert_eq!(config.num_worker_threads, 24);
259+
assert_eq!(config.num_worker_threads, Some(24));
244260
assert_eq!(config.max_blocking_threads, 32);
245261
Ok(())
246262
},

lib/runtime/src/runtime.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ impl Runtime {
4747
let secondary = match secondary {
4848
Some(secondary) => secondary,
4949
None => {
50+
tracing::debug!("Created secondary runtime with single thread");
5051
RuntimeType::Shared(Arc::new(RuntimeConfig::single_threaded().create_runtime()?))
5152
}
5253
};
@@ -60,26 +61,26 @@ impl Runtime {
6061
}
6162

6263
pub fn from_current() -> Result<Runtime> {
63-
let handle = tokio::runtime::Handle::current();
64-
let primary = RuntimeType::External(handle.clone());
65-
let secondary = RuntimeType::External(handle);
66-
Runtime::new(primary, Some(secondary))
64+
Runtime::from_handle(tokio::runtime::Handle::current())
6765
}
6866

6967
pub fn from_handle(handle: tokio::runtime::Handle) -> Result<Runtime> {
70-
let runtime = RuntimeType::External(handle);
71-
Runtime::new(runtime, None)
68+
let primary = RuntimeType::External(handle.clone());
69+
let secondary = RuntimeType::External(handle);
70+
Runtime::new(primary, Some(secondary))
7271
}
7372

7473
/// Create a [`Runtime`] instance from the settings
7574
/// See [`config::RuntimeConfig::from_settings`]
7675
pub fn from_settings() -> Result<Runtime> {
7776
let config = config::RuntimeConfig::from_settings()?;
78-
let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));
79-
Runtime::new(owned, None)
77+
let runtime = Arc::new(config.create_runtime()?);
78+
let primary = RuntimeType::Shared(runtime.clone());
79+
let secondary = RuntimeType::External(runtime.handle().clone());
80+
Runtime::new(primary, Some(secondary))
8081
}
8182

82-
/// Create a [`Runtime`] with a single-threaded primary async tokio runtime
83+
/// Create a [`Runtime`] with two single-threaded async tokio runtime
8384
pub fn single_threaded() -> Result<Runtime> {
8485
let config = config::RuntimeConfig::single_threaded();
8586
let owned = RuntimeType::Shared(Arc::new(config.create_runtime()?));

0 commit comments

Comments
 (0)