Skip to content

Commit

Permalink
Limit the number of ping upload requests per minute
Browse files Browse the repository at this point in the history
  • Loading branch information
brizental committed Jun 12, 2020
1 parent bdeeb35 commit c65349a
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 18 deletions.
2 changes: 1 addition & 1 deletion glean-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ impl Glean {
/// # Return value
///
/// `PingUploadTask` - an enum representing the possible tasks.
pub fn get_upload_task(&self, log_ping: bool) -> PingUploadTask {
pub fn get_upload_task(&mut self, log_ping: bool) -> PingUploadTask {
self.upload_manager.get_upload_task(log_ping)
}

Expand Down
162 changes: 145 additions & 17 deletions glean-core/src/upload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
// This is here just to not have lint error for now.
#![allow(dead_code)]

use std::cmp::max;
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use std::thread;
use std::time::SystemTime;

use once_cell::sync::OnceCell;
use serde_json::Value as JsonValue;
Expand All @@ -32,6 +34,12 @@ mod directory;
mod request;
mod result;

static SECONDS_IN_A_MINUTE: u32 = 60;

// To keep resource usage in check,
// we limit ping sending to a maximum number of pings per minute.
static MAX_PING_SENDS_PER_MINUTE: u32 = 10;

/// A global Glean upload manager instance.
///
/// This is only used by processes who exclusively need to manage
Expand Down Expand Up @@ -69,6 +77,74 @@ pub fn setup_upload_manager(upload_manager: PingUploadManager) -> Result<()> {
Ok(())
}

/// A counter to help limit the number of processing pings per minute.
///
/// NOTE: A "processing" ping is a ping that has been dequeued through `get_upload_task`.
#[derive(Debug)]
struct PingCounter {
started: Option<SystemTime>,
count: u32,
}

impl PingCounter {
pub fn new() -> Self {
Self {
started: None,
count: 0,
}
}

fn reset(&mut self) {
self.started = Some(SystemTime::now());
self.count = 0;
}

/// The counter should reset if
///
/// 1. It has never started;
/// 2. It has been started more than one minute ago;
/// 3. Something goes wrong while trying to calculate the elapsed time since the last reset.
fn should_reset(&self) -> bool {
if self.started.is_none() {
return true;
}

match self.started.unwrap().elapsed() {
Ok(elapsed) => {
let remaining = max(0, SECONDS_IN_A_MINUTE - elapsed.as_secs() as u32);
if remaining == 0 {
return true;
}
}
Err(e) => {
log::warn!(
"Error getting elapse time since start of ping counter: {:?}",
e
);
return true;
}
}

false
}

/// Increments the counter by 1 and returns the current count.
///
/// This will return `None` if the counter has reached the maximum.
pub fn increment(&mut self) -> Option<u32> {
if self.should_reset() {
self.reset()
}

if self.count == MAX_PING_SENDS_PER_MINUTE {
return None;
}

self.count += 1;
Some(self.count)
}
}

/// When asking for the next ping request to upload,
/// the requester may receive one out of three possible tasks.
///
Expand All @@ -94,6 +170,8 @@ pub struct PingUploadManager {
directory_manager: PingDirectoryManager,
/// A flag signaling if we are done processing the pending pings directories.
processed_pending_pings: Arc<AtomicBool>,
/// A ping counter to help rate limit the ping uploads.
counter: PingCounter,
}

impl PingUploadManager {
Expand Down Expand Up @@ -139,6 +217,7 @@ impl PingUploadManager {
queue,
processed_pending_pings,
directory_manager,
counter: PingCounter::new(),
}
}

Expand Down Expand Up @@ -183,7 +262,7 @@ impl PingUploadManager {
/// # Return value
///
/// `PingUploadTask` - see [`PingUploadTask`](enum.PingUploadTask.html) for more information.
pub fn get_upload_task(&self, log_ping: bool) -> PingUploadTask {
pub fn get_upload_task(&mut self, log_ping: bool) -> PingUploadTask {
if !self.has_processed_pings_dir() {
log::info!(
"Tried getting an upload task, but processing is ongoing. Will come back later."
Expand All @@ -195,8 +274,18 @@ impl PingUploadManager {
.queue
.write()
.expect("Can't write to pending pings queue.");
match queue.pop_front() {
match queue.front() {
Some(request) => {
// If unable to increment the counter,
// we have reached the rate limit of ping uploads for the time being.
if self.counter.increment().is_none() {
log::info!(
"Tried getting an upload task, but we are already at maximum processing capacity.
Will come back later."
);
return PingUploadTask::Wait;
}

log::info!(
"New upload task with id {} (path: {})",
request.document_id,
Expand All @@ -211,7 +300,7 @@ impl PingUploadManager {
}
}

PingUploadTask::Upload(request)
PingUploadTask::Upload(queue.pop_front().unwrap())
}
None => {
log::info!("No more pings to upload! You are done.");
Expand Down Expand Up @@ -377,7 +466,7 @@ mod test {
fn test_doesnt_error_when_there_are_no_pending_pings() {
// Create a new upload_manager
let dir = tempfile::tempdir().unwrap();
let upload_manager = PingUploadManager::new(dir.path(), false);
let mut upload_manager = PingUploadManager::new(dir.path(), false);

// Wait for processing of pending pings directory to finish.
while upload_manager.get_upload_task(false) == PingUploadTask::Wait {
Expand All @@ -393,7 +482,7 @@ mod test {
fn test_returns_ping_request_when_there_is_one() {
// Create a new upload_manager
let dir = tempfile::tempdir().unwrap();
let upload_manager = PingUploadManager::new(dir.path(), false);
let mut upload_manager = PingUploadManager::new(dir.path(), false);

// Wait for processing of pending pings directory to finish.
while upload_manager.get_upload_task(false) == PingUploadTask::Wait {
Expand All @@ -415,15 +504,15 @@ mod test {
fn test_returns_as_many_ping_requests_as_there_are() {
// Create a new upload_manager
let dir = tempfile::tempdir().unwrap();
let upload_manager = PingUploadManager::new(dir.path(), false);
let mut upload_manager = PingUploadManager::new(dir.path(), false);

// Wait for processing of pending pings directory to finish.
while upload_manager.get_upload_task(false) == PingUploadTask::Wait {
thread::sleep(Duration::from_millis(10));
}

// Enqueue a ping multiple times
let n = 10;
let n = MAX_PING_SENDS_PER_MINUTE;
for _ in 0..n {
upload_manager.enqueue_ping(DOCUMENT_ID, PATH, json!({}));
}
Expand All @@ -440,11 +529,50 @@ mod test {
assert_eq!(upload_manager.get_upload_task(false), PingUploadTask::Done);
}

#[test]
fn test_limits_the_number_of_pings_per_minute() {
// Create a new upload_manager
let dir = tempfile::tempdir().unwrap();
let mut upload_manager = PingUploadManager::new(dir.path(), false);

// Wait for processing of pending pings directory to finish.
while upload_manager.get_upload_task(false) == PingUploadTask::Wait {
thread::sleep(Duration::from_millis(10));
}

// Enqueue a ping multiple times
let n = MAX_PING_SENDS_PER_MINUTE;
for _ in 0..n {
upload_manager.enqueue_ping(DOCUMENT_ID, PATH, json!({}));
}

// Verify a request is returned for each submitted ping
for _ in 0..n {
match upload_manager.get_upload_task(false) {
PingUploadTask::Upload(_) => {}
_ => panic!("Expected upload manager to return the next request!"),
}
}

// Enqueue just one more ping...
upload_manager.enqueue_ping(DOCUMENT_ID, PATH, json!({}));

// Verify that we are indeed told to wait because we are at capacity
assert_eq!(PingUploadTask::Wait, upload_manager.get_upload_task(false));

thread::sleep(Duration::from_millis((1000 * SECONDS_IN_A_MINUTE) as u64));

match upload_manager.get_upload_task(false) {
PingUploadTask::Upload(_) => {}
_ => panic!("Expected upload manager to return the next request!"),
}
}

#[test]
fn test_clearing_the_queue_works_correctly() {
// Create a new upload_manager
let dir = tempfile::tempdir().unwrap();
let upload_manager = PingUploadManager::new(dir.path(), false);
let mut upload_manager = PingUploadManager::new(dir.path(), false);

// Wait for processing of pending pings directory to finish.
while upload_manager.get_upload_task(false) == PingUploadTask::Wait {
Expand Down Expand Up @@ -472,7 +600,7 @@ mod test {
glean.register_ping_type(&ping_type);

// Submit the ping multiple times
let n = 10;
let n = MAX_PING_SENDS_PER_MINUTE;
for _ in 0..n {
glean.submit_ping(&ping_type, None).unwrap();
}
Expand Down Expand Up @@ -505,13 +633,13 @@ mod test {
glean.register_ping_type(&ping_type);

// Submit the ping multiple times
let n = 10;
let n = MAX_PING_SENDS_PER_MINUTE;
for _ in 0..n {
glean.submit_ping(&ping_type, None).unwrap();
}

// Create a new upload_manager
let upload_manager = PingUploadManager::new(dir.path(), false);
let mut upload_manager = PingUploadManager::new(dir.path(), false);

// Wait for processing of pending pings directory to finish.
let mut upload_task = upload_manager.get_upload_task(false);
Expand Down Expand Up @@ -546,7 +674,7 @@ mod test {
glean.submit_ping(&ping_type, None).unwrap();

// Create a new upload_manager
let upload_manager = PingUploadManager::new(&dir.path(), false);
let mut upload_manager = PingUploadManager::new(&dir.path(), false);

// Wait for processing of pending pings directory to finish.
let mut upload_task = upload_manager.get_upload_task(false);
Expand Down Expand Up @@ -586,7 +714,7 @@ mod test {
glean.submit_ping(&ping_type, None).unwrap();

// Create a new upload_manager
let upload_manager = PingUploadManager::new(&dir.path(), false);
let mut upload_manager = PingUploadManager::new(&dir.path(), false);

// Wait for processing of pending pings directory to finish.
let mut upload_task = upload_manager.get_upload_task(false);
Expand Down Expand Up @@ -626,7 +754,7 @@ mod test {
glean.submit_ping(&ping_type, None).unwrap();

// Create a new upload_manager
let upload_manager = PingUploadManager::new(dir.path(), false);
let mut upload_manager = PingUploadManager::new(dir.path(), false);

// Wait for processing of pending pings directory to finish.
let mut upload_task = upload_manager.get_upload_task(false);
Expand Down Expand Up @@ -668,7 +796,7 @@ mod test {
glean.submit_ping(&ping_type, None).unwrap();

// Create a new upload_manager
let upload_manager = PingUploadManager::new(&dir.path(), false);
let mut upload_manager = PingUploadManager::new(&dir.path(), false);

// Wait for processing of pending pings directory to finish.
let mut upload_task = upload_manager.get_upload_task(false);
Expand Down Expand Up @@ -700,7 +828,7 @@ mod test {
fn new_pings_are_added_while_upload_in_progress() {
// Create a new upload_manager
let dir = tempfile::tempdir().unwrap();
let upload_manager = PingUploadManager::new(dir.path(), false);
let mut upload_manager = PingUploadManager::new(dir.path(), false);

// Wait for processing of pending pings directory to finish.
while upload_manager.get_upload_task(false) == PingUploadTask::Wait {
Expand Down Expand Up @@ -750,7 +878,7 @@ mod test {
fn test_uploader_sync_init() {
// Create a new upload_manager, with a synchronous ping dir scan.
let dir = tempfile::tempdir().unwrap();
let upload_manager = PingUploadManager::new(dir.path(), true);
let mut upload_manager = PingUploadManager::new(dir.path(), true);

// Since the scan was synchronous and the directory was empty,
// we expect the upload task to always be `Done`.
Expand Down

0 comments on commit c65349a

Please sign in to comment.