Skip to content

Commit

Permalink
in tests, make use of cond_sync rather than std::sync::CondVar directly
Browse files Browse the repository at this point in the history
  • Loading branch information
emabee committed Sep 22, 2024
1 parent 18f6b24 commit abd66da
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ It also allows defining additional log streams, e.g. for alert or security messa
"""
documentation = "https://docs.rs/flexi_logger"
edition = "2021"
homepage = "https://crates.io/crates/flexi_logger"
keywords = ["file", "logger"]
license = "MIT OR Apache-2.0"
readme = "README.md"
Expand Down Expand Up @@ -66,6 +65,7 @@ tracing-subscriber = { version = "0.3", optional = true, features = [
libc = { version = "^0.2.50", optional = true }

[dev-dependencies]
cond_sync = "0.2"
either = "1.9"
flate2 = "1.0"
serde_derive = "1.0"
Expand Down
39 changes: 21 additions & 18 deletions tests/test_multi_threaded_cleanup_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod test_utils;
#[cfg(feature = "compress")]
mod d {
use chrono::{Local, NaiveDateTime};
use cond_sync::{CondSync, Other};
use flate2::bufread::GzDecoder;
use flexi_logger::{
Cleanup, Criterion, DeferredNow, Duplicate, FileSpec, LogSpecification, Logger, Naming,
Expand All @@ -16,11 +17,9 @@ mod d {
io::{BufRead, BufReader, Write},
ops::Add,
path::{Path, PathBuf},
thread::{self, JoinHandle},
thread::JoinHandle,
};

use crate::test_utils;

const NO_OF_THREADS: usize = 5;
const NO_OF_LOGLINES_PER_THREAD: usize = 20_000;
const ROTATE_OVER_SIZE: u64 = 600_000;
Expand All @@ -31,8 +30,6 @@ mod d {
// verify that all log lines are written correctly
#[test]
fn multi_threaded() {
test_utils::wait_for_start_of_second();

let start = Local::now();
let directory = super::test_utils::dir();
let end = {
Expand Down Expand Up @@ -61,10 +58,12 @@ mod d {
verify the log"
);

let worker_handles = start_worker_threads(NO_OF_THREADS);
let new_spec = LogSpecification::parse("trace").unwrap();
thread::sleep(std::time::Duration::from_millis(500));
logger.set_new_spec(new_spec);
let cond_sync = CondSync::new(0_usize);
let worker_handles = start_worker_threads(NO_OF_THREADS, &cond_sync);
cond_sync
.wait_until(|value| *value == NO_OF_THREADS)
.unwrap();
logger.set_new_spec(LogSpecification::parse("trace").unwrap());

wait_for_workers_to_close(worker_handles);
Local::now()
Expand All @@ -80,16 +79,20 @@ mod d {
}

// Starts given number of worker threads and lets each execute `do_work`
fn start_worker_threads(no_of_workers: usize) -> Vec<JoinHandle<u8>> {
fn start_worker_threads(
no_of_workers: usize,
cond_sync: &CondSync<usize>,
) -> Vec<JoinHandle<u8>> {
let mut worker_handles: Vec<JoinHandle<u8>> = Vec::with_capacity(no_of_workers);
trace!("Starting {} worker threads", no_of_workers);
for thread_number in 0..no_of_workers {
trace!("Starting thread {}", thread_number);
let cond_sync_t = cond_sync.clone();
worker_handles.push(
thread::Builder::new()
std::thread::Builder::new()
.name(thread_number.to_string())
.spawn(move || {
do_work(thread_number);
do_work(thread_number, cond_sync_t);
0
})
.unwrap(),
Expand All @@ -99,16 +102,16 @@ mod d {
worker_handles
}

fn do_work(thread_number: usize) {
fn do_work(thread_number: usize, cond_sync: CondSync<usize>) {
trace!("({}) Thread started working", thread_number);
trace!("ERROR_IF_PRINTED");
cond_sync
.modify_and_notify(|value| *value += 1, Other::One)
.unwrap();

for idx in 0..NO_OF_LOGLINES_PER_THREAD {
if idx % 1_000 == 0 {
std::thread::yield_now();
}
debug!("({}) writing out line number {}", thread_number, idx);
}
std::thread::sleep(std::time::Duration::from_millis(500));
trace!("MUST_BE_PRINTED");
}

Expand All @@ -130,7 +133,7 @@ mod d {
w,
"XXXXX [{}] T[{:?}] {} [{}:{}] {}",
now.now().format("%Y-%m-%d %H:%M:%S%.6f %:z"),
thread::current().name().unwrap_or("<unnamed>"),
std::thread::current().name().unwrap_or("<unnamed>"),
record.level(),
record.file().unwrap_or("<unnamed>"),
record.line().unwrap_or(0),
Expand Down
44 changes: 27 additions & 17 deletions tests/test_multi_threaded_cleanup_use_utc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ mod test_utils;

#[cfg(feature = "compress")]
mod d {
use cond_sync::{CondSync, Other};
use flexi_logger::{
Cleanup, Criterion, DeferredNow, Duplicate, FileSpec, LogSpecification, LogfileSelector,
Logger, Naming, WriteMode, TS_DASHES_BLANK_COLONS_DOT_BLANK,
};
use glob::glob;
use log::*;
use std::{
ops::Add,
thread::{self, JoinHandle},
};
use std::{ops::Add, thread::JoinHandle};

const NO_OF_THREADS: usize = 5;
const NO_OF_LOGLINES_PER_THREAD: usize = 20_000;
Expand All @@ -23,7 +21,6 @@ mod d {
// so that it is easier to verify that all log lines are written correctly
#[test]
fn multi_threaded() {
super::test_utils::wait_for_start_of_second();
let directory = super::test_utils::dir();
{
let _stopwatch = super::test_utils::Stopwatch::default();
Expand All @@ -45,15 +42,19 @@ mod d {
.use_utc()
.start()
.unwrap_or_else(|e| panic!("Logger initialization failed with {e}"));

info!(
"create a huge number of log lines with a considerable number of threads, \
verify the log"
verify the log"
);

let worker_handles = start_worker_threads(NO_OF_THREADS);
let new_spec = LogSpecification::parse("trace").unwrap();
thread::sleep(std::time::Duration::from_millis(500));
logger.set_new_spec(new_spec);
let cond_sync = CondSync::new(0_usize);
let worker_handles = start_worker_threads(NO_OF_THREADS, &cond_sync);
cond_sync
.wait_until(|value| *value == NO_OF_THREADS)
.unwrap();

logger.set_new_spec(LogSpecification::parse("trace").unwrap());

join_all_workers(worker_handles);

Expand All @@ -68,21 +69,26 @@ mod d {
for f in log_files {
debug!("Existing log file: {f:?}");
}
}
} // drop stopwatch and logger

verify_logs(&directory.display().to_string());
}

// Starts given number of worker threads and lets each execute `do_work`
fn start_worker_threads(no_of_workers: usize) -> Vec<JoinHandle<u8>> {
fn start_worker_threads(
no_of_workers: usize,
cond_sync: &CondSync<usize>,
) -> Vec<JoinHandle<u8>> {
let mut worker_handles: Vec<JoinHandle<u8>> = Vec::with_capacity(no_of_workers);
trace!("Starting {} worker threads", no_of_workers);
for thread_number in 0..no_of_workers {
trace!("Starting thread {}", thread_number);
let cond_sync_t = cond_sync.clone();
worker_handles.push(
thread::Builder::new()
std::thread::Builder::new()
.name(thread_number.to_string())
.spawn(move || {
do_work(thread_number);
do_work(thread_number, cond_sync_t);
0
})
.unwrap(),
Expand All @@ -92,13 +98,17 @@ mod d {
worker_handles
}

fn do_work(thread_number: usize) {
fn do_work(thread_number: usize, cond_sync: CondSync<usize>) {
trace!("({}) Thread started working", thread_number);
trace!("ERROR_IF_PRINTED");

cond_sync
.modify_and_notify(|value| *value += 1, Other::One)
.unwrap();

for idx in 0..NO_OF_LOGLINES_PER_THREAD {
debug!("({}) writing out line number {}", thread_number, idx);
}
std::thread::sleep(std::time::Duration::from_millis(500));
trace!("MUST_BE_PRINTED");
}

Expand All @@ -120,7 +130,7 @@ mod d {
w,
"XXXXX [{}] T[{:?}] {} [{}:{}] {}",
now.format(TS_DASHES_BLANK_COLONS_DOT_BLANK),
thread::current().name().unwrap_or("<unnamed>"),
std::thread::current().name().unwrap_or("<unnamed>"),
record.level(),
record.file().unwrap_or("<unnamed>"),
record.line().unwrap_or(0),
Expand Down
56 changes: 23 additions & 33 deletions tests/test_multi_threaded_dates.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
mod test_utils;

use cond_sync::{CondSync, Other};
use flexi_logger::{
Age, Cleanup, Criterion, DeferredNow, Duplicate, FileSpec, LogSpecification, Logger, Naming,
TS_DASHES_BLANK_COLONS_DOT_BLANK,
};
use glob::glob;
use log::*;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::ops::Add;
use std::thread::JoinHandle;
use std::sync::{Arc, Mutex, Condvar};
use std::{
fs::File,
io::{BufRead, BufReader},
ops::Add,
thread::JoinHandle,
};

const NO_OF_THREADS: usize = 5;
const NO_OF_LOGLINES_PER_THREAD: usize = 20_000;
Expand All @@ -19,13 +21,10 @@ const NO_OF_LOGLINES_PER_THREAD: usize = 20_000;
// verify that all log lines are written correctly
#[test]
fn test_multi_threaded_dates() {
test_utils::wait_for_start_of_second();

let directory = test_utils::dir();
{
let logger;
let _stopwatch = test_utils::Stopwatch::default();
logger = Logger::try_with_str("debug")
let logger = Logger::try_with_str("debug")
.unwrap()
.log_to_file(FileSpec::default().directory(&directory))
.format(test_format)
Expand All @@ -39,40 +38,34 @@ fn test_multi_threaded_dates() {
.start()
.unwrap_or_else(|e| panic!("Logger initialization failed with {e}"));

let mtx_cvar_pair = Arc::new((Mutex::new(0), Condvar::new()));

info!("create many log lines with a considerable number of threads, verify the log");

let worker_handles = start_worker_threads(NO_OF_THREADS, mtx_cvar_pair.clone());

{
let (mtx, cvar) = &*mtx_cvar_pair;
let mut started = mtx.lock().unwrap();
while *started != NO_OF_THREADS {
started = cvar.wait(started).unwrap();
}
}
let cond_sync = CondSync::new(0_usize);
let worker_handles = start_worker_threads(NO_OF_THREADS, &cond_sync);
cond_sync
.wait_until(|value| *value == NO_OF_THREADS)
.unwrap();

logger.set_new_spec(LogSpecification::parse("trace").unwrap());

wait_for_workers_to_close(worker_handles);
}
join_all_workers(worker_handles);
} // drop stopwatch

verify_logs(&directory.display().to_string());
}

// Starts given number of worker threads and lets each execute `do_work`
fn start_worker_threads(no_of_workers: usize, mtx_cvar_pair: Arc<(Mutex<usize>,Condvar)>) -> Vec<JoinHandle<u8>> {
fn start_worker_threads(no_of_workers: usize, cond_sync: &CondSync<usize>) -> Vec<JoinHandle<u8>> {
let mut worker_handles: Vec<JoinHandle<u8>> = Vec::with_capacity(no_of_workers);
trace!("Starting {} worker threads", no_of_workers);
for thread_number in 0..no_of_workers {
trace!("Starting thread {}", thread_number);
let thread_mtx_cvar_pair = mtx_cvar_pair.clone();
let cond_sync_t = cond_sync.clone();
worker_handles.push(
std::thread::Builder::new()
.name(thread_number.to_string())
.spawn(move || {
do_work(thread_number, thread_mtx_cvar_pair);
do_work(thread_number, cond_sync_t);
0
})
.unwrap(),
Expand All @@ -82,24 +75,21 @@ fn start_worker_threads(no_of_workers: usize, mtx_cvar_pair: Arc<(Mutex<usize>,C
worker_handles
}

fn do_work(thread_number: usize, mtx_cvar_pair: Arc<(Mutex<usize>,Condvar)>) {
fn do_work(thread_number: usize, cond_sync: CondSync<usize>) {
trace!("({}) Thread started working", thread_number);
trace!("ERROR_IF_PRINTED");

{
let (mtx, cvar) = &*mtx_cvar_pair;
let mut started = mtx.lock().unwrap();
*started += 1;
cvar.notify_one();
}
cond_sync
.modify_and_notify(|value| *value += 1, Other::One)
.unwrap();

for idx in 0..NO_OF_LOGLINES_PER_THREAD {
debug!("({}) writing out line number {}", thread_number, idx);
}
trace!("MUST_BE_PRINTED");
}

fn wait_for_workers_to_close(worker_handles: Vec<JoinHandle<u8>>) {
fn join_all_workers(worker_handles: Vec<JoinHandle<u8>>) {
for worker_handle in worker_handles {
worker_handle
.join()
Expand Down
Loading

0 comments on commit abd66da

Please sign in to comment.