Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a non blocking file appender #673

Merged
merged 46 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
945f4a8
Introduce a non blocking file appender
Apr 7, 2020
9ccaa5d
Use public interface for Rotation enum
zekisherif Apr 8, 2020
2a34863
simply if statement
zekisherif Apr 8, 2020
39f2709
Make WriterFactory pub(crate)
zekisherif Apr 8, 2020
f251bbe
Make BufWriterFactory pub(crate)
zekisherif Apr 8, 2020
de77617
Make InnerAppender pub(crate)
zekisherif Apr 8, 2020
7c9ce25
Make InnerAppender constructor pub(crate)
zekisherif Apr 8, 2020
e028dfa
make refresh_writer pub(crate)
zekisherif Apr 8, 2020
55bdffe
Make should_rollover pub(crate)
zekisherif Apr 8, 2020
1a70543
Make WorkerState pub(crate)
zekisherif Apr 8, 2020
5b4eacf
Make Worker pub(crate)
zekisherif Apr 8, 2020
18d0340
make constructor pub(crate)
zekisherif Apr 8, 2020
c4a8447
Apply suggested readability improvements and using pub(crate) where a…
zekisherif Apr 8, 2020
1c33af9
Use new Rotation const in match expressions. derive Copy trait for Ro…
Apr 8, 2020
6e8cb23
Remove copy trait from Rotation.
Apr 8, 2020
75ee728
Use Path for log_directory and filename prefix
zekisherif Apr 8, 2020
e5944c0
Properly handle use of Path in non public facing apis
Apr 8, 2020
cc4fde7
Use Arc for error_counter, modify fileappender constructor
Apr 9, 2020
d9968b2
Fix getters for FileAppender, clone writer for getter
Apr 9, 2020
a1f0d01
WIP: Decoupling file appender and nonblocking
Apr 10, 2020
9316656
Apply suggestions from code review
zekisherif Apr 10, 2020
ac14ca3
Update Rotation helpers and add constructor
Apr 10, 2020
b3aadef
Remove generic from InnerAppender, use BufWriter always
Apr 10, 2020
f77a6a8
Move creation of log file to Inner. Get rid of BufWriterFactory
Apr 10, 2020
4417342
Remove multiple impl of InnerAppender
Apr 10, 2020
64b356f
Impl MakeWriter on NonBlocking and remove NonBlockingWriter
Apr 10, 2020
98db6f7
Override write_all
zekisherif Apr 10, 2020
ec049e0
Use T:Write, get rid of RollingFileWriter
Apr 10, 2020
be7aeb4
cargo fmt
Apr 10, 2020
e87b631
Add lossy option for non_blocking
Apr 10, 2020
5d8475c
Add WorkerGuard and ensure closure of worker thread on drop
Apr 10, 2020
2983370
Apply suggestions from code review
zekisherif Apr 13, 2020
8e6a395
address comments
Apr 13, 2020
80bf8e2
Add comment about worker thread yielding
Apr 13, 2020
8f9bdc1
Fix clippy warnings
Apr 13, 2020
e467bdd
Fix 1 more clippy warning
Apr 13, 2020
4b73a1c
Add 2 examples
Apr 14, 2020
2b6cb51
Take suggestions for example doc improvements
Apr 14, 2020
9e7620d
fix clippy warning about needless main
Apr 14, 2020
b415265
Revert "fix clippy warning about needless main"
Apr 14, 2020
4ed792c
Add tracing dev dependency, fix clippy warning
Apr 14, 2020
93b26af
Apply suggestions from code review
zekisherif Apr 14, 2020
f494d53
Supress clippy warning using attribute
Apr 14, 2020
5216e6c
Fix fmt
Apr 14, 2020
97e5e15
Update tracing-appender/src/rolling.rs
zekisherif Apr 14, 2020
714e5bd
fix doc
Apr 14, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ members = [
"tracing-macros",
"tracing-subscriber",
"tracing-serde",
"tracing-appender",
"examples"
]
15 changes: 15 additions & 0 deletions tracing-appender/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "tracing-appender"
version = "0.1.0"
authors = ["Zeki Sherif <zekshi@amazon.com>"]
edition = "2018"
repository = "https://github.com/tokio-rs/tracing"
homepage = "https://tokio.rs"
description = """
Provides an `appender` that writes to a file
"""

[dependencies]
tracing-subscriber = {path = "../tracing-subscriber", version = "0.2.4"}
crossbeam-channel = "0.4.2"
chrono = "0.4.11"
96 changes: 96 additions & 0 deletions tracing-appender/src/inner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::io::{BufWriter, Write};
use std::{fs, io};

use crate::rolling::Rotation;
use chrono::prelude::*;
use std::fmt::Debug;
use std::fs::{File, OpenOptions};
use std::path::Path;

#[derive(Debug)]
pub(crate) struct InnerAppender {
log_directory: String,
log_filename_prefix: String,
writer: BufWriter<File>,
next_date: DateTime<Utc>,
rotation: Rotation,
}

impl io::Write for InnerAppender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let now = Utc::now();
self.write_timestamped(buf, now)
}

fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
}
}

impl InnerAppender {
pub(crate) fn new(
log_directory: &Path,
log_filename_prefix: &Path,
rotation: Rotation,
now: DateTime<Utc>,
) -> io::Result<Self> {
let log_directory = log_directory.to_str().unwrap();
let log_filename_prefix = log_filename_prefix.to_str().unwrap();

let filename = rotation.join_date(log_filename_prefix, &now);
let next_date = rotation.next_date(&now);

Ok(InnerAppender {
log_directory: log_directory.to_string(),
log_filename_prefix: log_filename_prefix.to_string(),
writer: create_writer(log_directory, &filename)?,
next_date,
rotation,
})
}

fn write_timestamped(&mut self, buf: &[u8], date: DateTime<Utc>) -> io::Result<usize> {
// Even if refresh_writer fails, we still have the original writer. Ignore errors
// and proceed with the write.
let buf_len = buf.len();
self.refresh_writer(date);
self.writer.write_all(buf).map(|_| buf_len)
}

fn refresh_writer(&mut self, now: DateTime<Utc>) {
if self.should_rollover(now) {
let filename = self.rotation.join_date(&self.log_filename_prefix, &now);

self.next_date = self.rotation.next_date(&now);

match create_writer(&self.log_directory, &filename) {
Ok(writer) => self.writer = writer,
Err(err) => eprintln!("Couldn't create writer for logs: {}", err),
}
}
}

fn should_rollover(&self, date: DateTime<Utc>) -> bool {
date >= self.next_date
}
}

fn create_writer(directory: &str, filename: &str) -> io::Result<BufWriter<File>> {
let file_path = Path::new(directory).join(filename);
Ok(BufWriter::new(open_file_create_parent_dirs(&file_path)?))
}

fn open_file_create_parent_dirs(path: &Path) -> io::Result<File> {
let mut open_options = OpenOptions::new();
open_options.append(true).create(true);

let new_file = open_options.open(path);
if new_file.is_err() {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
return open_options.open(path);
}
}

new_file
}
28 changes: 28 additions & 0 deletions tracing-appender/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::non_blocking::{NonBlocking, WorkerGuard};

use std::io::Write;

mod inner;
pub mod non_blocking;
pub mod rolling;
mod worker;

/// Creates a non-blocking, off-thread writer.
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
///
/// Note that the `WorkerGuard` returned by `non_blocking` _must_ be assigned to a binding that
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably also explain what WorkerGuard is , as well as noting that it shouldn't be dropped accidentally :)

and, it's fine for users to drop it if they don't care about ensuring that logs are flushed on panics (but they probably do care about this)

/// is not `_`, as `_` will result in the `WorkerGuard` being dropped immediately.
///
/// # Examples
/// ``` rust
///
/// # fn docs() {
/// let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
/// let subscriber = tracing_subscriber::fmt().with_writer(non_blocking);
/// tracing::subscriber::with_default(subscriber.finish(), || {
/// tracing::event!(tracing::Level::INFO, "Hello");
/// });
/// # }
/// ```
pub fn non_blocking<T: Write + Send + Sync + 'static>(writer: T) -> (NonBlocking, WorkerGuard) {
NonBlocking::new(writer)
}
146 changes: 146 additions & 0 deletions tracing-appender/src/non_blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use crate::worker::Worker;
use crossbeam_channel::{bounded, Sender};
use std::io;
use std::io::Write;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::Arc;
use std::thread::JoinHandle;
use tracing_subscriber::fmt::MakeWriter;

pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;

#[derive(Debug)]
pub struct WorkerGuard {
guard: Option<JoinHandle<()>>,
shutdown_signal: Arc<AtomicBool>,
}

#[derive(Clone, Debug)]
pub struct NonBlocking {
error_counter: Arc<AtomicU64>,
channel: Sender<Vec<u8>>,
is_lossy: bool,
}

impl NonBlocking {
pub fn new<T: Write + Send + Sync + 'static>(writer: T) -> (NonBlocking, WorkerGuard) {
NonBlockingBuilder::default().finish(writer)
}

fn create<T: Write + Send + Sync + 'static>(
writer: T,
buffered_lines_limit: usize,
is_lossy: bool,
) -> (NonBlocking, WorkerGuard) {
let (sender, receiver) = bounded(buffered_lines_limit);
let shutdown_signal = Arc::new(AtomicBool::new(false));

let worker = Worker::new(receiver, writer, shutdown_signal.clone());
let worker_guard = WorkerGuard::new(worker.worker_thread(), shutdown_signal);

(
Self {
channel: sender,
error_counter: Arc::new(AtomicU64::new(0)),
is_lossy,
},
worker_guard,
)
}

pub fn error_counter(&self) -> Arc<AtomicU64> {
self.error_counter.clone()
}
}

#[derive(Debug)]
pub struct NonBlockingBuilder {
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
buffered_lines_limit: usize,
is_lossy: bool,
}

impl NonBlockingBuilder {
pub fn buffered_lines_limit(mut self, buffered_lines_limit: usize) -> NonBlockingBuilder {
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
self.buffered_lines_limit = buffered_lines_limit;
self
}

pub fn lossy(mut self, is_lossy: bool) -> NonBlockingBuilder {
self.is_lossy = is_lossy;
self
}

pub fn finish<T: Write + Send + Sync + 'static>(self, writer: T) -> (NonBlocking, WorkerGuard) {
NonBlocking::create(writer, self.buffered_lines_limit, self.is_lossy)
}
}

impl Default for NonBlockingBuilder {
fn default() -> Self {
NonBlockingBuilder {
buffered_lines_limit: DEFAULT_BUFFERED_LINES_LIMIT,
is_lossy: true,
}
}
}

impl std::io::Write for NonBlocking {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let buf_size = buf.len();
if self.is_lossy {
if self.channel.try_send(buf.to_vec()).is_err() {
self.error_counter.fetch_add(1, Ordering::Relaxed);
}
} else {
return match self.channel.send(buf.to_vec()) {
Ok(_) => Ok(buf_size),
Err(_) => Err(io::Error::from(io::ErrorKind::Other)),
};
}
Ok(buf_size)
}

zekisherif marked this conversation as resolved.
Show resolved Hide resolved
fn flush(&mut self) -> io::Result<()> {
Ok(())
}

#[inline]
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.write(buf).map(|_| ())
}
}

impl MakeWriter for NonBlocking {
type Writer = NonBlocking;

fn make_writer(&self) -> Self::Writer {
self.clone()
}
}

impl WorkerGuard {
fn new(handle: JoinHandle<()>, shutdown_signal: Arc<AtomicBool>) -> Self {
WorkerGuard {
guard: Some(handle),
shutdown_signal,
}
}

fn stop(&mut self) -> std::thread::Result<()> {
match self.guard.take() {
Some(handle) => handle.join(),
None => Ok(()),
}
}
}

impl Drop for WorkerGuard {
fn drop(&mut self) {
self.shutdown_signal.store(true, Ordering::Relaxed);
match self.stop() {
Ok(_) => (),
Err(e) => println!("Failed to join worker thread. Error: {:?}", e),
}
}
}
Loading