Skip to content

Commit

Permalink
working version of benchmark mode
Browse files Browse the repository at this point in the history
  • Loading branch information
garikello3d committed Dec 31, 2023
1 parent 62f7a44 commit ef1ad78
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 27 deletions.
18 changes: 18 additions & 0 deletions src/arg_opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,23 @@ pub enum Commands {
/// Buffer size for reading disk files, in MB
#[arg(long, value_name ="size_mb")]
buf_size: usize,
},
/// Benchmark mode: read data from stdin and try different combinations of input params to see how fast the process is
Bench {
    /// Path to directory to store temporary files
    #[arg(long, value_name = "/path/to/dir")]
    out_dir: String,

    /// Limit in seconds for each try
    #[arg(long, value_name = "seconds")]
    duration: usize,

    /// LZMA compression levels to try, comma-separated levels (0 - 9)
    // Parsed by clap via value_delimiter: "0,5,9" becomes vec![0, 5, 9].
    #[arg(long, value_name = "level,level,level,...", value_delimiter = ',', num_args = 1..)]
    compress_levels: Vec<u8>,

    /// Buffer sizes for reading stdin data to try, comma-separated values (in MB)
    // Sizes are given in MB here; conversion to bytes happens at the use site.
    #[arg(long, value_name ="size,size,size,...", value_delimiter = ',', num_args = 1..)]
    buf_sizes: Vec<usize>,
}
}
83 changes: 77 additions & 6 deletions src/bin/bigarchiver/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use bigarchiver::arg_opts::{ArgOpts, Commands};
use bigarchiver::{backup, check};
use bigarchiver::{backup, check, timestamp};
use bigarchiver::file_set::cfg_from_pattern;
use bigarchiver::finalizable::DataSink;
use clap::Parser;
use std::io::{stdout, Write};
use std::process::ExitCode;
use std::{thread, fs};
use std::sync::{Arc, atomic::AtomicBool};

struct StdoutWriter;

Expand All @@ -31,33 +33,102 @@ fn process_args(args: &ArgOpts) -> Result<(), String> {
backup(&mut std::io::stdin(),
&auth, auth_every,
split_size, &out_template,
pass, *compress_level, buf_size)?;
pass, *compress_level, buf_size, None)?;
if !no_check {
let cfg_path = cfg_from_pattern(&out_template);
eprintln!("verifying...");
check(None::<StdoutWriter>, &cfg_path, pass, buf_size, &None::<&str>)
check(None::<StdoutWriter>, &cfg_path, pass, buf_size, &None::<&str>, true)
} else {
Ok(())
}
},

Commands::Restore { config, pass, buf_size, check_free_space, no_check } => {
let buf_size = *buf_size * 1_048_576;
if !no_check {
eprintln!("verifying before restore...");
check(None::<StdoutWriter>, &config, pass, buf_size, &None)
check(None::<StdoutWriter>, &config, pass, buf_size, &None, true)
.map_err(|e| format!("will not restore data, integrity check error: {}", e))?;
}
eprintln!("restoring...");
let may_be_check = check_free_space.as_ref().map(|s| s.as_str());
check(Some(StdoutWriter{}), &config, pass,
buf_size, &may_be_check)
buf_size, &may_be_check, true)
.map_err(|e| format!("error restoring data: {}", e))
},

Commands::Check { config, pass, buf_size } => {
eprintln!("verifying...");
let buf_size = *buf_size * 1_048_576;
check(None::<StdoutWriter>, &config, pass,
buf_size, &None)
buf_size, &None, true)
},

Commands::Bench { out_dir, duration, compress_levels, buf_sizes } => {
    // One benchmark result per (compression level, buffer size) combination:
    // how much stdin data got through backup() before the time limit.
    struct Throughput {
        level: u8,          // LZMA compression level tried
        buf_size: usize,    // stdin read buffer size, in MB (as given on the CLI)
        time_spent_s: u64,  // wall-clock seconds the try actually took
        bytes: usize,       // bytes processed within that time
        bps: usize          // derived throughput, bytes per second
    }

    let mut thrpts: Vec<Throughput> = Vec::new();

    // Try every combination of compression level and buffer size.
    for compress_level in compress_levels {
        for buf_size in buf_sizes {
            // Shared flag the main thread sets to ask the worker to stop
            // once the per-try time limit elapses.
            let exit_flag = Arc::new(AtomicBool::new(false));
            let exit_flag_clone = exit_flag.clone();
            let level = *compress_level;
            let buf_size_bytes = *buf_size * 1_048_576; // MB -> bytes

            // Each try writes into its own scratch directory named "<level>-<bufMB>".
            let base_dir = format!("{}/{}-{}", out_dir, compress_level, buf_size);
            let _ = fs::remove_dir_all(&base_dir); // we don't care if it does not exist
            fs::create_dir(&base_dir).map_err(|e| format!("could not create directory {}: {}", &base_dir, e))?;

            let out_template = format!("{}/%", &base_dir);
            let out_cfg = format!("{}/0.cfg", &base_dir);

            let ts_start = timestamp();

            // Run backup on a worker thread so this thread can enforce the
            // time limit through exit_flag.
            // NOTE(review): every iteration reads from the same process stdin,
            // so later combinations consume whatever data is left in the
            // stream — confirm this is intended (i.e. stdin is effectively
            // unbounded in benchmark mode).
            let thread: thread::JoinHandle<Result<usize, String>> = thread::spawn(move|| {
                let bytes = backup(&mut std::io::stdin(),
                    "auth", 1_048_576,
                    usize::MAX, &out_template,
                    "pass", level, buf_size_bytes, Some(exit_flag_clone))?;

                // Verify what was written; show_info=false keeps benchmark output terse.
                check(None::<StdoutWriter>, &out_cfg, "pass", buf_size_bytes, &None::<&str>, false)?;

                Ok(bytes)
            });

            // Let the try run for `duration` seconds, then signal the worker to stop.
            thread::sleep(std::time::Duration::from_millis(*duration as u64 * 1000));
            //eprintln!("waking up");
            exit_flag.store(true, std::sync::atomic::Ordering::SeqCst);
            let bytes = thread.join().unwrap()?;
            let ts_end = timestamp();
            // NOTE(review): this delta also includes the verification pass above,
            // so bps slightly understates pure backup throughput.
            let ts_delta = ts_end - ts_start;

            thrpts.push(Throughput{
                level: *compress_level,
                buf_size: *buf_size,
                time_spent_s: ts_delta,
                bytes: bytes,
                bps: if ts_delta > 0 { bytes / ts_delta as usize } else { 0 } // guard div-by-zero
            });

            fs::remove_dir_all(&base_dir).map_err(|e| format!("could not cleanup base directory {}: {}", &base_dir, e))?;
        }
    }

    // Report the fastest combinations first.
    thrpts.sort_by(|a,b| b.bps.cmp(&a.bps));
    println!("statistics gathered:");
    thrpts.into_iter().for_each(|t| {
        println!("speed = {} b/s\tbytes = {}\tseconds = {}\tlevel = {}\tbuffer = {} MB\t",
            t.bps, t.bytes, t.time_spent_s, t.level, t.buf_size);
    });

    Ok(())
}
}
}
Expand Down
62 changes: 52 additions & 10 deletions src/buffered_reader.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
use std::io::Read;
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use crate::finalizable::DataSink;

pub struct BufferedReader<'a, R: Read, T: DataSink> {
read_from: &'a mut R,
write_to: &'a mut T,
read_buf_size: usize,
store_buf_size: usize
store_buf_size: usize,
exit_flag: Option<Arc<AtomicBool>>,
}

impl<'a, R: Read, T: DataSink> BufferedReader<'a, R, T> {
pub fn new(read_from: &'a mut R, write_to: &'a mut T, read_buf_size: usize, store_buf_size: usize) -> Self {
pub fn new(read_from: &'a mut R, write_to: &'a mut T, read_buf_size: usize, store_buf_size: usize, exit_flag: Option<Arc<AtomicBool>>) -> Self {
assert!(read_buf_size < store_buf_size);
Self { read_from, write_to, read_buf_size, store_buf_size }
Self { read_from, write_to, read_buf_size, store_buf_size, exit_flag: exit_flag }
}

pub fn read_and_write_all(&mut self) -> Result<(), String> {
let mut buf: Vec<u8> = Vec::with_capacity(self.store_buf_size);
buf.resize(self.store_buf_size, 0);

let mut eof = false;
//let mut ref_write_to = self.write_to.as_ref().borrow_mut();

while !eof {
let mut offs = 0;
let mut left = self.store_buf_size;

while left > self.read_buf_size {
if let Some(ex_flag) = &self.exit_flag {
if ex_flag.load(Ordering::SeqCst) {
eof = true;
break;
}
}

if let Ok(bytes_read) = self.read_from.read(&mut buf[offs..offs+self.read_buf_size]) {
if bytes_read > 0 {
//eprintln!("BufferedReader: read and buffered {} bytes from source", bytes_read);
Expand All @@ -51,20 +59,23 @@ impl<'a, R: Read, T: DataSink> BufferedReader<'a, R, T> {
#[cfg(test)]
mod tests {
use std::io::Read;
use std::sync::{Arc, atomic::{AtomicBool}};
use std::thread;
use rand::{thread_rng, Rng, RngCore};
use crate::finalizable::DataSink;
use crate::buffered_reader::BufferedReader;

struct DummyReader {
all_data: Vec<u8>,
offset: usize
offset: usize,
read_delay_ms: u32
}
impl DummyReader {
fn new(data_size: usize) -> Self {
fn new(data_size: usize, read_delay_ms: u32) -> Self {
let mut data = Vec::with_capacity(data_size);
data.resize(data_size, 0);
thread_rng().fill_bytes(&mut data);
Self { all_data: data, offset: 0 }
Self { all_data: data, offset: 0, read_delay_ms }
}
}
impl Read for DummyReader {
Expand All @@ -76,6 +87,9 @@ mod tests {
let to_return = thread_rng().gen::<usize>() % to_return_max + 1;
buf[..to_return].copy_from_slice(&self.all_data[self.offset..self.offset+to_return]);
self.offset += to_return;
if self.read_delay_ms > 0 {
thread::sleep(std::time::Duration::from_millis(self.read_delay_ms as u64));
}
Ok(to_return)
}
}
Expand All @@ -96,7 +110,7 @@ mod tests {

#[test]
fn dummy_reader() {
let mut dr = DummyReader::new(1000);
let mut dr = DummyReader::new(1000, 0);
let mut received_data = Vec::new();
let mut buf = [0u8; 100];
loop {
Expand All @@ -111,15 +125,43 @@ mod tests {

#[test]
fn buffered_reader() {
let mut dr = DummyReader::new(1000);
let mut dr = DummyReader::new(1000, 0);
{
let mut sink = TestSink{ data: Vec::new() };
{
let mut buf_reader = BufferedReader::new(&mut dr, &mut sink, 10, 100);
let mut buf_reader = BufferedReader::new(
&mut dr, &mut sink, 10, 100, None);
buf_reader.read_and_write_all().unwrap();
}
//assert_eq!(buf_reader.read_from.all_data, buf_reader.write_to.data);
assert_eq!(dr.all_data, sink.data);
}
}

#[test]
fn buffered_reader_stoppable() {
    // Verifies that BufferedReader::read_and_write_all aborts early once the
    // shared exit flag is raised, instead of draining the whole source.
    let exit_flag = Arc::new(AtomicBool::new(false));
    let exit_flag_clone = exit_flag.clone();

    let thread = thread::spawn(move|| {
        // Source of 1000 random bytes that sleeps 100 ms per read, so a full
        // drain would take far longer than the 200 ms allowed below.
        let mut dr = DummyReader::new(1000, 100);
        {
            let mut sink = TestSink{ data: Vec::new() };
            {
                let mut buf_reader = BufferedReader::new(
                    &mut dr, &mut sink, 10, 100, Some(exit_flag_clone));
                buf_reader.read_and_write_all().unwrap();
            }
            let nr_written = sink.data.len();
            // Some data must have arrived before the stop was requested...
            assert!(nr_written > 0);
            // ...but far less than the full 1000 bytes (each read is at most
            // 10 bytes and costs 100 ms, so 200 ms permits only a few reads;
            // 30 leaves headroom for scheduling jitter).
            assert!(nr_written < 30);
            // Whatever did arrive must be an exact prefix of the source data.
            assert_eq!(dr.all_data[..nr_written], sink.data);
        }

    });

    // Let the reader make a little progress, then ask it to stop.
    thread::sleep(std::time::Duration::from_millis(200));
    exit_flag.store(true, std::sync::atomic::Ordering::SeqCst);
    thread.join().unwrap();
}
}
21 changes: 13 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ use free_space::get_free_space;
use std::time::{SystemTime, UNIX_EPOCH};
use std::io::Read;
use time::OffsetDateTime;
use std::sync::{Arc, atomic::{AtomicBool}};

fn timestamp() -> u64 {
pub fn timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap() // SAFE: rely on fact that now() cannot return anything earlier than EPOCH
Expand All @@ -58,7 +59,7 @@ fn time_str() -> String {
pub fn backup<R: Read>(
mut read_from: R,
auth: &str, auth_every_bytes: usize, split_size_bytes: usize, out_template: &str,
pass: &str, compress_level: u8, buf_size_bytes: usize) -> Result<(), String>
pass: &str, compress_level: u8, buf_size_bytes: usize, exit_flag: Option<Arc<AtomicBool>>) -> Result<usize, String>
{
let hash_seed = timestamp();
let start_time_str = time_str();
Expand All @@ -79,7 +80,7 @@ pub fn backup<R: Read>(
let mut hash_copier = DataHasher::with_writer(Some(&mut comp), hash_seed);

let mut stdinbuf = BufferedReader::new(
&mut read_from, &mut hash_copier, buf_size_bytes / 8, buf_size_bytes);
&mut read_from, &mut hash_copier, buf_size_bytes / 8, buf_size_bytes, exit_flag);

stdinbuf.read_and_write_all()?;

Expand All @@ -91,20 +92,24 @@ pub fn backup<R: Read>(

let end_timestamp = timestamp();
let end_time_str = time_str();
let throughput_mbps = if end_timestamp - hash_seed != 0 { stats.in_data_len.unwrap() as u64 / 1024 / 1024 / (end_timestamp - hash_seed) } else { 0 };
let in_len = stats.in_data_len.unwrap();
let throughput_mbps = if end_timestamp - hash_seed != 0 { in_len as u64 / 1024 / 1024 / (end_timestamp - hash_seed) } else { 0 };
stats.misc_info = Some(format!("built from {}/{}, started at {}, ended at {}, took {} seconds, througput {} MB/s",
option_env!("GIT_BRANCH").unwrap_or("?"),
option_env!("GIT_REV").unwrap_or("?"),
start_time_str, end_time_str, end_timestamp - hash_seed, throughput_mbps));

spl.write_metadata(&stats)
spl.write_metadata(&stats)?;
Ok(in_len)
}


pub fn check<W: DataSink>(mut write_to: Option<W>, cfg_path: &str, pass: &str, buf_size_bytes: usize, check_free_space: &Option<&str>) -> Result<(), String> {
pub fn check<W: DataSink>(mut write_to: Option<W>, cfg_path: &str, pass: &str, buf_size_bytes: usize, check_free_space: &Option<&str>, show_info: bool) -> Result<(), String> {
let stats = read_metadata::<MultiFilesReader>(cfg_path)?;
eprintln!("authentication string: {}", stats.auth_string);
eprintln!("misc info: {}", stats.misc_info.as_ref().unwrap_or(&"none".to_owned()));
if show_info {
eprintln!("authentication string: {}", stats.auth_string);
eprintln!("misc info: {}", stats.misc_info.as_ref().unwrap_or(&"none".to_owned()));
}

if let Some(mount_point) = check_free_space {
let all_data = stats.in_data_len.unwrap(); // SAFE because if was checked in read_metadata()
Expand Down
6 changes: 3 additions & 3 deletions tests/backup_and_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ fn backup_restore_all_ok(input_size: usize, auth_size: usize, split_size: usize,
&out_tpl,
"secret",
9,
buf_size).unwrap();
buf_size, None).unwrap();

let src_unpacked = SinkToVector{ incoming: Vec::new(), etalon: &src };

check(
Some(src_unpacked),
&out_cfg,
"secret",
buf_size, &None::<&str>).unwrap();
buf_size, &None::<&str>, true).unwrap();

}

Expand All @@ -81,6 +81,6 @@ fn restore_no_free_space() {
auth=Author Name\n\
auth_len=3", usize::MAX);
File::create(cfg_path).unwrap().write_all(cfg_contents.as_bytes()).unwrap();
let err = check(Some(SinkToVector{ incoming: Vec::new(), etalon: b"" }), cfg_path, "", 100, &Some("/tmp")).unwrap_err();
let err = check(Some(SinkToVector{ incoming: Vec::new(), etalon: b"" }), cfg_path, "", 100, &Some("/tmp"), true).unwrap_err();
println!("err = {}", err);
}

0 comments on commit ef1ad78

Please sign in to comment.