diff --git a/src/background_jobs.rs b/src/background_jobs.rs
index 5f011150824..deef5340a11 100644
--- a/src/background_jobs.rs
+++ b/src/background_jobs.rs
@@ -1,5 +1,5 @@
 use std::panic::AssertUnwindSafe;
-use std::sync::{Mutex, MutexGuard, PoisonError};
+use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
 use swirl::PerformError;
 
 use crate::db::{DieselPool, DieselPooledConn};
@@ -21,7 +21,7 @@ impl swirl::db::DieselPool for DieselPool {
 
 #[allow(missing_debug_implementations)]
 pub struct Environment {
-    index: Mutex<Repository>,
+    index: Arc<Mutex<Repository>>,
     pub credentials: Option<(String, String)>,
     // FIXME: https://github.com/sfackler/r2d2/pull/70
     pub connection_pool: AssertUnwindSafe<DieselPool>,
@@ -29,6 +29,20 @@ pub struct Environment {
     http_client: AssertUnwindSafe<reqwest::Client>,
 }
 
+// FIXME: AssertUnwindSafe should be `Clone`, this can be replaced with
+// `#[derive(Clone)]` if that is fixed in the standard lib
+impl Clone for Environment {
+    fn clone(&self) -> Self {
+        Self {
+            index: self.index.clone(),
+            credentials: self.credentials.clone(),
+            connection_pool: AssertUnwindSafe(self.connection_pool.0.clone()),
+            uploader: self.uploader.clone(),
+            http_client: AssertUnwindSafe(self.http_client.0.clone()),
+        }
+    }
+}
+
 impl Environment {
     pub fn new(
         index: Repository,
@@ -38,7 +52,7 @@ impl Environment {
         http_client: reqwest::Client,
     ) -> Self {
         Self {
-            index: Mutex::new(index),
+            index: Arc::new(Mutex::new(index)),
             credentials,
             connection_pool: AssertUnwindSafe(connection_pool),
             uploader,
diff --git a/src/bin/background-worker.rs b/src/bin/background-worker.rs
index 5caf09d5644..a1350263845 100644
--- a/src/bin/background-worker.rs
+++ b/src/bin/background-worker.rs
@@ -1,11 +1,14 @@
-// Runs enqueued background jobs
-//
-// This binary will loop until interrupted. Every second, it will attempt to
-// run any jobs in the background queue. Panics if attempting to count
-// available jobs fails.
-//
-// Usage:
-//      cargo run --bin background-worker
+//! Runs enqueued background jobs
+//!
+//! This binary will loop until interrupted. It will run all jobs in the
+//! background queue, sleeping for 1 second whenever the queue is empty. If we
+//! are unable to spawn workers to run jobs (either because we couldn't connect
+//! to the DB, an error occurred while loading, or we just never heard back from
+//! the worker thread), we will rebuild the runner and try again up to 5 times.
+//! After the 5th occurrence, we will panic.
+//!
+//! Usage:
+//!      cargo run --bin background-worker
 
 #![deny(warnings, clippy::all, rust_2018_idioms)]
 
@@ -20,8 +23,13 @@
 fn main() {
     let config = cargo_registry::Config::default();
 
-    // We're only using 1 thread, so we only need 2 connections
-    let db_config = r2d2::Pool::builder().max_size(2);
+    // 2x the thread pool size -- not all our jobs need a DB connection,
+    // but we always want to be able to run our jobs in parallel, rather
+    // than adjusting based on how many concurrent jobs need a connection.
+    // Eventually swirl will do this for us, and this will be the default --
+    // we should just let it use a thread pool sized to the CPU count, and a
+    // connection pool sized to 2x that, once that lands.
+    let db_config = r2d2::Pool::builder().max_size(4);
     let db_pool = db::diesel_pool(&config.db_url, config.env, db_config);
 
     let username = dotenv::var("GIT_HTTP_USER");
@@ -31,6 +39,11 @@ fn main() {
         _ => None,
     };
 
+    let job_start_timeout = dotenv::var("BACKGROUND_JOB_TIMEOUT")
+        .unwrap_or_else(|_| "30".into())
+        .parse()
+        .expect("Invalid value for `BACKGROUND_JOB_TIMEOUT`");
+
     println!("Cloning index");
 
     let repository = Repository::open(&config.index_location).expect("Failed to clone index");
@@ -43,17 +56,31 @@ fn main() {
         reqwest::Client::new(),
     );
 
-    let runner = swirl::Runner::builder(db_pool, environment)
-        .thread_count(1)
-        .job_start_timeout(Duration::from_secs(10))
-        .build();
+    let build_runner = || {
+        swirl::Runner::builder(db_pool.clone(), environment.clone())
+            .thread_count(2)
+            .job_start_timeout(Duration::from_secs(job_start_timeout))
+            .build()
+    };
+    let mut runner = build_runner();
 
     println!("Runner booted, running jobs");
 
+    let mut failure_count = 0;
+
     loop {
-        runner
-            .run_all_pending_jobs()
-            .expect("Could not begin running jobs");
+        if let Err(e) = runner.run_all_pending_jobs() {
+            failure_count += 1;
+            if failure_count < 5 {
+                eprintln!(
+                    "Error running jobs (n = {}) -- retrying: {:?}",
+                    failure_count, e,
+                );
+                runner = build_runner();
+            } else {
+                panic!("Failed to begin running jobs 5 times. Restarting the process");
+            }
+        }
         sleep(Duration::from_secs(1));
     }
 }
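
The manual `Clone` impl in `background_jobs.rs` exists only because `std::panic::AssertUnwindSafe<T>` does not implement `Clone` even when `T` does, so `#[derive(Clone)]` cannot be used on `Environment`; making `Environment` cloneable is what lets the worker rebuild a runner from the same environment repeatedly. Below is a minimal, self-contained sketch of the same workaround. The `Env`, `FakePool`, and `FakeRepo` names are invented stand-ins for illustration, not the real crates.io types.

```rust
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins for the real crates.io types, for illustration only.
#[derive(Clone)]
struct FakePool;
struct FakeRepo;

struct Env {
    index: Arc<Mutex<FakeRepo>>,
    pool: AssertUnwindSafe<FakePool>,
}

// `#[derive(Clone)]` on `Env` would require `AssertUnwindSafe<FakePool>: Clone`,
// which the standard library does not provide, so the impl is written by hand:
// clone the wrapped value (the public `.0` field) and re-wrap it.
impl Clone for Env {
    fn clone(&self) -> Self {
        Self {
            index: Arc::clone(&self.index),
            pool: AssertUnwindSafe(self.pool.0.clone()),
        }
    }
}

fn main() {
    let env = Env {
        index: Arc::new(Mutex::new(FakeRepo)),
        pool: AssertUnwindSafe(FakePool),
    };
    // Both values share the same index mutex; only the pool handle is duplicated.
    let _worker_copy = env.clone();
}
```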
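The worker's new main loop can also be read in isolation: run pending jobs, rebuild the runner and retry when that fails, panic after the fifth failure, and sleep one second between passes. The sketch below shows only that control flow under stated assumptions: `Runner`, its error type, and `build_runner` are placeholders standing in for `swirl::Runner` and the closure in the diff, not swirl's actual API.

```rust
use std::thread::sleep;
use std::time::Duration;

// Placeholder runner; only the retry/rebuild control flow is the point here.
struct Runner;

impl Runner {
    fn run_all_pending_jobs(&self) -> Result<(), String> {
        Ok(())
    }
}

// Stand-in for the closure that rebuilds the runner from the pool and environment.
fn build_runner() -> Runner {
    Runner
}

fn main() {
    let mut runner = build_runner();
    let mut failure_count = 0;

    loop {
        if let Err(e) = runner.run_all_pending_jobs() {
            failure_count += 1;
            if failure_count < 5 {
                eprintln!("Error running jobs (n = {}) -- retrying: {:?}", failure_count, e);
                // Drop the old runner (and its worker threads) and start over.
                runner = build_runner();
            } else {
                // After the fifth failure, crash and let the process supervisor restart us.
                panic!("Failed to begin running jobs 5 times");
            }
        }

        sleep(Duration::from_secs(1));
    }
}
```

Note that in the diff `failure_count` is never reset after a successful pass, so the five-failure limit applies to the worker's lifetime total rather than to consecutive failures.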