From a6ae3e359fb9778cff80f106f1a6a67de687a633 Mon Sep 17 00:00:00 2001 From: Aleksandr Trukhin Date: Wed, 10 Nov 2021 13:23:06 +0300 Subject: [PATCH 1/6] appender: fix WorkerGuard not waiting for writer destruction --- tracing-appender/src/non_blocking.rs | 15 ++++++++++++--- tracing-appender/src/worker.rs | 5 +---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/tracing-appender/src/non_blocking.rs b/tracing-appender/src/non_blocking.rs index 65129e8fc6..7524e99a0a 100644 --- a/tracing-appender/src/non_blocking.rs +++ b/tracing-appender/src/non_blocking.rs @@ -103,7 +103,7 @@ pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000; #[must_use] #[derive(Debug)] pub struct WorkerGuard { - _guard: Option<JoinHandle<()>>, + handle: Option<JoinHandle<()>>, sender: Sender<Msg>, shutdown: Sender<()>, } @@ -259,7 +259,7 @@ impl<'a> MakeWriter<'a> for NonBlocking { impl WorkerGuard { fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self { WorkerGuard { - _guard: Some(handle), + handle: Some(handle), sender, shutdown, } @@ -277,7 +277,16 @@ impl Drop for WorkerGuard { // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout` // so that drop is not blocked indefinitely. // TODO: Make timeout configurable. - let _ = self.shutdown.send_timeout((), Duration::from_millis(1000)); + match self.shutdown.send_timeout((), Duration::from_millis(1000)) { + Err(SendTimeoutError::Disconnected(_)) => (), + Err(SendTimeoutError::Timeout(e)) => { + println!("Failed to wait for logging worker shutdown. 
Error: {:?}", e); + } + Ok(_) => { + // At this point it is safe to wait for `Worker` destruction without blocking + self.handle.take().map(std::thread::JoinHandle::join); + } + } } Err(SendTimeoutError::Disconnected(_)) => (), Err(SendTimeoutError::Timeout(e)) => println!( diff --git a/tracing-appender/src/worker.rs b/tracing-appender/src/worker.rs index 5508baca85..33cc6051f0 100644 --- a/tracing-appender/src/worker.rs +++ b/tracing-appender/src/worker.rs @@ -74,16 +74,13 @@ impl Worker { Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {} Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => { let _ = self.shutdown.recv(); - break; + return; } Err(_) => { // TODO: Expose a metric for IO Errors, or print to stderr } } } - if let Err(e) = self.writer.flush() { - eprintln!("Failed to flush. Error: {}", e); - } }) } } From ac4249701bef2f86d3431e87f4e66a4b42f9848e Mon Sep 17 00:00:00 2001 From: trtt Date: Fri, 12 Nov 2021 15:00:41 +0300 Subject: [PATCH 2/6] refactor(suggestion): better error Co-authored-by: Eliza Weisman --- tracing-appender/src/non_blocking.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tracing-appender/src/non_blocking.rs b/tracing-appender/src/non_blocking.rs index 7524e99a0a..8b092590ab 100644 --- a/tracing-appender/src/non_blocking.rs +++ b/tracing-appender/src/non_blocking.rs @@ -277,10 +277,11 @@ impl Drop for WorkerGuard { // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout` // so that drop is not blocked indefinitely. // TODO: Make timeout configurable. - match self.shutdown.send_timeout((), Duration::from_millis(1000)) { + let timeout = Duration::from_millis(1000); + match self.shutdown.send_timeout((), timeout) { Err(SendTimeoutError::Disconnected(_)) => (), - Err(SendTimeoutError::Timeout(e)) => { - println!("Failed to wait for logging worker shutdown. 
Error: {:?}", e); + Err(SendTimeoutError::Timeout(_)) => { + eprintln!("Shutting down logging worker timed out after {:?}.", timeout); } Ok(_) => { // At this point it is safe to wait for `Worker` destruction without blocking From fb1f941483cfab46cec8e5652957119e5c561cce Mon Sep 17 00:00:00 2001 From: Aleksandr Trukhin Date: Fri, 12 Nov 2021 15:03:36 +0300 Subject: [PATCH 3/6] refactor: another error message --- tracing-appender/src/non_blocking.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tracing-appender/src/non_blocking.rs b/tracing-appender/src/non_blocking.rs index 8b092590ab..f640e55d7e 100644 --- a/tracing-appender/src/non_blocking.rs +++ b/tracing-appender/src/non_blocking.rs @@ -268,10 +268,8 @@ impl WorkerGuard { impl Drop for WorkerGuard { fn drop(&mut self) { - match self - .sender - .send_timeout(Msg::Shutdown, Duration::from_millis(100)) - { + let timeout = Duration::from_millis(100); + match self.sender.send_timeout(Msg::Shutdown, timeout) { Ok(_) => { // Attempt to wait for `Worker` to flush all messages before dropping. This happens // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout` @@ -281,7 +279,10 @@ impl Drop for WorkerGuard { match self.shutdown.send_timeout((), timeout) { Err(SendTimeoutError::Disconnected(_)) => (), Err(SendTimeoutError::Timeout(_)) => { - eprintln!("Shutting down logging worker timed out after {:?}.", timeout); + eprintln!( + "Shutting down logging worker timed out after {:?}.", + timeout + ); } Ok(_) => { // At this point it is safe to wait for `Worker` destruction without blocking @@ -290,9 +291,9 @@ impl Drop for WorkerGuard { } } Err(SendTimeoutError::Disconnected(_)) => (), - Err(SendTimeoutError::Timeout(e)) => println!( - "Failed to send shutdown signal to logging worker. 
Error: {:?}", - e + Err(SendTimeoutError::Timeout(_)) => eprintln!( + "Sending shutdown signal to logging worker timed out after {:?}", + timeout ), } } From 1e1741f93e45ff8344709d4d296f9debbb6873b3 Mon Sep 17 00:00:00 2001 From: Aleksandr Trukhin Date: Fri, 12 Nov 2021 15:09:12 +0300 Subject: [PATCH 4/6] join worker thread in disconnected case also --- tracing-appender/src/non_blocking.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tracing-appender/src/non_blocking.rs b/tracing-appender/src/non_blocking.rs index f640e55d7e..0c964d5e93 100644 --- a/tracing-appender/src/non_blocking.rs +++ b/tracing-appender/src/non_blocking.rs @@ -277,14 +277,13 @@ impl Drop for WorkerGuard { // TODO: Make timeout configurable. let timeout = Duration::from_millis(1000); match self.shutdown.send_timeout((), timeout) { - Err(SendTimeoutError::Disconnected(_)) => (), Err(SendTimeoutError::Timeout(_)) => { eprintln!( "Shutting down logging worker timed out after {:?}.", timeout ); } - Ok(_) => { + _ => { // At this point it is safe to wait for `Worker` destruction without blocking self.handle.take().map(std::thread::JoinHandle::join); } From 13fec129c5666fe6cf0fe30053c4034a0ea72cc2 Mon Sep 17 00:00:00 2001 From: Aleksandr Trukhin Date: Sat, 13 Nov 2021 19:52:37 +0300 Subject: [PATCH 5/6] report error on failed join --- tracing-appender/src/non_blocking.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tracing-appender/src/non_blocking.rs b/tracing-appender/src/non_blocking.rs index 0c964d5e93..f620608914 100644 --- a/tracing-appender/src/non_blocking.rs +++ b/tracing-appender/src/non_blocking.rs @@ -285,7 +285,11 @@ impl Drop for WorkerGuard { } _ => { // At this point it is safe to wait for `Worker` destruction without blocking - self.handle.take().map(std::thread::JoinHandle::join); + if let Some(handle) = self.handle.take() { + if handle.join().is_err() { + eprintln!("Logging worker thread panicked"); + } + }; } } } From 
45da5122a92b8820f674c3962ff67605d40b510f Mon Sep 17 00:00:00 2001 From: Aleksandr Trukhin Date: Sat, 13 Nov 2021 20:29:00 +0300 Subject: [PATCH 6/6] drop writer before joining worker thread --- tracing-appender/src/worker.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tracing-appender/src/worker.rs b/tracing-appender/src/worker.rs index 33cc6051f0..c675e6b47b 100644 --- a/tracing-appender/src/worker.rs +++ b/tracing-appender/src/worker.rs @@ -73,6 +73,7 @@ impl Worker { match self.work() { Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {} Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => { + drop(self.writer); // drop now in case it blocks let _ = self.shutdown.recv(); return; }