Make the job runner a bit more resilient to slow jobs or other errors #1804

Merged
merged 3 commits on Aug 21, 2019
20 changes: 17 additions & 3 deletions src/background_jobs.rs
@@ -1,5 +1,5 @@
use std::panic::AssertUnwindSafe;
use std::sync::{Mutex, MutexGuard, PoisonError};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use swirl::PerformError;

use crate::db::{DieselPool, DieselPooledConn};
@@ -21,14 +21,28 @@ impl swirl::db::DieselPool for DieselPool {

#[allow(missing_debug_implementations)]
pub struct Environment {
index: Mutex<Repository>,
index: Arc<Mutex<Repository>>,
pub credentials: Option<(String, String)>,
// FIXME: https://github.com/sfackler/r2d2/pull/70
pub connection_pool: AssertUnwindSafe<DieselPool>,
pub uploader: Uploader,
http_client: AssertUnwindSafe<reqwest::Client>,
}

// FIXME: AssertUnwindSafe should be `Clone`, this can be replaced with
// `#[derive(Clone)]` if that is fixed in the standard lib
impl Clone for Environment {
fn clone(&self) -> Self {
Self {
index: self.index.clone(),
credentials: self.credentials.clone(),
connection_pool: AssertUnwindSafe(self.connection_pool.0.clone()),
uploader: self.uploader.clone(),
http_client: AssertUnwindSafe(self.http_client.0.clone()),
}
}
}

impl Environment {
pub fn new(
index: Repository,
@@ -38,7 +52,7 @@ impl Environment {
http_client: reqwest::Client,
) -> Self {
Self {
index: Mutex::new(index),
index: Arc::new(Mutex::new(index)),
credentials,
connection_pool: AssertUnwindSafe(connection_pool),
uploader,
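
A note on the background_jobs.rs change above: the hand-written Clone impl is needed because AssertUnwindSafe does not implement Clone, and wrapping the index mutex in an Arc is what lets every cloned Environment share a single repository lock instead of needing its own checkout. Below is a minimal sketch of that sharing behavior, not part of the diff; RepositoryStub is a hypothetical stand-in for the real Repository type.

use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the real `Repository` type.
struct RepositoryStub;

#[derive(Clone)]
struct SharedIndex {
    index: Arc<Mutex<RepositoryStub>>,
}

fn main() {
    let original = SharedIndex {
        index: Arc::new(Mutex::new(RepositoryStub)),
    };
    let cloned = original.clone();

    // Both handles point at the same mutex, so only one worker can hold
    // the index checkout at a time, no matter how many clones exist.
    assert!(Arc::ptr_eq(&original.index, &cloned.index));
}
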
61 changes: 44 additions & 17 deletions src/bin/background-worker.rs
@@ -1,11 +1,14 @@
// Runs enqueued background jobs
//
// This binary will loop until interrupted. Every second, it will attempt to
// run any jobs in the background queue. Panics if attempting to count
// available jobs fails.
//
// Usage:
// cargo run --bin background-worker
//! Runs enqueued background jobs
//!
//! This binary will loop until interrupted. It will run all jobs in the
//! background queue, sleeping for 1 second whenever the queue is empty. If we
//! are unable to spawn workers to run jobs (either because we couldn't connect
//! to the DB, an error occurred while loading, or we just never heard back from
//! the worker thread), we will rebuild the runner and try again up to 5 times.
//! After the 5th occurrence, we will panic.
//!
//! Usage:
//! cargo run --bin background-worker

#![deny(warnings, clippy::all, rust_2018_idioms)]

@@ -20,8 +23,13 @@ fn main() {

let config = cargo_registry::Config::default();

// We're only using 1 thread, so we only need 2 connections
let db_config = r2d2::Pool::builder().max_size(2);
// 2x the thread pool size -- not all our jobs need a DB connection,
// but we want to always be able to run our jobs in parallel, rather
// than adjusting based on how many concurrent jobs need a connection.
// Eventually swirl will do this for us, and this will be the default
// -- we should just let it do a thread pool size of CPU count, and a
// connection pool size of 2x that when that lands.
let db_config = r2d2::Pool::builder().max_size(4);
let db_pool = db::diesel_pool(&config.db_url, config.env, db_config);

let username = dotenv::var("GIT_HTTP_USER");
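
To tie the sizing comment above to the hard-coded max_size(4): a minimal sketch, not part of the diff, of the 2x rule it describes; pool_size_for is a hypothetical helper used only for illustration.

// Hypothetical helper: two connections per worker thread, matching the
// old 1-thread / 2-connection setup and the new 2-thread / 4-connection one.
fn pool_size_for(thread_count: u32) -> u32 {
    thread_count * 2
}

fn main() {
    // With the worker's 2 threads this reproduces max_size(4).
    assert_eq!(pool_size_for(2), 4);
}
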
@@ -31,6 +39,11 @@ fn main() {
_ => None,
};

let job_start_timeout = dotenv::var("BACKGROUND_JOB_TIMEOUT")
.unwrap_or_else(|_| "30".into())
.parse()
.expect("Invalid value for `BACKGROUND_JOB_TIMEOUT`");

println!("Cloning index");

let repository = Repository::open(&config.index_location).expect("Failed to clone index");
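
The BACKGROUND_JOB_TIMEOUT value parsed above defaults to 30 and is fed to Duration::from_secs when the runner is built further down. A minimal sketch of the same parse-with-default pattern, not part of the diff, using std::env::var instead of dotenv for simplicity:

use std::time::Duration;

fn main() {
    // Same parse-with-default shape the worker uses for BACKGROUND_JOB_TIMEOUT.
    let parse_timeout = || -> Duration {
        let secs: u64 = std::env::var("BACKGROUND_JOB_TIMEOUT")
            .unwrap_or_else(|_| "30".into())
            .parse()
            .expect("Invalid value for `BACKGROUND_JOB_TIMEOUT`");
        Duration::from_secs(secs)
    };

    // Unset: fall back to the 30 second default.
    std::env::remove_var("BACKGROUND_JOB_TIMEOUT");
    assert_eq!(parse_timeout(), Duration::from_secs(30));

    // Set: the override wins.
    std::env::set_var("BACKGROUND_JOB_TIMEOUT", "60");
    assert_eq!(parse_timeout(), Duration::from_secs(60));
}
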
@@ -43,17 +56,31 @@ fn main() {
reqwest::Client::new(),
);

let runner = swirl::Runner::builder(db_pool, environment)
.thread_count(1)
.job_start_timeout(Duration::from_secs(10))
.build();
let build_runner = || {
swirl::Runner::builder(db_pool.clone(), environment.clone())
.thread_count(2)
.job_start_timeout(Duration::from_secs(job_start_timeout))
.build()
};
let mut runner = build_runner();

println!("Runner booted, running jobs");

let mut failure_count = 0;

loop {
runner
.run_all_pending_jobs()
.expect("Could not begin running jobs");
if let Err(e) = runner.run_all_pending_jobs() {
failure_count += 1;
if failure_count < 5 {
eprintln!(
"Error running jobs (n = {}) -- retrying: {:?}",
failure_count, e,
);
runner = build_runner();
} else {
panic!("Failed to begin running jobs 5 times. Restarting the process");
}
}
sleep(Duration::from_secs(1));
}
}