Skip to content

Commit

Permalink
Adaptable rate of messages
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 25, 2024
1 parent 1b2f186 commit e59e4b2
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 20 deletions.
2 changes: 1 addition & 1 deletion config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ followers:
google_project_id: "pub-verse-app"
google_topic: "follow-changes"
flush_period_seconds: 60 # how often to flush the buffer to generate messages
min_seconds_between_messages: 43200 # Half a day, so 2 messages per day
min_seconds_between_messages: 900 # 15 minutes
pagerank_cron_expression: "0 0 0 * * *" # Daily at midnight
http_cache_seconds: 86400 # 24 hours
2 changes: 1 addition & 1 deletion src/domain/follow_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl FollowChange {
self.friendly_followee = name;
}

pub fn is_notifiable(&self) -> bool {
pub fn is_follower(&self) -> bool {
matches!(self.change_type, ChangeType::Followed)
}

Expand Down
86 changes: 69 additions & 17 deletions src/domain/followee_notification_factory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{FollowChange, NotificationMessage, MAX_FOLLOWERS_PER_BATCH};
use crate::rate_limiter::RateLimiter;
use nostr_sdk::PublicKey;
use ordermap::OrderMap;
use std::fmt::Debug;
Expand All @@ -8,19 +9,26 @@ use tokio::time::Instant;
type Follower = PublicKey;
type Followee = PublicKey;

/// Accumulates messages for a followee and flushes them in batches
pub struct FolloweeNotificationFactory {
pub follow_changes: OrderMap<Follower, Box<FollowChange>>,
pub followee: Option<Followee>,
min_time_between_messages: Duration,
rate_limiter: RateLimiter,
emptied_at: Option<Instant>,
}

impl FolloweeNotificationFactory {
pub fn new(min_time_between_messages: Duration) -> Self {
// Rate limiter for 1 message every 12 hours, bursts of 10
let capacity = 10.0;
let rate_limiter = RateLimiter::new(capacity, Duration::from_secs(12 * 60 * 60));

Self {
follow_changes: OrderMap::with_capacity(100),
followee: None,
min_time_between_messages,
rate_limiter,
emptied_at: None,
}
}
Expand Down Expand Up @@ -55,46 +63,90 @@ impl FolloweeNotificationFactory {
self.follow_changes.insert(*follower, follow_change);
}

// This is basically a sliding window log rate limiter
// No flushes if the time since the last flush is less than min_time_between_messages
pub fn should_flush(&self) -> bool {
match self.emptied_at {
Some(emptied_at) => {
let now = Instant::now();
assert!(emptied_at <= now);
now.duration_since(emptied_at) > self.min_time_between_messages
}
// Flushes if minimum time between messages has elapsed and rate limit is not exceeded.
// If a day has elapsed since the last flush, it will flush regardless of the rate limit.
pub fn should_flush(&mut self) -> bool {
let now = Instant::now();

let min_time_elapsed = match self.emptied_at {
Some(emptied_at) => now.duration_since(emptied_at) >= self.min_time_between_messages,
None => true,
};

if !min_time_elapsed {
return false;
}

let one_day_elapsed = match self.emptied_at {
Some(emptied_at) => now.duration_since(emptied_at) >= Duration::from_secs(24 * 60 * 60),
None => true,
};

if one_day_elapsed {
return true;
}

// Check if tokens are available without consuming them
self.rate_limiter.can_consume(1.0)
}

pub fn should_delete(&self) -> bool {
pub fn followers_len(&self) -> usize {
self.follow_changes
.iter()
.filter(|(_, v)| v.is_follower())
.count()
}

pub fn should_delete(&mut self) -> bool {
self.follow_changes.is_empty() && self.should_flush()
}

pub fn no_notifiables(&self) -> bool {
!self.follow_changes.iter().any(|(_, v)| v.is_notifiable())
pub fn no_followers(&self) -> bool {
!self.follow_changes.iter().any(|(_, v)| v.is_follower())
}

// Only followers are accumulated into messages, unfollowers are not, but
// all of them are drained
pub fn flush(&mut self) -> Vec<NotificationMessage> {
if self.no_notifiables() {
if self.no_followers() {
return vec![];
}

if self.should_flush() {
self.emptied_at = Some(Instant::now());
let now = Instant::now();
let one_day_elapsed = match self.emptied_at {
Some(emptied_at) => {
now.duration_since(emptied_at) >= Duration::from_secs(24 * 60 * 60)
}
None => true,
};

return self
self.emptied_at = Some(now);

let followers = self
.follow_changes
.drain(..)
.map(|(_, v)| v)
.filter(|v| v.is_notifiable())
.collect::<Vec<Box<FollowChange>>>()
.filter(|v| v.is_follower())
.collect::<Vec<Box<FollowChange>>>();

let messages: Vec<NotificationMessage> = followers
.chunks(MAX_FOLLOWERS_PER_BATCH)
.map(|batch| batch.to_vec().into())
.collect();

let tokens_needed = messages.len() as f64;

if one_day_elapsed {
// Overcharge the rate limiter to consume tokens regardless of availability
self.rate_limiter.overcharge(tokens_needed);
} else {
if !self.rate_limiter.consume(tokens_needed) {
return vec![];
}
}

return messages;
}

vec![]
Expand Down
2 changes: 1 addition & 1 deletion src/domain/notification_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl NotificationMessage {
);

assert!(
follow_change.is_notifiable(),
follow_change.is_follower(),
"Only followed changes can be messaged"
);

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod http_server;
pub mod metrics;
pub mod migrations;
pub mod publisher;
pub mod rate_limiter;
pub mod relay_subscriber;
pub mod repo;
pub mod scheduler;
Expand Down
178 changes: 178 additions & 0 deletions src/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use tokio::time::{Duration, Instant};

/// Token bucket rate limiter.
///
/// Tokens accrue continuously at `refill_rate_per_sec` up to `burst_capacity`.
/// Unlike a classic bucket, the balance may be driven negative (a deficit)
/// via `overcharge`; the deficit is bounded by `max_negative_tokens`, so a
/// caller that force-spends repeatedly cannot dig an unbounded hole.
pub struct RateLimiter {
    burst_capacity: f64,      // Maximum number of tokens the bucket can hold
    tokens: f64,              // Current number of tokens (can be negative)
    refill_rate_per_sec: f64, // Tokens added per second of elapsed time
    last_refill: Instant,     // Last time tokens were refilled
    max_negative_tokens: f64, // Maximum allowed negative tokens (deficit cap)
}

impl RateLimiter {
    /// Builds a limiter that holds at most `burst_capacity` tokens and
    /// replenishes one full burst over `refill_duration`.
    ///
    /// The bucket starts full, and the deficit allowed by `overcharge`
    /// is capped at 1000 bursts.
    pub fn new(burst_capacity: f64, refill_duration: Duration) -> Self {
        Self {
            burst_capacity,
            tokens: burst_capacity,
            refill_rate_per_sec: burst_capacity / refill_duration.as_secs_f64(),
            last_refill: Instant::now(),
            max_negative_tokens: burst_capacity * 1000.0,
        }
    }

    /// Credits the bucket with tokens earned since the previous refill,
    /// clamped at `burst_capacity`.
    fn refill_tokens(&mut self) {
        let now = Instant::now();
        let seconds_idle = now.duration_since(self.last_refill).as_secs_f64();
        self.last_refill = now;

        let earned = seconds_idle * self.refill_rate_per_sec;
        self.tokens = self.burst_capacity.min(self.tokens + earned);
    }

    /// Reports whether `tokens_needed` tokens are currently available,
    /// without spending them. (Takes `&mut self` because it refills first.)
    pub fn can_consume(&mut self, tokens_needed: f64) -> bool {
        self.refill_tokens();
        self.tokens >= tokens_needed
    }

    /// Spends `tokens_needed` tokens if the balance covers them.
    ///
    /// Returns `true` on success; on failure the balance is left untouched
    /// (aside from the refill that always happens first).
    pub fn consume(&mut self, tokens_needed: f64) -> bool {
        self.refill_tokens();
        let affordable = self.tokens >= tokens_needed;
        if affordable {
            self.tokens -= tokens_needed;
        }
        affordable
    }

    /// Spends `tokens_needed` unconditionally, allowing the balance to go
    /// negative. The resulting deficit never exceeds `max_negative_tokens`.
    pub fn overcharge(&mut self, tokens_needed: f64) {
        self.refill_tokens();
        self.tokens = (self.tokens - tokens_needed).max(-self.max_negative_tokens);
    }

    /// Current token balance after accounting for time elapsed since the
    /// last refill.
    pub fn get_available_tokens(&mut self) -> f64 {
        self.refill_tokens();
        self.tokens
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    // Fix: the original also imported `advance` and `Instant` directly, but
    // every call site uses the `time::advance(..)` path and `Instant` was
    // never referenced, producing unused-import warnings.
    use tokio::time::{self, Duration};

    #[tokio::test]
    async fn test_initial_tokens() {
        let capacity = 10.0;
        let refill_duration = Duration::from_secs(86400); // 1 day
        let rate_limiter = RateLimiter::new(capacity, refill_duration);

        // A fresh limiter starts with a full burst available.
        assert_eq!(rate_limiter.tokens, capacity);
    }

    #[tokio::test]
    async fn test_consume_tokens_success() {
        let capacity = 10.0;
        let refill_duration = Duration::from_secs(86400); // 1 day
        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);

        // Consume 5 tokens
        let result = rate_limiter.consume(5.0);
        assert!(result);
        assert_eq!(rate_limiter.tokens, 5.0);
    }

    #[tokio::test]
    async fn test_consume_tokens_failure() {
        let capacity = 10.0;
        let refill_duration = Duration::from_secs(86400); // 1 day
        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);

        // Attempt to consume more tokens than available
        let result = rate_limiter.consume(15.0);
        assert!(!result);
        // Tokens should remain unchanged since consume failed
        assert_eq!(rate_limiter.tokens, capacity);
    }

    #[tokio::test]
    async fn test_overcharge() {
        let capacity = 10.0;
        let refill_duration = Duration::from_secs(86400); // 1 day
        let max_negative_tokens = capacity * 1000.0;
        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);

        // Overcharge by 15 tokens: balance goes negative
        rate_limiter.overcharge(15.0);
        assert_eq!(rate_limiter.tokens, -5.0);

        // Overcharge beyond max_negative_tokens: deficit is clamped
        rate_limiter.overcharge(max_negative_tokens * 2.0);
        assert_eq!(rate_limiter.tokens, -max_negative_tokens);
    }

    #[tokio::test(start_paused = true)]
    async fn test_refill_tokens() {
        let capacity = 10.0;
        let refill_duration = Duration::from_secs(10); // Short duration for testing
        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);

        // Consume all tokens
        let result = rate_limiter.consume(capacity);
        assert!(result);
        assert_eq!(rate_limiter.tokens, 0.0);

        // Advance time by half of the refill duration
        time::advance(Duration::from_secs(5)).await;
        rate_limiter.refill_tokens();
        // Should have refilled half the tokens
        assert_eq!(rate_limiter.tokens, 5.0);

        // Advance time to complete the refill duration
        time::advance(Duration::from_secs(5)).await;
        rate_limiter.refill_tokens();
        assert_eq!(rate_limiter.tokens, capacity);
    }

    #[tokio::test(start_paused = true)]
    async fn test_overcharge_and_refill() {
        let capacity = 10.0;
        let refill_duration = Duration::from_secs(10); // Short duration for testing
        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);

        // Overcharge by 15 tokens
        rate_limiter.overcharge(15.0);
        assert_eq!(rate_limiter.tokens, -5.0);

        // Advance time to refill tokens
        time::advance(Duration::from_secs(20)).await; // Wait enough to refill capacity
        rate_limiter.refill_tokens();
        // Refill pays off the deficit and climbs back to capacity
        assert_eq!(rate_limiter.tokens, capacity);
    }

    #[tokio::test]
    async fn test_max_negative_tokens() {
        let capacity = 10.0;
        let refill_duration = Duration::from_secs(86400); // 1 day
        let max_negative_tokens = capacity * 1000.0;
        let mut rate_limiter = RateLimiter::new(capacity, refill_duration);

        // A single huge overcharge is clamped at the deficit cap
        rate_limiter.overcharge(max_negative_tokens + 50.0);
        assert_eq!(rate_limiter.tokens, -max_negative_tokens);
    }
}

0 comments on commit e59e4b2

Please sign in to comment.