Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a non blocking file appender #673

Merged
merged 46 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
945f4a8
Introduce a non blocking file appender
Apr 7, 2020
9ccaa5d
Use public interface for Rotation enum
zekisherif Apr 8, 2020
2a34863
simply if statement
zekisherif Apr 8, 2020
39f2709
Make WriterFactory pub(crate)
zekisherif Apr 8, 2020
f251bbe
Make BufWriterFactory pub(crate)
zekisherif Apr 8, 2020
de77617
Make InnerAppender pub(crate)
zekisherif Apr 8, 2020
7c9ce25
Make InnerAppender constructor pub(crate)
zekisherif Apr 8, 2020
e028dfa
make refresh_writer pub(crate)
zekisherif Apr 8, 2020
55bdffe
Make should_rollover pub(crate)
zekisherif Apr 8, 2020
1a70543
Make WorkerState pub(crate)
zekisherif Apr 8, 2020
5b4eacf
Make Worker pub(crate)
zekisherif Apr 8, 2020
18d0340
make constructor pub(crate)
zekisherif Apr 8, 2020
c4a8447
Apply suggested readability improvements and using pub(crate) where a…
zekisherif Apr 8, 2020
1c33af9
Use new Rotation const in match expressions. derive Copy trait for Ro…
Apr 8, 2020
6e8cb23
Remove copy trait from Rotation.
Apr 8, 2020
75ee728
Use Path for log_directory and filename prefix
zekisherif Apr 8, 2020
e5944c0
Properly handle use of Path in non public facing apis
Apr 8, 2020
cc4fde7
Use Arc for error_counter, modify fileappender constructor
Apr 9, 2020
d9968b2
Fix getters for FileAppender, clone writer for getter
Apr 9, 2020
a1f0d01
WIP: Decoupling file appender and nonblocking
Apr 10, 2020
9316656
Apply suggestions from code review
zekisherif Apr 10, 2020
ac14ca3
Update Rotation helpers and add constructor
Apr 10, 2020
b3aadef
Remove generic from InnerAppender, use BufWriter always
Apr 10, 2020
f77a6a8
Move creation of log file to Inner. Get rid of BufWriterFactory
Apr 10, 2020
4417342
Remove multiple impl of InnerAppender
Apr 10, 2020
64b356f
Impl MakeWriter on NonBlocking and remove NonBlockingWriter
Apr 10, 2020
98db6f7
Override write_all
zekisherif Apr 10, 2020
ec049e0
Use T:Write, get rid of RollingFileWriter
Apr 10, 2020
be7aeb4
cargo fmt
Apr 10, 2020
e87b631
Add lossy option for non_blocking
Apr 10, 2020
5d8475c
Add WorkerGuard and ensure closure of worker thread on drop
Apr 10, 2020
2983370
Apply suggestions from code review
zekisherif Apr 13, 2020
8e6a395
address comments
Apr 13, 2020
80bf8e2
Add comment about worker thread yielding
Apr 13, 2020
8f9bdc1
Fix clippy warnings
Apr 13, 2020
e467bdd
Fix 1 more clippy warning
Apr 13, 2020
4b73a1c
Add 2 examples
Apr 14, 2020
2b6cb51
Take suggestions for example doc improvements
Apr 14, 2020
9e7620d
fix clippy warning about needless main
Apr 14, 2020
b415265
Revert "fix clippy warning about needless main"
Apr 14, 2020
4ed792c
Add tracing dev dependency, fix clippy warning
Apr 14, 2020
93b26af
Apply suggestions from code review
zekisherif Apr 14, 2020
f494d53
Supress clippy warning using attribute
Apr 14, 2020
5216e6c
Fix fmt
Apr 14, 2020
97e5e15
Update tracing-appender/src/rolling.rs
zekisherif Apr 14, 2020
714e5bd
fix doc
Apr 14, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ members = [
"tracing-macros",
"tracing-subscriber",
"tracing-serde",
"tracing-appender",
"examples"
]
15 changes: 15 additions & 0 deletions tracing-appender/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "tracing-appender"
version = "0.1.0"
authors = ["Zeki Sherif <zekshi@amazon.com>"]
edition = "2018"
repository = "https://github.com/tokio-rs/tracing"
homepage = "https://tokio.rs"
description = """
Provides an `appender` that writes to a file
"""

[dependencies]
tracing-subscriber = {path = "../tracing-subscriber", version = "0.2.4"}
crossbeam-channel = "0.4.2"
chrono = "0.4.11"
125 changes: 125 additions & 0 deletions tracing-appender/src/file_appender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use crate::inner::BufWriterFactory;
use crate::worker::Worker;
use crate::Rotation;
use chrono::Utc;
use crossbeam_channel::{bounded, Sender};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::{io, thread};
use tracing_subscriber::fmt::MakeWriter;

#[allow(dead_code)]
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
pub struct FileAppender {
log_writer: FileWriter,
worker_thread: thread::JoinHandle<()>,
error_counter: Arc<AtomicU64>,
}

impl FileAppender {
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(log_directory: &str, log_filename_prefix: &str) -> Self {
FileAppenderBuilder::default()
.build(log_directory, log_filename_prefix)
.expect("Failed to create FileAppender")
}

fn create_appender(
sender: Sender<Vec<u8>>,
worker: Worker<BufWriterFactory>,
error_counter: Arc<AtomicU64>,
) -> Self {
Self {
log_writer: FileWriter {
channel: sender,
error_counter: error_counter.clone(),
},
worker_thread: worker.worker_thread(),
error_counter,
}
}

pub fn builder() -> FileAppenderBuilder {
FileAppenderBuilder::default()
}

pub fn get_writer(self) -> FileWriter {
self.log_writer
}

pub fn get_error_counter(self) -> Arc<AtomicU64> {
self.error_counter.clone()
}
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
}

pub struct FileAppenderBuilder {
buffered_lines_limit: usize,
rotation: Rotation,
}

impl FileAppenderBuilder {
pub fn buffered_lines_limit(mut self, buffered_lines_limit: usize) -> FileAppenderBuilder {
self.buffered_lines_limit = buffered_lines_limit;
self
}

pub fn build(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT, it looks like this automatically spawns the worker thread? I wonder if we should also have an API that returns a Worker struct with a public Worker::run method (or something) that runs the worker main loop. That way, the user can spawn the worker thread manually, allowing them to control things like the thread's name and how panics are propagated. We could continue to provide an API that spawns the worker thread as well, since it's probably more ergonomic when the user doesn't need control over spawning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, calling build will result in spawning of the worker thread.

I'll implement your suggestion.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That way, the user can spawn the worker thread manually, allowing them to control things like the thread's name and how panics are propagated.

You can do the same by e.g. passing or parameterizing over a ThreadFactory, which is the common thing that is done in Java.

I am not convinced users should by required to manage the lifetime of threads - that just leaks implementation details.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Matthias247 that's a good point, we could have a builder method that takes a std::thread::Builder, but still handles spawning the thread and holding onto the JoinHandle internally.

I'm not sure if there are still valid use-cases for manually running the worker thread, if we can customize the name and stack size. Perhaps some users might want to run the logging work on an existing thread that has already finished some other task (e.g. on the main thread if the rest of the application runs on a thread pool)? But, that seems niche enough that we can always add support for it if anyone actually asks for it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think being able to run the work itself is good if the work is rather short-lived, and you can e.g. schedule it on an existing threadpool. However an appender like this is typically very long lived - you create it once and it runs for the lifetime of the application. That's not something you would schedule on a threadpool. Therefore I agree that other use-case seem niche.

self,
log_directory: impl AsRef<Path>,
log_filename_prefix: impl AsRef<Path>,
) -> io::Result<FileAppender> {
let (sender, receiver) = bounded(self.buffered_lines_limit);

let worker = Worker::new(
receiver,
log_directory.as_ref(),
log_filename_prefix.as_ref(),
self.rotation,
BufWriterFactory {},
Utc::now(),
);
Ok(FileAppender::create_appender(
sender,
worker?,
Arc::new(AtomicU64::new(0)),
))
}
}

impl Default for FileAppenderBuilder {
fn default() -> Self {
FileAppenderBuilder {
buffered_lines_limit: 100_000,
rotation: Rotation::HOURLY,
}
}
}

pub struct FileWriter {
channel: Sender<Vec<u8>>,
error_counter: Arc<AtomicU64>,
}

impl std::io::Write for FileWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let buf_size = buf.len();
if self.channel.try_send(buf.to_vec()).is_err() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allocating a vector for each line will create a lot of memory churn, and I expect it to noticeably show up in profiles if logging is enabled. An alternative solution could be to have a pool of logging buffers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Matthias247 I've started working on a solution for that separately (a ring buffer of strings, basically), and I'm hoping we can swap that in as a replacement for crossbeam_channel in the future. For this PR, I want to focus on the API design.

self.error_counter.fetch_add(1, Ordering::Relaxed);
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(buf_size)
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

impl MakeWriter for FileWriter {
type Writer = FileWriter;

fn make_writer(&self) -> Self::Writer {
FileWriter {
channel: self.channel.clone(),
error_counter: self.error_counter.clone(),
}
}
}
130 changes: 130 additions & 0 deletions tracing-appender/src/inner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use std::io::BufWriter;
use std::io::Write;
use std::{fs, io};

use crate::Rotation;
use chrono::prelude::*;
use std::fmt::Debug;
use std::fs::File;
use std::fs::OpenOptions;
use std::path::Path;

pub(crate) trait WriterFactory: Debug + Send {
type W: Write + Debug + Send;

fn create_writer(&self, directory: &str, filename: &str) -> io::Result<Self::W>;
}

#[derive(Debug)]
pub(crate) struct BufWriterFactory {}

impl WriterFactory for BufWriterFactory {
type W = BufWriter<File>;

fn create_writer(&self, directory: &str, filename: &str) -> io::Result<BufWriter<File>> {
let filepath = Path::new(directory).join(filename);
Ok(BufWriter::new(open_file_create_parent_dirs(&filepath)?))
}
}

#[derive(Debug)]
pub(crate) struct InnerAppender<F: WriterFactory + Send> {
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
log_directory: String,
log_filename_prefix: String,
writer: F::W,
writer_factory: F,
next_date: DateTime<Utc>,
rotation: Rotation,
}

impl<F: WriterFactory> InnerAppender<F> {
fn write_with_ts(&mut self, buf: &[u8], date: DateTime<Utc>) -> io::Result<usize> {
// Even if refresh_writer fails, we still have the original writer. Ignore errors
// and proceed with the write.
let _ = self.refresh_writer(date);
self.writer.write(buf)
}
}

impl<F: WriterFactory> io::Write for InnerAppender<F> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let now = Utc::now();
self.write_with_ts(buf, now)
}

fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
}
}

impl<F: WriterFactory> InnerAppender<F> {
pub(crate) fn new(
log_directory: &Path,
log_filename_prefix: &Path,
rotation: Rotation,
writer_factory: F,
now: DateTime<Utc>,
) -> io::Result<Self> {
let log_directory = log_directory.to_str().unwrap();
let log_filename_prefix = log_filename_prefix.to_str().unwrap();

let filename = rotation.join_date(log_filename_prefix, &now);
let next_date = rotation.next_date(&now);

let mut appender = InnerAppender {
log_directory: log_directory.to_string(),
log_filename_prefix: log_filename_prefix.to_string(),
writer: writer_factory.create_writer(log_directory, &filename)?,
writer_factory,
next_date,
rotation,
};

appender
.write_with_ts(b"Init Application\n", now)
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
.and_then(|_| appender.flush().and(Ok(appender)))
}
}

impl<F: WriterFactory> InnerAppender<F> {
pub(crate) fn refresh_writer(&mut self, now: DateTime<Utc>) -> io::Result<()> {
if self.should_rollover(now) {
let filename = self.rotation.join_date(&self.log_filename_prefix, &now);

self.next_date = self.rotation.next_date(&now);

match self
.writer_factory
.create_writer(&self.log_directory, &filename)
{
Ok(writer) => {
self.writer = writer;
Ok(())
}
Err(err) => {
eprintln!("Couldn't create writer for logs: {}", err);
Err(err)
}
}
} else {
Ok(())
}
}
zekisherif marked this conversation as resolved.
Show resolved Hide resolved

pub(crate) fn should_rollover(&self, date: DateTime<Utc>) -> bool {
date >= self.next_date
}
}

// Open a file - if it throws any error, try creating the parent directory and then the file.
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
fn open_file_create_parent_dirs(path: &Path) -> io::Result<File> {
let new_file = OpenOptions::new().append(true).create(true).open(path);
if new_file.is_err() {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
return OpenOptions::new().append(true).create(true).open(path);
zekisherif marked this conversation as resolved.
Show resolved Hide resolved
}
}

new_file
}
55 changes: 55 additions & 0 deletions tracing-appender/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use chrono::{DateTime, Datelike, TimeZone, Timelike, Utc};

pub mod file_appender;

mod inner;
mod worker;

#[derive(Clone, Eq, PartialEq, Debug)]
pub struct Rotation(RotationKind);

#[derive(Clone, Eq, PartialEq, Debug)]
enum RotationKind {
Hourly,
Daily,
Never,
}

impl Rotation {
pub const HOURLY: Self = Self(RotationKind::Hourly);
pub const DAILY: Self = Self(RotationKind::Daily);
pub const NEVER: Self = Self(RotationKind::Never);

fn next_date(&self, current_date: &DateTime<Utc>) -> DateTime<Utc> {
let unrounded_next_date = match *self {
Rotation::HOURLY => *current_date + chrono::Duration::hours(1),
Rotation::DAILY => *current_date + chrono::Duration::days(1),
Rotation::NEVER => Utc.ymd(9999, 1, 1).and_hms(1, 0, 0),
};
self.round_date(&unrounded_next_date)
}

fn round_date(&self, date: &DateTime<Utc>) -> DateTime<Utc> {
match *self {
Rotation::HOURLY => {
Utc.ymd(date.year(), date.month(), date.day())
.and_hms(date.hour(), 0, 0)
}
Rotation::DAILY => Utc
.ymd(date.year(), date.month(), date.day())
.and_hms(0, 0, 0),
Rotation::NEVER => {
Utc.ymd(date.year(), date.month(), date.day())
.and_hms(date.hour(), 0, 0)
}
}
}

fn join_date(&self, filename: &str, date: &DateTime<Utc>) -> String {
match *self {
Rotation::HOURLY => format!("{}.{}", filename, date.format("%F-%H")),
Rotation::DAILY => format!("{}.{}", filename, date.format("%F")),
Rotation::NEVER => filename.to_string(),
}
}
}
Loading