Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt Limiter for TcpStream usage, lower sleep time, add parametric tests #13

Merged
merged 8 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ keywords = ["rate", "rate_limiting", "synchronous", "stream"]
[dev-dependencies]
sha2 = "0.10.6"
hex-literal = "0.4.1"
rand = { version = "0.8.5", features = ["small_rng"] }

[features]
heavy_testing = []
136 changes: 107 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,60 @@
//! assert_eq!(now.elapsed().as_secs(), 9);
//! ```
use std::{
debug_assert,
io::{self, Read, Write},
time::Duration,
};

const ACCEPTABLE_SPEED_DIFF: f64 = 4.0 / 100.0;

#[derive(Clone, Debug)]
pub struct LimiterOptions {
window_length: u128,
window_time: Duration,
bucket_size: usize,
pub window_length: u64,
pub window_time: Duration,
pub bucket_size: u64,
}

impl LimiterOptions {
pub fn new(window_length: u128, window_time: Duration, bucket_size: usize) -> LimiterOptions {
pub fn new(
mut window_length: u64,
mut window_time: Duration,
mut bucket_size: u64,
) -> LimiterOptions {
let rate = window_length.min(bucket_size) as f64;
let tw: f64 = window_time.as_nanos() as f64;
let init_speed = (rate / tw) * 1_000_000.0;

let mut new_speed = init_speed;
let mut new_wlen = window_length;
let mut new_wtime = window_time;
let mut new_bsize = bucket_size;

// While the difference between the intented speed (init_speed) and the reduced one (new_speed) is under the threshold
// Each iteration, divide all the options by 2, and recompute the speed (in order to check if it's not altered)
// Because we want the values BEFORE the speed is above the threshold, assign the new values on start of the new iter only
while ((new_speed / init_speed) - 1.0).abs() < ACCEPTABLE_SPEED_DIFF {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you place a comment at the top of this block of code because it's not trivial to understand and i'm having hard time to figure out what's going on

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments, hope it's better now 😅

// Values from past iter, we know they're under the threshold
window_length = new_wlen;
window_time = new_wtime;
bucket_size = new_bsize;

// If values aren't dividable by 2
if (new_wlen == 1) || (new_bsize == 1) || (new_wtime.as_nanos() == 1) {
break;
}

// Reduce the options
new_wlen /= 2;
new_wtime /= 2;
new_bsize /= 2;

// Recompute the new speed
let rate = new_wlen.min(new_bsize) as f64;
let tw: f64 = new_wtime.as_nanos() as f64;
new_speed = (rate / tw) * 1_000_000.0;
}

LimiterOptions {
window_length,
window_time,
Expand All @@ -46,6 +88,7 @@ where
write_opt: Option<LimiterOptions>,
last_read_check: Option<std::time::Instant>,
last_write_check: Option<std::time::Instant>,
additionnal_tokens: (u64, u64),
}

impl<S> Limiter<S>
Expand Down Expand Up @@ -77,17 +120,18 @@ where
},
read_opt,
write_opt,
additionnal_tokens: (0, 0),
}
}

fn stream_cap_limit(&self) -> (Option<usize>, Option<usize>) {
fn stream_cap_limit(&self) -> (Option<u64>, Option<u64>) {
let read_cap = if let Some(LimiterOptions {
window_length,
bucket_size,
..
}) = self.read_opt
{
Some(std::cmp::min(window_length as usize, bucket_size))
Some(std::cmp::min(window_length, bucket_size))
} else {
None
};
Expand All @@ -97,25 +141,33 @@ where
..
}) = self.write_opt
{
Some(std::cmp::min(window_length as usize, bucket_size))
Some(std::cmp::min(window_length, bucket_size))
} else {
None
};
(read_cap, write_cap)
}

fn tokens_available(&self) -> (Option<usize>, Option<usize>) {
fn tokens_available(&self) -> (Option<u64>, Option<u64>) {
let read_tokens = if let Some(LimiterOptions {
window_length,
window_time,
bucket_size,
}) = self.read_opt
{
Some(std::cmp::min(
((self.last_read_check.unwrap().elapsed().as_nanos() / window_time.as_nanos())
* window_length) as usize,
bucket_size,
))
let lrc = match u64::try_from(self.last_read_check.unwrap().elapsed().as_nanos()) {
Ok(n) => n,
// Will cap the last_read_check at a duration of about 584 years
Err(_) => u64::MAX,
};
Some(
std::cmp::min(
(lrc / u64::try_from(window_time.as_nanos())
.expect("Window time nanos > u64::MAX"))
* window_length,
bucket_size,
) + self.additionnal_tokens.0,
)
} else {
None
};
Expand All @@ -125,11 +177,19 @@ where
bucket_size,
}) = self.write_opt
{
Some(std::cmp::min(
((self.last_write_check.unwrap().elapsed().as_nanos() / window_time.as_nanos())
* window_length) as usize,
bucket_size,
))
let lwc = match u64::try_from(self.last_write_check.unwrap().elapsed().as_nanos()) {
Ok(n) => n,
// Will cap the last_read_check at a duration of about 584 years
Err(_) => u64::MAX,
};
Some(
std::cmp::min(
(lwc / u64::try_from(window_time.as_nanos())
.expect("Window time nanos > u64::MAX"))
* window_length,
bucket_size,
) + self.additionnal_tokens.1,
)
} else {
None
};
Expand All @@ -152,7 +212,7 @@ where
/// If you didn't read for 10 secondes in this stream and you try to read 10 bytes, it will read instantly.
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut read = 0;
let mut buf_left = buf.len();
let mut buf_left = u64::try_from(buf.len()).expect("R buflen to u64");
let readlimit = if let (Some(limit), _) = self.stream_cap_limit() {
limit
} else {
Expand All @@ -167,22 +227,31 @@ where
} else {
Duration::ZERO
};
self.last_read_check = Some(std::time::Instant::now());
std::thread::sleep(window_time.saturating_sub(elapsed));
debug_assert!(self.tokens_available().0.unwrap() > 0);
continue;
}
// Before reading so that we don't count the time it takes to read
self.last_read_check = Some(std::time::Instant::now());
let buf_read_end = read + nb_bytes_readable.min(buf_left);
let read_now = self.stream.read(&mut buf[read..buf_read_end])?;
if read_now < nb_bytes_readable {
let read_start = usize::try_from(read).expect("R read_start to usize");
let read_end = usize::try_from(read + nb_bytes_readable.min(buf_left))
.expect("R read_end to usize");
let read_now = u64::try_from(self.stream.read(&mut buf[read_start..read_end])?)
.expect("R read_now to u64");
if read_now == 0 {
break;
}
if read_now < nb_bytes_readable {
self.additionnal_tokens.0 = self
.additionnal_tokens
.0
.saturating_add(nb_bytes_readable - read_now);
}
read += read_now;
buf_left -= read_now;
}
self.last_read_check = Some(std::time::Instant::now());
Ok(read)
Ok(usize::try_from(read).expect("R return to usize"))
}
}

Expand All @@ -194,7 +263,7 @@ where
/// If you didn't write for 10 secondes in this stream and you try to write 10 bytes, it will write instantly.
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut write = 0;
let mut buf_left = buf.len();
let mut buf_left = u64::try_from(buf.len()).expect("W buflen to u64");
let writelimit = if let (_, Some(limit)) = self.stream_cap_limit() {
limit
} else {
Expand All @@ -209,22 +278,31 @@ where
} else {
Duration::ZERO
};
self.last_write_check = Some(std::time::Instant::now());
std::thread::sleep(window_time.saturating_sub(elapsed));
debug_assert!(self.tokens_available().1.unwrap() > 0);
continue;
}
// Before reading so that we don't count the time it takes to read
self.last_write_check = Some(std::time::Instant::now());
let buf_write_end = write + nb_bytes_writable.min(buf_left);
let write_now = self.stream.write(&buf[write..buf_write_end])?;
let write_start = usize::try_from(write).expect("W write_start to usize");
let write_end = usize::try_from(write + nb_bytes_writable.min(buf_left))
.expect("W write_end to usize");
let write_now = u64::try_from(self.stream.write(&buf[write_start..write_end])?)
.expect("W write_now_ to u64");
if write_now < nb_bytes_writable {
break;
}
if write_now < nb_bytes_writable {
self.additionnal_tokens.1 = self
.additionnal_tokens
.1
.saturating_add(nb_bytes_writable - write_now);
}
write += write_now;
buf_left -= write_now;
}
self.last_write_check = Some(std::time::Instant::now());
Ok(write)
Ok(usize::try_from(write).expect("W return to usize"))
}

fn flush(&mut self) -> io::Result<()> {
Expand Down
Loading