Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions quickwit/quickwit-common/src/cpus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZero;

use tracing::{error, info, warn};

const QW_NUM_CPUS_ENV_KEY: &str = "QW_NUM_CPUS";
const KUBERNETES_LIMITS_CPU: &str = "KUBERNETES_LIMITS_CPU";

/// Return the number of vCPU/hyperthreads available.
/// The following methods are used in order:
/// - from the `QW_NUM_CPUS` environment variable
/// - from the `KUBERNETES_LIMITS_CPU` environment variable
/// - from the operating system
/// - default to 2.
pub fn num_cpus() -> usize {
let num_cpus_from_os_opt = std::thread::available_parallelism()
.map(NonZero::get)
.inspect_err(|err| {
error!(error=?err, "failed to detect the number of threads available: arbitrarily returning 2");
})
.ok();
let num_cpus_from_env_opt = get_num_cpus_from_env(QW_NUM_CPUS_ENV_KEY);
let num_cpus_from_k8s_limit = get_num_cpus_from_env(KUBERNETES_LIMITS_CPU);

if let Some(num_cpus) = num_cpus_from_env_opt {
return num_cpus;
}

if let Some(num_cpus_from_k8s_limit) = num_cpus_from_k8s_limit {
info!(
"num cpus from k8s limit: {}, possibly overriding os value {:?}",
num_cpus_from_k8s_limit, num_cpus_from_env_opt
);
return num_cpus_from_k8s_limit;
}

if let Some(num_cpus_from_os_opt) = num_cpus_from_os_opt {
info!("num cpus from os: {}", num_cpus_from_os_opt);
return num_cpus_from_os_opt;
}

warn!("failed to detect number of cpus. defaulting to 2");
2
}

fn parse_cpu_to_mcpu(cpu_string: &str) -> Result<usize, &'static str> {
let trimmed_str = cpu_string.trim();

if trimmed_str.is_empty() {
return Err("input cpu_string cannot be empty");
}

if let Some(val_str) = trimmed_str.strip_suffix('m') {
// The value is already in millicores.
val_str
.parse::<usize>()
.map_err(|_| "invalid millicore value")
} else {
// The value is in CPU cores.
let value = trimmed_str
.parse::<f64>()
.map_err(|_| "invalid float value")?;
Ok((value * 1000.0f64) as usize)
}
}

// Get the number of CPUs from an environment variable.
// The value is expected to be in k8s format (200m means 200 millicores, 2 means 2 cores)
//
// We then get the number of vCPUs by ceiling any non integer value.
fn get_num_cpus_from_env(env_key: &str) -> Option<usize> {
let k8s_cpu_limit_str: String = crate::get_from_env_opt(env_key)?;
let mcpus = parse_cpu_to_mcpu(&k8s_cpu_limit_str)
.inspect_err(|err_msg| {
warn!(
"failed to parse k8s cpu limit (`{}`): {}",
k8s_cpu_limit_str, err_msg
);
})
.ok()?;
let num_vcpus = mcpus.div_ceil(1000);
Some(num_vcpus)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_millicores() {
assert_eq!(parse_cpu_to_mcpu("500m").unwrap(), 500);
assert_eq!(parse_cpu_to_mcpu("100m").unwrap(), 100);
assert_eq!(parse_cpu_to_mcpu("2500m").unwrap(), 2500);
}

#[test]
fn test_cores() {
assert_eq!(parse_cpu_to_mcpu("1").unwrap(), 1000);
assert_eq!(parse_cpu_to_mcpu("2").unwrap(), 2000);
}

#[test]
fn test_fractional_cores() {
assert_eq!(parse_cpu_to_mcpu("0.5").unwrap(), 500);
assert_eq!(parse_cpu_to_mcpu("1.5").unwrap(), 1500);
assert_eq!(parse_cpu_to_mcpu("0.25").unwrap(), 250);
}

#[test]
fn test_with_whitespace() {
assert_eq!(parse_cpu_to_mcpu(" 750m ").unwrap(), 750);
assert_eq!(parse_cpu_to_mcpu(" 0.75 ").unwrap(), 750);
}

#[test]
fn test_invalid_input() {
assert!(parse_cpu_to_mcpu("").is_err());
assert!(parse_cpu_to_mcpu(" ").is_err());
assert!(parse_cpu_to_mcpu("abc").is_err());
assert!(parse_cpu_to_mcpu("1a").is_err());
assert!(parse_cpu_to_mcpu("m500").is_err());
assert!(parse_cpu_to_mcpu("500m1").is_err());
}
}
14 changes: 2 additions & 12 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod coolid;
#[cfg(feature = "jemalloc-profiled")]
pub(crate) mod alloc_tracker;
pub mod binary_heap;
mod cpus;
pub mod fs;
pub mod io;
#[cfg(feature = "jemalloc-profiled")]
Expand Down Expand Up @@ -56,6 +57,7 @@ use std::ops::{Range, RangeInclusive};
use std::str::FromStr;

pub use coolid::new_coolid;
pub use cpus::num_cpus;
pub use kill_switch::KillSwitch;
pub use path_hasher::PathHasher;
pub use progress::{Progress, ProtectedZoneGuard};
Expand Down Expand Up @@ -199,18 +201,6 @@ pub const fn div_ceil(lhs: i64, rhs: i64) -> i64 {
}
}

/// Return the number of vCPU/hyperthreads available.
/// This number is usually not equal to the number of cpu cores
pub fn num_cpus() -> usize {
match std::thread::available_parallelism() {
Ok(num_cpus) => num_cpus.get(),
Err(io_error) => {
error!(error=?io_error, "failed to detect the number of threads available: arbitrarily returning 2");
2
}
}
}

// The following are helpers to build named tasks.
//
// Named tasks require the tokio feature `tracing` to be enabled. If the
Expand Down