Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

assets: add support for configurable bufsize for FileHandler #649

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions gotham/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,15 @@ tokio-rustls = { version = "0.23", optional = true }
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["cargo_bench_support", "plotters", "rayon", "async_futures", "async_tokio"] }
futures-executor = "0.3.14"
reqwest = "0.12.2"
tempfile = "3.10.1"
tokio = { version = "1.11.0", features = ["macros", "test-util"] }

[package.metadata.docs.rs]
all-features = true

[[bench]]
name = "file_handler"
harness = false
152 changes: 152 additions & 0 deletions gotham/benches/file_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use std::{
collections::HashMap,
fs::File,
io::{BufWriter, Write},
net::{SocketAddr, ToSocketAddrs},
sync::atomic::{AtomicU64, Ordering::Relaxed},
time::{Duration, SystemTime},
};

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use futures_util::future;
use gotham::{
bind_server,
handler::FileOptions,
router::{
build_simple_router,
builder::{DefineSingleRoute, DrawRoutes},
},
};
use tempfile::TempDir;
use tokio::{
net::TcpListener,
runtime::{self, Runtime},
};

struct BenchServer {
runtime: Runtime,
addr: SocketAddr,
#[allow(dead_code)]
tmp: TempDir,
// sizes of test files
sizes: Vec<u64>,
buf_paths: HashMap<String, Option<usize>>,
}

impl BenchServer {
fn new() -> anyhow::Result<Self> {
let tmp = TempDir::new()?;
// temporary datafiles
let sizes = [10, 17, 24]
.iter()
.filter_map(|sz| {
let size = 1 << sz;
mk_tmp(&tmp, size).ok()
})
.collect();
let buf_paths = HashMap::from([
("default".to_string(), None),
("128k".to_string(), Some(1 << 17)),
]);

let router = build_simple_router(|route| {
for (path, sz) in &buf_paths {
let mut opts = FileOptions::from(tmp.path().to_owned());
if let Some(size) = sz {
opts.with_buffer_size(*size);
}
route
.get(format!("/{path}/*").as_str())
.to_dir(opts.to_owned())
}
});
let runtime = runtime::Builder::new_multi_thread()
.worker_threads(num_cpus::get())
.thread_name("file_handler-bench")
.enable_all()
.build()
.unwrap();
// build server manually so that we can capture the actual port instead of 0
let addr: std::net::SocketAddr = "127.0.0.1:0".to_socket_addrs().unwrap().next().unwrap();
let listener = runtime.block_on(TcpListener::bind(addr)).unwrap();
// use any free port
let addr = listener.local_addr().unwrap();
let _ = runtime.spawn(async move {
bind_server(listener, router, future::ok).await;
});
std::thread::sleep(Duration::from_millis(100));
Ok(Self {
runtime,
addr,
tmp,
sizes,
buf_paths,
})
}
}

fn mk_tmp(tmp: &TempDir, size: u64) -> anyhow::Result<u64> {
let filename = tmp.path().join(format!("{size}"));
let file = File::create(filename)?;
let mut w = BufWriter::with_capacity(2 << 16, file);
// pseudo random data: time stamp as bytes
let ts_data = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos()
.to_le_bytes();
for _ in (0..size).step_by(ts_data.len()) {
w.write_all(&ts_data)?;
}
Ok(size)
}

pub fn filehandler_benchmark(c: &mut Criterion) {
let server = BenchServer::new().unwrap();

let runtime = server.runtime;
let client = reqwest::Client::builder().build().unwrap();
let counter = AtomicU64::new(0);
let failed = AtomicU64::new(0);

for file_size in server.sizes {
let mut group = c.benchmark_group("server_bench");
group.throughput(Throughput::Bytes(file_size));
for (path, buf_size) in &server.buf_paths {
let url = format!("http://{}/{path}/{file_size}", server.addr);
let req = client.get(url).build().unwrap();
group.bench_with_input(
BenchmarkId::new(
"test_file_handler",
format!("filesize: {file_size}, bufsize: {buf_size:?}"),
),
&req,
|b, req| {
b.to_async(&runtime).iter(|| async {
let r = client.execute(req.try_clone().unwrap()).await;
counter.fetch_add(1, Relaxed);
match r {
Err(_) => {
failed.fetch_add(1, Relaxed);
}
Ok(res) => {
// sanity check: did we get what was expected?
assert_eq!(res.content_length().unwrap(), file_size);
let _ = res.bytes().await.unwrap();
}
}
});
},
);
}
}
println!("Errors {}/{}", failed.load(Relaxed), counter.load(Relaxed));
}

criterion_group! {
name = file_handler;
config = Criterion::default().measurement_time(Duration::from_millis(10_000)).warm_up_time(Duration::from_millis(10));
targets = filehandler_benchmark
}

criterion_main!(file_handler);
15 changes: 13 additions & 2 deletions gotham/src/handler/assets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct FileOptions {
cache_control: String,
gzip: bool,
brotli: bool,
buffer_size: Option<usize>,
}

impl FileOptions {
Expand All @@ -88,6 +89,7 @@ impl FileOptions {
cache_control: "public".to_string(),
gzip: false,
brotli: false,
buffer_size: None,
}
}

Expand All @@ -111,6 +113,13 @@ impl FileOptions {
self
}

/// Sets the maximum buffer size to be used when serving the file.
/// If unset, the default maximum buffer size corresponding to file system block size will be used.
pub fn with_buffer_size(&mut self, buf_sz: usize) -> &mut Self {
self.buffer_size = Some(buf_sz);
self
}

/// Clones `self` to return an owned value for passing to a handler.
pub fn build(&mut self) -> Self {
self.clone()
Expand Down Expand Up @@ -215,7 +224,9 @@ fn create_file_response(options: FileOptions, state: State) -> Pin<Box<HandlerFu
.body(Body::empty())
.unwrap());
}
let buf_size = optimal_buf_size(&meta);
let buf_size = options
.buffer_size
.unwrap_or_else(|| optimal_buf_size(&meta));
let (len, range_start) = match resolve_range(meta.len(), &headers) {
Ok((len, range_start)) => (len, range_start),
Err(e) => {
Expand All @@ -229,7 +240,7 @@ fn create_file_response(options: FileOptions, state: State) -> Pin<Box<HandlerFu
file.seek(SeekFrom::Start(seek_to)).await?;
};

let stream = file_stream(file, buf_size, len);
let stream = file_stream(file, cmp::min(buf_size, len as usize), len);
let body = Body::wrap_stream(stream.into_stream());
let mut response = hyper::Response::builder()
.status(StatusCode::OK)
Expand Down
Loading