Skip to content

Commit

Permalink
appender: fix races in tracing-appender unit tests (tokio-rs#737)
Browse files Browse the repository at this point in the history
## Motivation

I was able to reproduce the failures for `logs_dropped_if_lossy` on my
machine when using loom.

Realized that the Worker calls `recv` and the only time we actually will
increment the error_counter is when we have a write blocking on the call
to `write_all` (called by the worker) and a messege buffered in
NonBlocking crossbeam sender.

## Solution

I added some sleeps after writes to avoid some races I saw.

For `multi_threaded_writes`, what likely is happening is that sometimes
the last thread hasn't had a chance write to the queue, hence we now use
`recv_timeout` instead of `try_recv`, to ensure  there's more than enough
time for the message to be visible in the channel.

Co-authored-by: Zeki Sherif <zekshi@amazon.com>
  • Loading branch information
2 people authored and theworldsbestcoder committed Jun 12, 2020
1 parent 5f6e35b commit 168a521
Showing 1 changed file with 27 additions and 13 deletions.
40 changes: 27 additions & 13 deletions tracing-appender/src/non_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ mod test {
assert_eq!(line, ", World");
}

fn write_non_blocking(non_blocking: &mut NonBlocking, msg: &[u8]) {
non_blocking.write_all(msg).expect("Failed to write");

// Sleep a bit to prevent races.
thread::sleep(Duration::from_millis(200));
}

#[test]
fn logs_dropped_if_lossy() {
let (mock_writer, rx) = MockWriter::new(1);
Expand All @@ -344,27 +351,34 @@ mod test {

let error_count = non_blocking.error_counter();

non_blocking.write_all(b"Hello").expect("Failed to write");
// First write will not block
write_non_blocking(&mut non_blocking, b"Hello");
assert_eq!(0, error_count.load(Ordering::Acquire));

non_blocking.write_all(b", World").expect("Failed to write");
assert_eq!(1, error_count.load(Ordering::Acquire));
// Second write will not block as Worker will have called `recv` on channel.
// "Hello" is not yet consumed. MockWriter call to write_all will block until
// "Hello" is consumed.
write_non_blocking(&mut non_blocking, b", World");
assert_eq!(0, error_count.load(Ordering::Acquire));

non_blocking.write_all(b".").expect("Failed to write");
assert_eq!(2, error_count.load(Ordering::Acquire));
// Will sit in NonBlocking channel's buffer.
write_non_blocking(&mut non_blocking, b"Test");
assert_eq!(0, error_count.load(Ordering::Acquire));

// Allow a line to be written
// Allow a line to be written. "Hello" message will be consumed.
// ", World" will be able to write to MockWriter.
// "Test" will block on call to MockWriter's `write_all`
let line = rx.recv().unwrap();
assert_eq!(line, "Hello");

// Now, there is once again capacity in the buffer.
non_blocking
.write_all(b"Universe")
.expect("Failed to write");
assert_eq!(2, error_count.load(Ordering::Acquire));
// This will block as NonBlocking channel is full.
write_non_blocking(&mut non_blocking, b"Universe");
assert_eq!(1, error_count.load(Ordering::Acquire));

// Finally the second message sent will be consumed.
let line = rx.recv().unwrap();
assert_eq!(line, "Universe");
assert_eq!(line, ", World");
assert_eq!(1, error_count.load(Ordering::Acquire));
}

#[test]
Expand Down Expand Up @@ -394,7 +408,7 @@ mod test {

let mut hello_count: u8 = 0;

while let Ok(event_str) = rx.try_recv() {
while let Ok(event_str) = rx.recv_timeout(Duration::from_secs(5)) {
assert!(event_str.contains("Hello"));
hello_count += 1;
}
Expand Down

0 comments on commit 168a521

Please sign in to comment.