diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0d92757..44f77c4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,7 +49,7 @@ jobs: matrix: # When updating this, the reminder to update the minimum supported # Rust version in Cargo.toml. - rust: ['1.48'] + rust: ['1.56'] steps: - uses: actions/checkout@v3 - name: Install Rust diff --git a/Cargo.toml b/Cargo.toml index b139e4e..00e20bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "blocking" version = "1.3.1" authors = ["Stjepan Glavina "] edition = "2018" -rust-version = "1.48" +rust-version = "1.56" description = "A thread pool for isolating blocking I/O in async programs" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/blocking" @@ -21,8 +21,8 @@ async-task = "4.0.2" fastrand = "1.3.4" futures-io = { version = "0.3.28", default-features = false, features = ["std"] } futures-lite = { version = "1.11.0", default-features = false } -log = "0.4.17" piper = "0.2.0" +tracing = { version = "0.1.37", default-features = false } [dev-dependencies] futures-lite = "1.11.0" diff --git a/src/lib.rs b/src/lib.rs index 07de679..bb9dba6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -185,6 +185,9 @@ impl Executor { /// /// This function runs blocking tasks until it becomes idle and times out. fn main_loop(&'static self) { + let span = tracing::trace_span!("blocking::main_loop"); + let _enter = span.enter(); + let mut inner = self.inner.lock().unwrap(); loop { // This thread is not idle anymore because it's going to run tasks. @@ -207,6 +210,7 @@ impl Executor { // Put the thread to sleep until another task is scheduled. let timeout = Duration::from_millis(500); + tracing::trace!(?timeout, "going to sleep"); let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap(); inner = lock; @@ -216,7 +220,11 @@ impl Executor { inner.thread_count -= 1; break; } + + tracing::trace!("notified"); } + + tracing::trace!("shutting down due to lack of tasks"); } /// Schedules a runnable task for execution. @@ -231,11 +239,21 @@ impl Executor { /// Spawns more blocking threads if the pool is overloaded with work. fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) { + let span = tracing::error_span!( + "grow_pool", + queue_len = inner.queue.len(), + idle_count = inner.idle_count, + thread_count = inner.thread_count, + ); + let _enter = span.enter(); + // If runnable tasks greatly outnumber idle threads and there aren't too many threads // already, then be aggressive: wake all idle threads and spawn one more thread. while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < inner.thread_limit.get() { + tracing::trace!("spawning a new thread to handle blocking tasks"); + // The new thread starts in idle state. inner.idle_count += 1; inner.thread_count += 1; @@ -253,7 +271,7 @@ impl Executor { .spawn(move || self.main_loop()) { // We were unable to spawn the thread, so we need to undo the state changes. - log::error!("Failed to spawn a blocking thread: {}", e); + tracing::error!("failed to spawn a blocking thread: {}", e); inner.idle_count -= 1; inner.thread_count -= 1; @@ -264,7 +282,12 @@ impl Executor { // If the limit is about to be set to zero, set it to one instead so that if, // in the future, we are able to spawn more threads, we will be able to do so. - NonZeroUsize::new(new_limit).unwrap_or_else(|| NonZeroUsize::new(1).unwrap()) + NonZeroUsize::new(new_limit).unwrap_or_else(|| { + tracing::warn!( + "attempted to lower thread_limit to zero; setting to one instead" + ); + NonZeroUsize::new(1).unwrap() + }) }; } }