Skip to content

Commit

Permalink
appender: fix race condition when logging on shutdown (#1125)
Browse files Browse the repository at this point in the history
## Motivation

Fixes the race condition outlined in #1120.

## Solution

`Worker` now uses a two-stage shutdown approach. The first shutdown signal
is sent through the main message channel to the `Worker` from
`WorkerGuard` when it is dropped. Then `WorkerGuard` sends a second
signal on a second, zero-capacity channel. This means a `send()` on it
will only succeed when a `recv()` is called on the other end. This
guarantees that the `Worker` has flushed all its messages before the
`WorkerGuard` can continue with its drop.

With this solution, I'm no longer able to reproduce the race using the
provided code sample from #1120.

Co-authored-by: Zeki Sherif <zekshi@amazon.com>
  • Loading branch information
2 people authored and hawkw committed Dec 14, 2020
1 parent ab5a4fe commit 9d03829
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
20 changes: 16 additions & 4 deletions tracing-appender/src/non_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;
pub struct WorkerGuard {
/// Join handle of the worker thread, as returned by `Worker::worker_thread()`
/// and passed to `WorkerGuard::new`.
guard: Option<JoinHandle<()>>,
/// Clone of the sending half of the main message channel; `Drop` uses it to
/// send `Msg::Shutdown` to the worker (with a 100 ms timeout).
sender: Sender<Msg>,
/// Sending half of the zero-capacity (`bounded(0)`) shutdown channel. After
/// `Msg::Shutdown` has been sent successfully, `Drop` sends `()` here with a
/// 1000 ms timeout; the send can only complete once the worker calls `recv()`
/// on the other end, i.e. after it has processed the shutdown message.
shutdown: Sender<()>,
}

/// A non-blocking writer.
Expand Down Expand Up @@ -148,8 +149,11 @@ impl NonBlocking {
) -> (NonBlocking, WorkerGuard) {
let (sender, receiver) = bounded(buffered_lines_limit);

let worker = Worker::new(receiver, writer);
let worker_guard = WorkerGuard::new(worker.worker_thread(), sender.clone());
let (shutdown_sender, shutdown_receiver) = bounded(0);

let worker = Worker::new(receiver, writer, shutdown_receiver);
let worker_guard =
WorkerGuard::new(worker.worker_thread(), sender.clone(), shutdown_sender);

(
Self {
Expand Down Expand Up @@ -245,10 +249,11 @@ impl MakeWriter for NonBlocking {
}

impl WorkerGuard {
fn new(handle: JoinHandle<()>, sender: Sender<Msg>) -> Self {
fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self {
WorkerGuard {
guard: Some(handle),
sender,
shutdown,
}
}
}
Expand All @@ -259,7 +264,14 @@ impl Drop for WorkerGuard {
.sender
.send_timeout(Msg::Shutdown, Duration::from_millis(100))
{
Ok(_) | Err(SendTimeoutError::Disconnected(_)) => (),
Ok(_) => {
// Attempt to wait for `Worker` to flush all messages before dropping. This happens
// when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout`
// so that drop is not blocked indefinitely.
// TODO: Make timeout configurable.
let _ = self.shutdown.send_timeout((), Duration::from_millis(1000));
}
Err(SendTimeoutError::Disconnected(_)) => (),
Err(SendTimeoutError::Timeout(e)) => println!(
"Failed to send shutdown signal to logging worker. Error: {:?}",
e
Expand Down
14 changes: 11 additions & 3 deletions tracing-appender/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{io, thread};
pub(crate) struct Worker<T: Write + Send + Sync + 'static> {
/// The `Write` implementation owned by this worker.
writer: T,
/// Receiving half of the main message channel carrying `Msg` values.
receiver: Receiver<Msg>,
/// Receiving half of the zero-capacity shutdown channel. The worker loop calls
/// `recv()` on it once it reaches the `Shutdown` or `Disconnected` state, just
/// before breaking out, which lets the pending send from `WorkerGuard`'s
/// `Drop` complete.
shutdown: Receiver<()>,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand All @@ -18,8 +19,12 @@ pub(crate) enum WorkerState {
}

impl<T: Write + Send + Sync + 'static> Worker<T> {
pub(crate) fn new(receiver: Receiver<Msg>, writer: T) -> Worker<T> {
Self { writer, receiver }
pub(crate) fn new(receiver: Receiver<Msg>, writer: T, shutdown: Receiver<()>) -> Worker<T> {
Self {
writer,
receiver,
shutdown,
}
}

fn handle_recv(&mut self, result: &Result<Msg, RecvError>) -> io::Result<WorkerState> {
Expand Down Expand Up @@ -67,7 +72,10 @@ impl<T: Write + Send + Sync + 'static> Worker<T> {
loop {
match self.work() {
Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {}
Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => break,
Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => {
let _ = self.shutdown.recv();
break;
}
Err(_) => {
// TODO: Expose a metric for IO Errors, or print to stderr
}
Expand Down

0 comments on commit 9d03829

Please sign in to comment.