From b439705a8f6f2f81935a7747765935869003f7d3 Mon Sep 17 00:00:00 2001
From: trtt
Date: Sat, 13 Nov 2021 23:08:35 +0300
Subject: [PATCH] appender: fix WorkerGuard not waiting for writer destruction
 (#1713)

## Motivation

Can be thought of as a continuation of #1120 and #1125.

Example with problematic racy behavior:

```
use std::io::Write;

struct TestDrop<T: Write>(T);

impl<T: Write> Drop for TestDrop<T> {
    fn drop(&mut self) {
        println!("Dropped");
    }
}

impl<T: Write> Write for TestDrop<T> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.0.flush()
    }
}

fn main() {
    let writer = TestDrop(std::io::stdout());
    let (non_blocking, _guard) = tracing_appender::non_blocking(writer);
    tracing_subscriber::fmt().with_writer(non_blocking).init();
}
```

Running this test case in a loop with `while ./test | grep Dropped; do :; done`, it can be seen that sometimes the writer (`TestDrop`) is not dropped and the "Dropped" message is not printed. Proper destruction of the non-blocking writer should also destroy the underlying writer.

## Solution

Join the `Worker` thread (which owns the writer) after waiting for it to almost finish, avoiding the potential deadlock described in https://github.com/tokio-rs/tracing/issues/1120#issuecomment-733982304 (a sketch of the handshake follows the diff).
---
 tracing-appender/src/non_blocking.rs | 34 ++++++++++++++++++++--------
 tracing-appender/src/worker.rs       |  6 ++---
 2 files changed, 26 insertions(+), 14 deletions(-)

diff --git a/tracing-appender/src/non_blocking.rs b/tracing-appender/src/non_blocking.rs
index 65129e8fc6..f620608914 100644
--- a/tracing-appender/src/non_blocking.rs
+++ b/tracing-appender/src/non_blocking.rs
@@ -103,7 +103,7 @@ pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;
 #[must_use]
 #[derive(Debug)]
 pub struct WorkerGuard {
-    _guard: Option<JoinHandle<()>>,
+    handle: Option<JoinHandle<()>>,
     sender: Sender<Msg>,
     shutdown: Sender<()>,
 }
@@ -259,7 +259,7 @@ impl<'a> MakeWriter<'a> for NonBlocking {
 impl WorkerGuard {
     fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self {
         WorkerGuard {
-            _guard: Some(handle),
+            handle: Some(handle),
             sender,
             shutdown,
         }
@@ -268,21 +268,35 @@ impl WorkerGuard {
 
 impl Drop for WorkerGuard {
     fn drop(&mut self) {
-        match self
-            .sender
-            .send_timeout(Msg::Shutdown, Duration::from_millis(100))
-        {
+        let timeout = Duration::from_millis(100);
+        match self.sender.send_timeout(Msg::Shutdown, timeout) {
             Ok(_) => {
                 // Attempt to wait for `Worker` to flush all messages before dropping. This happens
                 // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout`
                 // so that drop is not blocked indefinitely.
                 // TODO: Make timeout configurable.
-                let _ = self.shutdown.send_timeout((), Duration::from_millis(1000));
+                let timeout = Duration::from_millis(1000);
+                match self.shutdown.send_timeout((), timeout) {
+                    Err(SendTimeoutError::Timeout(_)) => {
+                        eprintln!(
+                            "Shutting down logging worker timed out after {:?}.",
+                            timeout
+                        );
+                    }
+                    _ => {
+                        // At this point it is safe to wait for `Worker` destruction without blocking
+                        if let Some(handle) = self.handle.take() {
+                            if handle.join().is_err() {
+                                eprintln!("Logging worker thread panicked");
+                            }
+                        };
+                    }
+                }
             }
             Err(SendTimeoutError::Disconnected(_)) => (),
-            Err(SendTimeoutError::Timeout(e)) => println!(
-                "Failed to send shutdown signal to logging worker. Error: {:?}",
-                e
+            Err(SendTimeoutError::Timeout(_)) => eprintln!(
+                "Sending shutdown signal to logging worker timed out after {:?}",
+                timeout
             ),
         }
     }
diff --git a/tracing-appender/src/worker.rs b/tracing-appender/src/worker.rs
index 5508baca85..c675e6b47b 100644
--- a/tracing-appender/src/worker.rs
+++ b/tracing-appender/src/worker.rs
@@ -73,17 +73,15 @@ impl Worker {
                 match self.work() {
                     Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {}
                     Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => {
+                        drop(self.writer); // drop now in case it blocks
                         let _ = self.shutdown.recv();
-                        break;
+                        return;
                     }
                     Err(_) => {
                         // TODO: Expose a metric for IO Errors, or print to stderr
                     }
                 }
             }
-            if let Err(e) = self.writer.flush() {
-                eprintln!("Failed to flush. Error: {}", e);
-            }
         })
     }
 }
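
As referenced from the Solution section, here is a minimal sketch of the shutdown handshake this patch relies on. It uses std's rendezvous channel (`mpsc::sync_channel(0)`) in place of the crate's zero-capacity crossbeam channel, and all names in it are illustrative stand-ins, not the crate's API:

```
use std::sync::mpsc;
use std::thread;

fn main() {
    // Zero-capacity ("rendezvous") channel: a send completes only once the
    // receiver is blocked in a matching recv().
    let (shutdown_tx, shutdown_rx) = mpsc::sync_channel::<()>(0);

    // Worker side: destroy the writer first, then park on the rendezvous.
    let handle = thread::spawn(move || {
        let writer = Vec::<u8>::new(); // stand-in for the real writer
        // ... drain and write out any buffered messages here ...
        drop(writer); // the writer is gone *before* the rendezvous below
        let _ = shutdown_rx.recv();
    });

    // Guard side: this send unblocks only after the worker has reached
    // recv(), i.e. after the writer was already dropped, so the join below
    // cannot block on a writer that is still in use. (The patch uses
    // send_timeout() here instead, so that Drop never blocks forever.)
    if shutdown_tx.send(()).is_ok() {
        handle.join().expect("logging worker panicked");
    }
    println!("writer destroyed before the guard returned");
}
```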