Skip to content

Commit

Permalink
Fix issues with rotations and cleanup (issue #150)
Browse files Browse the repository at this point in the history
- with sub-second rotations
- with cleanup when all logfiles should be compressed
  • Loading branch information
emabee committed Sep 27, 2023
1 parent 2786748 commit e892b8f
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 13 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this
project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.27.1] - 2023-09-20

Fix issues with sub-second rotations and with cleanup when all logfiles should be compressed
(issue #150).

## [0.27.0] - 2023-09-20

Revise, and modify the signature of, `LoggerHande::existing_log_files()` (version bump).
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "flexi_logger"
version = "0.27.0"
version = "0.27.1"
authors = ["emabee <meinolf.block@sap.com>"]
categories = ["development-tools::debugging"]
description = """
Expand Down Expand Up @@ -65,6 +65,8 @@ tracing-subscriber = { version = "0.3", optional = true, features = [
libc = { version = "^0.2.50", optional = true }

[dev-dependencies]
either = "1.9"
flate2 = "1.0"
serde_derive = "1.0"
version-sync = "0.9"
tracing = "0.1.36"
Expand Down
6 changes: 6 additions & 0 deletions src/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ pub enum Naming {
/// File rotation switches over to the next file.
NumbersDirect,
}
impl Naming {
pub(crate) fn writes_direct(self) -> bool {
matches!(self, Naming::NumbersDirect | Naming::TimestampsDirect)
}
}

/// Defines the strategy for handling older log files.
///
/// Is used in [`Logger::rotate`](crate::Logger::rotate).
Expand Down
11 changes: 11 additions & 0 deletions src/writers/file_log_writer/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ enum NamingState {
// contains the index of the current output file
NumbersDirect(u32),
}
impl NamingState {
pub(crate) fn writes_direct(&self) -> bool {
matches!(
self,
NamingState::NumbersDirect(_) | NamingState::TimestampsDirect(_)
)
}
}

#[derive(Debug)]
enum RollState {
Expand Down Expand Up @@ -296,11 +304,13 @@ impl State {
&None,
&rotate_config.cleanup,
&self.config.file_spec,
rotate_config.naming.writes_direct(),
)?;
if *cleanup_in_background_thread {
Some(list_and_cleanup::start_cleanup_thread(
rotate_config.cleanup,
self.config.file_spec.clone(),
rotate_config.naming.writes_direct(),
)?)
} else {
None
Expand Down Expand Up @@ -389,6 +399,7 @@ impl State {
&rotation_state.o_cleanup_thread_handle,
&rotation_state.cleanup,
&self.config.file_spec,
rotation_state.naming_state.writes_direct(),
)?;
}
}
Expand Down
28 changes: 23 additions & 5 deletions src/writers/file_log_writer/state/list_and_cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ pub(super) fn remove_or_compress_too_old_logfiles(
o_cleanup_thread_handle: &Option<CleanupThreadHandle>,
cleanup_config: &Cleanup,
file_spec: &FileSpec,
writes_direct: bool,
) -> Result<(), std::io::Error> {
o_cleanup_thread_handle.as_ref().map_or_else(
|| remove_or_compress_too_old_logfiles_impl(cleanup_config, file_spec),
|| remove_or_compress_too_old_logfiles_impl(cleanup_config, file_spec, writes_direct),
|cleanup_thread_handle| {
cleanup_thread_handle
.sender
Expand All @@ -68,8 +69,9 @@ pub(super) fn remove_or_compress_too_old_logfiles(
pub(crate) fn remove_or_compress_too_old_logfiles_impl(
cleanup_config: &Cleanup,
file_spec: &FileSpec,
writes_direct: bool,
) -> Result<(), std::io::Error> {
let (log_limit, compress_limit) = match *cleanup_config {
let (mut log_limit, compress_limit) = match *cleanup_config {
Cleanup::Never => {
return Ok(());
}
Expand All @@ -84,6 +86,11 @@ pub(crate) fn remove_or_compress_too_old_logfiles_impl(
}
};

// we must not clean up the current output file
if writes_direct && log_limit == 0 {
log_limit = 1;
}

for (index, file) in list_of_log_and_compressed_files(file_spec)
.into_iter()
.enumerate()
Expand All @@ -97,13 +104,23 @@ pub(crate) fn remove_or_compress_too_old_logfiles_impl(
// compress, if not yet compressed
if let Some(extension) = file.extension() {
if extension != "gz" {
let mut old_file = File::open(file.clone())?;
let mut compressed_file = file.clone();
compressed_file.set_extension("log.gz");
match compressed_file.extension() {
Some(oss) => {
let mut oss_gz = oss.to_os_string();
oss_gz.push(".gz");
compressed_file.set_extension(oss_gz.as_os_str());
}
None => {
compressed_file.set_extension("gz");
}
}

let mut gz_encoder = flate2::write::GzEncoder::new(
File::create(compressed_file)?,
flate2::Compression::fast(),
);
let mut old_file = File::open(file.clone())?;
std::io::copy(&mut old_file, &mut gz_encoder)?;
gz_encoder.finish()?;
std::fs::remove_file(&file)?;
Expand Down Expand Up @@ -138,6 +155,7 @@ impl CleanupThreadHandle {
pub(super) fn start_cleanup_thread(
cleanup: Cleanup,
file_spec: FileSpec,
writes_direct: bool,
) -> Result<CleanupThreadHandle, std::io::Error> {
let (sender, receiver) = std::sync::mpsc::channel();
let builder = ThreadBuilder::new().name(CLEANER.to_string());
Expand All @@ -147,7 +165,7 @@ pub(super) fn start_cleanup_thread(
sender,
join_handle: builder.spawn(move || {
while let Ok(MessageToCleanupThread::Act) = receiver.recv() {
remove_or_compress_too_old_logfiles_impl(&cleanup, &file_spec).ok();
remove_or_compress_too_old_logfiles_impl(&cleanup, &file_spec, writes_direct).ok();
}
})?,
})
Expand Down
25 changes: 19 additions & 6 deletions src/writers/file_log_writer/state/timestamps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,19 @@ pub(super) fn collision_free_infix_for_rotated_file(
let infix_date_string = ts_infix_from_timestamp(date_for_rotated_file, use_utc);

let mut new_path = file_spec.as_pathbuf(Some(&infix_date_string));
// Search for rotated_path as is and for restart-siblings;
// if any exists, find highest restart and add 1, else continue without restart
let mut new_path_with_gz = new_path.clone();
match new_path.extension() {
Some(oss) => {
let mut oss_gz = oss.to_os_string();
oss_gz.push(".gz");
new_path_with_gz.set_extension(oss_gz.as_os_str());
}
None => {
new_path_with_gz.set_extension("gz");
}
}

// search for restart-siblings
let mut pattern = new_path.clone();
if file_spec.o_suffix.is_some() {
pattern.set_extension("");
Expand All @@ -104,7 +115,9 @@ pub(super) fn collision_free_infix_for_rotated_file(
let mut restart_siblings =
glob::glob(&pattern).unwrap(/*ok*/).map(Result::unwrap).collect::<Vec<PathBuf>>();

if (*new_path).exists() || !restart_siblings.is_empty() {
// if collision would occur (new_path or compressed new_path exists already),
// find highest restart and add 1, else continue without restart
if new_path.exists() || new_path_with_gz.exists() || !restart_siblings.is_empty() {
let next_number = if restart_siblings.is_empty() {
0
} else {
Expand All @@ -118,7 +131,7 @@ pub(super) fn collision_free_infix_for_rotated_file(
new_path.to_string_lossy().to_string()
};
let index = file_stem_string.find(".restart-").unwrap(/*ok*/);
file_stem_string[(index + 9)..].parse::<usize>().unwrap(/*ok*/) + 1
file_stem_string[(index + 9)..(index + 13)].parse::<usize>().unwrap(/*ok*/) + 1
};

infix_date_string
Expand Down Expand Up @@ -160,9 +173,9 @@ mod test {
.collect();

assert_eq!(
dbg!(now),
now,
// TODO: use mocking to avoid code duplication:
// this test is only useful if the path evaluation sis the same as in
// this test is only useful if the path evaluation is the same as in
// super::latest_timestamp_file()
paths
.iter()
Expand Down
90 changes: 90 additions & 0 deletions tests/test_rotate_immediate_compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
mod test_utils;

#[cfg(feature = "compress")]
use chrono::Local;
#[cfg(feature = "compress")]
use flexi_logger::{Age, Cleanup, Criterion, Duplicate, FileSpec, Logger, Naming};
#[cfg(feature = "compress")]
use log::*;

#[cfg(feature = "compress")]
const COUNT: u8 = 4;

#[cfg(feature = "compress")]
#[test]
fn test_rotate_immediate_compression() {
if let Some(value) = test_utils::dispatch(COUNT) {
work(value)
}
}

#[cfg(feature = "compress")]
fn work(value: u8) {
match value {
0 => test_variant(
Naming::Timestamps,
Criterion::Age(Age::Second),
Cleanup::KeepCompressedFiles(100),
),
1 => test_variant(
Naming::TimestampsDirect,
Criterion::Age(Age::Second),
Cleanup::KeepCompressedFiles(100),
),
2 => test_variant(
Naming::Numbers,
Criterion::Age(Age::Second),
Cleanup::KeepCompressedFiles(100),
),
3 => test_variant(
Naming::NumbersDirect,
Criterion::Age(Age::Second),
Cleanup::KeepCompressedFiles(100),
),
COUNT..=u8::MAX => unreachable!("asAS"),
}
}

#[cfg(feature = "compress")]
fn test_variant(naming: Naming, criterion: Criterion, cleanup: Cleanup) {
let directory = test_utils::dir();

test_utils::wait_for_start_of_second();

let logger = Logger::try_with_str("trace")
.unwrap()
.log_to_file(FileSpec::default().directory(&directory))
.format_for_files(flexi_logger::detailed_format)
.format_for_stderr(flexi_logger::detailed_format)
.duplicate_to_stderr(Duplicate::Info)
.rotate(criterion, naming, cleanup)
.start()
.unwrap_or_else(|e| panic!("Logger initialization failed with {e}"));

info!(
"test correct rotation by {}",
match criterion {
Criterion::Age(_) => "age",
Criterion::AgeOrSize(_, _) => "age or size",
Criterion::Size(_) => "size",
}
);
let mut written_lines = 1;
let start = Local::now();
let duration = chrono::Duration::from_std(std::time::Duration::from_millis(3200)).unwrap();
while Local::now() - start < duration {
written_lines += 1;
if written_lines % 17 == 4 {
logger.trigger_rotation().unwrap();
}
trace!("line_count = {written_lines}");
std::thread::sleep(std::time::Duration::from_millis(7));
}

std::thread::sleep(std::time::Duration::from_millis(100));
let read_lines = test_utils::count_log_lines(&directory);
assert_eq!(
read_lines, written_lines,
"wrong line count: {read_lines} instead of {written_lines}"
);
}
51 changes: 50 additions & 1 deletion tests/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
#![allow(dead_code)]

use chrono::{DateTime, Local};
use std::path::PathBuf;
use either::Either;
use flate2::read::GzDecoder;
#[cfg(feature = "compress")]
use std::ffi::OsStr;
use std::{
fs::File,
io::{BufRead, BufReader},
ops::Add,
path::{Path, PathBuf},
};

const CTRL_INDEX: &str = "CTRL_INDEX";

Expand Down Expand Up @@ -105,3 +114,43 @@ pub fn wait_for_end_of_second() {
}
}
}

// Count all log lines written in all .log and .log.gz files in the given folder
pub fn count_log_lines(directory: &Path) -> usize {
// read all files
let pattern = directory.display().to_string().add("/*");
let all_files = match glob::glob(&pattern) {
Err(e) => panic!("Is this ({pattern}) really a directory? Listing failed with {e}",),
Ok(globresults) => globresults,
};

let mut total_line_count = 0_usize;
for file in all_files.into_iter() {
let pathbuf = file.unwrap_or_else(|e| panic!("Ups - error occured: {e}"));
let mut reader: Either<BufReader<GzDecoder<File>>, BufReader<File>> =
match pathbuf.extension() {
#[cfg(feature = "compress")]
Some(os_str) if os_str == AsRef::<OsStr>::as_ref("gz") => {
// unpack
Either::Left(BufReader::new(GzDecoder::new(
File::open(&pathbuf)
.unwrap_or_else(|e| panic!("Cannot open file {pathbuf:?} due to {e}")),
)))
}
_ => {
Either::Right(BufReader::new(File::open(&pathbuf).unwrap_or_else(|e| {
panic!("Cannot open file {pathbuf:?} due to {e}")
})))
}
};

let mut buffer = String::new();
let mut line_count = 0_usize;
while reader.read_line(&mut buffer).unwrap() > 0 {
line_count += 1;
}
total_line_count += line_count;
}

total_line_count
}

0 comments on commit e892b8f

Please sign in to comment.