Move our index updates to be run in background jobs
This fundamentally changes the workflow for all operations we perform
involving git, so that they are not performed on the web server and do
not block the response. This will improve the response times of `cargo
publish` and make the publish process more resilient, reducing the
likelihood of an inconsistency such as the index being updated but not
our database.

Previously, our workflow looked something like this (a rough code sketch
follows the list):

- When the server boots, do a full clone of the index into a known
  location
- Some request comes in that needs to update the index
- Database transaction is opened
- The local checkout is modified, and we attempt to commit & push (note:
  this involves a mutex to avoid contention with another request updating
  the index on the same server)
- If the push fails, we fetch, `reset --hard`, and try again, up to 20 times
- Database transaction is committed
- We send a successful response
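
In code, that in-request flow looked roughly like the sketch below. This is
illustrative only: `modify_checkout`, `commit_and_push`, and
`fetch_and_reset_hard` are hypothetical stand-ins for the git2-based code
that used to live in `src/git.rs`, not the real function names.

```rust
// Illustrative sketch of the old flow, run inside the request handler.
// `app.git_repo` was a Mutex<git2::Repository> shared by all requests on
// this server (see the removal in src/app.rs below). Helper names are made up.
fn update_index_in_request(app: &App, conn: &PgConnection) -> CargoResult<()> {
    conn.transaction(|| {
        // Serialize index updates on this server.
        let repo = app.git_repo.lock().unwrap();

        for _ in 0..20 {
            modify_checkout(&repo)?;
            if commit_and_push(&repo).is_ok() {
                // The database transaction commits only after the push succeeds.
                return Ok(());
            }
            // Another server pushed first; resync and try again.
            fetch_and_reset_hard(&repo)?;
        }
        Err(internal("gave up after 20 push attempts"))
    })
}
```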

The reason for the retry logic is that we have more than one web server,
so no server can be sure that its local checkout is actually up to date.
There is also a major opportunity for an inconsistent state to be reached
here: if the power goes out, the server is restarted, or something
crashes between the index being updated and the database transaction
being committed, we will never retry that update.

The new workflow looks like this (a code sketch follows the list):

- Some request comes in that needs to update the index
- A job is queued in the database to update the index at some point in
  the future.
- We send a successful response
- A separate process pulls the job out of the database
- A full clone of the index is performed into a temporary directory
- The new checkout is modified, committed, and pushed
- If push succeeds, job is removed from database
- If push fails, job is marked as failed and will be retried at some
  point in the future
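
Roughly, the two halves look like this. The request-side call is the one
changed in `src/controllers/krate/publish.rs` below; the worker loop, its
sleep interval, the exact module paths, and the parameter types are
assumptions, since the binary that drains the queue is not part of this diff.

```rust
// Request side: `git::add_crate` now only enqueues a background job (a row
// in `background_jobs`), so the HTTP response is not blocked on git at all.
// The `git::Crate` parameter type and `&PgConnection` signature are assumed.
fn queue_index_update(conn: &PgConnection, git_crate: git::Crate) -> CargoResult<()> {
    git::add_crate(conn, git_crate)
}

// Worker side (assumed shape): a separate process repeatedly runs whatever
// jobs are ready, using the Runner added in src/background/runner.rs and the
// registry from src/background_jobs.rs.
fn run_worker(connection_pool: DieselPool, environment: Environment) {
    let runner = background_jobs::job_runner(Runner::builder(connection_pool, environment));
    loop {
        runner
            .run_all_pending_jobs()
            .expect("could not run pending background jobs");
        std::thread::sleep(std::time::Duration::from_secs(1));
    }
}
```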

While a background worker can be spread across multiple machines and/or
threads, we avoid the race conditions that were previously possible by
ensuring that only one worker, with one thread, handles index updates.
Right now that's easy, since index updates are the only background job we
have, but as we add more we will need to add support for multiple queues
to account for this.
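
Concretely, the index-update worker would be built with a single thread,
something like the sketch below. The `thread_count` setter is an assumption:
only its default of 5 is visible in this diff (`self.thread_count.unwrap_or(5)`
in the builder).

```rust
// Assumed builder API: one worker thread means index commits can never race
// each other, no matter how many index jobs are queued.
fn build_index_worker(pool: DieselPool, env: Environment) -> Runner<Environment> {
    background_jobs::job_runner(Runner::builder(pool, env).thread_count(1))
}
```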

I've opted to do a fresh checkout in every job, rather than relying on
some state that was set up when the machine booted. This is mostly for
simplicity's sake. It also means that if we need to scale to multiple
threads/processes for other jobs, we can punt the multi-queue
enhancement for a while if we wish. However, it does mean the job will
take a bit longer to run. If this turns out to be a problem, it's easy
to address.
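
Each job can therefore do something like the following sketch, using `git2`
(as the removed code in `src/bin/server.rs` did) plus a temporary-directory
crate such as `tempfile`; the commit/push details and the crates.io error
types are simplified here.

```rust
use std::error::Error;

// Sketch: clone the index into a throwaway directory, apply the change, and
// let the directory be removed when `checkout` is dropped after the job.
fn with_fresh_index_checkout(
    env: &Environment,
    apply_change: impl FnOnce(&git2::Repository) -> Result<(), Box<dyn Error>>,
) -> Result<(), Box<dyn Error>> {
    let checkout = tempfile::tempdir()?;

    // Same credentials setup the server used to do at boot.
    let mut callbacks = git2::RemoteCallbacks::new();
    callbacks.credentials(|_url, _username, _allowed| {
        let (user, pass) = env.credentials().expect("index credentials not configured");
        git2::Cred::userpass_plaintext(user, pass)
    });
    let mut fetch_options = git2::FetchOptions::new();
    fetch_options.remote_callbacks(callbacks);

    let repo = git2::build::RepoBuilder::new()
        .fetch_options(fetch_options)
        .clone(env.index_location.as_str(), checkout.path())?;

    // Edit the crate's file, commit, and push; if the push fails, the job is
    // marked failed and retried later (elided here).
    apply_change(&repo)
}
```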

This should eliminate the opportunity for the index to enter a state
that is inconsistent with our database -- or at least the two should
become eventually consistent. If the power goes out before the job is
committed as done, the job is assumed to have failed and will be
retried. The job itself is idempotent, so even if the power goes out
after the index is updated, the retry should succeed.
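
The "retried at some point in the future" part comes from the `retriable()`
predicate added in src/background/storage.rs: a job whose last attempt failed
becomes eligible again once `last_retry` is more than `1 minute * 2^retries`
in the past. A plain-Rust rendering of that check (a sketch; the real version
is a SQL expression evaluated inside Postgres):

```rust
use std::time::{Duration, SystemTime};

// Mirror of the SQL backoff predicate: a job that has failed `retries` times
// is retried once 2^retries minutes have passed since its last attempt.
fn is_retriable(now: SystemTime, last_retry: SystemTime, retries: u32) -> bool {
    let backoff = Duration::from_secs(60) * 2u32.pow(retries);
    last_retry < now - backoff
}
```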

One other side effect of this change is that when `cargo publish`
returns with an exit status of 0, that does not mean that your crate/new
version is immediately available for use -- if you try to point to it in
Cargo.toml seconds after publishing, you may get an error that it could
not find that version. This was technically already true, since neither
S3 nor GitHub guarantees that uploads/pushes are immediately visible.
However, this does increase the timescale beyond the delay we would have
seen there. In most cases it should be under 10 seconds, and at most a
minute.

One enhancement that will come as a follow-up, but is not included in
this PR, is a UI to see the status of your upload. This is definitely
nice to have, but not something I think is necessary for this feature to
land. In most cases, the time it would take to navigate to that UI is
longer than the time it takes the background job to run.
That enhancement is something I think can go hand in hand with rust-lang#1503
(which incidentally becomes much easier to implement with this PR, since
a "staging" publish just skips queuing the background job, and the only
thing the button to full publish needs to do is queue the job).

This setup does assume that all background jobs *must* eventually
succeed. If any job fails, the index is in an inconsistent state with
our database, and we are having an outage of some kind. Due to the
nature of our background jobs, this likely means that GitHub is down, or
there is a bug in our code. Either way, we page whoever is on-call,
since it means publishing is broken. Since publishing crates is such an
infrequent event, I've set the thresholds to be extremely low.
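
For reference, the check backing that alert can be as simple as polling the
`failed_job_count` query added in this commit. How it feeds the actual
monitoring/paging system is outside this diff, and `page_on_call` below is a
hypothetical stand-in.

```rust
// Hypothetical monitoring probe: with publishes being rare, any job that has
// failed at least once is treated as an outage worth paging for.
fn check_index_queue_health(conn: &PgConnection) -> QueryResult<()> {
    let failed = background::storage::failed_job_count(conn)?;
    if failed > 0 {
        page_on_call(&format!("{} background job(s) have failed", failed));
    }
    Ok(())
}
```
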
sgrif committed Mar 8, 2019
1 parent 1e234dd commit 6991e90
Showing 15 changed files with 312 additions and 224 deletions.
9 changes: 2 additions & 7 deletions src/app.rs
@@ -4,7 +4,7 @@ use crate::{db, util::CargoResult, Config, Env};
use std::{
env,
path::PathBuf,
sync::{Arc, Mutex},
sync::Arc,
time::Duration,
};

@@ -25,10 +25,8 @@ pub struct App {
/// A unique key used with conduit_cookie to generate cookies
pub session_key: String,

/// The crate index git repository
pub git_repo: Mutex<git2::Repository>,

/// The location on disk of the checkout of the crate index git repository
/// Only used in the development environment.
pub git_repo_checkout: PathBuf,

/// The server configuration
@@ -86,13 +84,10 @@ impl App {
.connection_customizer(Box::new(db::SetStatementTimeout(db_connection_timeout)))
.thread_pool(thread_pool);

let repo = git2::Repository::open(&config.git_repo_checkout).unwrap();

App {
diesel_database: db::diesel_pool(&config.db_url, config.env, diesel_db_config),
github,
session_key: config.session_key.clone(),
git_repo: Mutex::new(repo),
git_repo_checkout: config.git_repo_checkout.clone(),
config: config.clone(),
}
2 changes: 1 addition & 1 deletion src/background/job.rs
@@ -2,7 +2,7 @@ use diesel::PgConnection;
use serde::{Serialize, de::DeserializeOwned};

use super::storage;
use util::CargoResult;
use crate::util::CargoResult;

/// A background job, meant to be run asynchronously.
pub trait Job: Serialize + DeserializeOwned {
5 changes: 3 additions & 2 deletions src/background/registry.rs
@@ -1,11 +1,12 @@
use serde_json;
use std::collections::HashMap;
use std::panic::RefUnwindSafe;

use super::Job;
use util::CargoResult;
use crate::util::CargoResult;

#[doc(hidden)]
pub type PerformFn<Env> = Box<dyn Fn(serde_json::Value, &Env) -> CargoResult<()> + Send + Sync>;
pub type PerformFn<Env> = Box<dyn Fn(serde_json::Value, &Env) -> CargoResult<()> + RefUnwindSafe + Send + Sync>;

#[derive(Default)]
#[allow(missing_debug_implementations)] // Can't derive debug
76 changes: 61 additions & 15 deletions src/background/runner.rs
@@ -1,11 +1,13 @@
#![allow(dead_code)]
use diesel::prelude::*;
use std::panic::{catch_unwind, UnwindSafe};
use std::any::Any;
use std::panic::{catch_unwind, UnwindSafe, RefUnwindSafe, PanicInfo};
use std::sync::Arc;
use threadpool::ThreadPool;

use db::{DieselPool, DieselPooledConn};
use crate::db::{DieselPool, DieselPooledConn};
use super::{storage, Registry, Job};
use util::errors::*;
use crate::util::errors::*;

#[allow(missing_debug_implementations)]
pub struct Builder<Env> {
@@ -30,8 +32,8 @@ impl<Env> Builder<Env> {
Runner {
connection_pool: self.connection_pool,
thread_pool: ThreadPool::new(self.thread_count.unwrap_or(5)),
environment: self.environment,
registry: self.registry,
environment: Arc::new(self.environment),
registry: Arc::new(self.registry),
}
}
}
@@ -40,11 +42,11 @@ impl<Env> Builder<Env> {
pub struct Runner<Env> {
connection_pool: DieselPool,
thread_pool: ThreadPool,
environment: Env,
registry: Registry<Env>,
environment: Arc<Env>,
registry: Arc<Registry<Env>>,
}

impl<Env> Runner<Env> {
impl<Env: RefUnwindSafe + Send + Sync + 'static> Runner<Env> {
pub fn builder(connection_pool: DieselPool, environment: Env) -> Builder<Env> {
Builder {
connection_pool,
@@ -54,6 +56,24 @@ impl<Env> Runner<Env> {
}
}

pub fn run_all_pending_jobs(&self) -> CargoResult<()> {
let available_job_count = storage::available_job_count(&*self.connection()?)?;
for _ in 0..available_job_count {
self.run_single_job()
}
Ok(())
}

fn run_single_job(&self) {
let environment = Arc::clone(&self.environment);
let registry = Arc::clone(&self.registry);
self.get_single_job(move |job| {
let perform_fn = registry.get(&job.job_type)
.ok_or_else(|| internal(&format_args!("Unknown job type {}", job.job_type)))?;
perform_fn(job.data, &environment)
})
}

fn get_single_job<F>(&self, f: F)
where
F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + Send + UnwindSafe + 'static,
@@ -69,13 +89,15 @@ impl<Env> Runner<Env> {
let job_id = job.id;

let result = catch_unwind(|| f(job))
.map_err(|_| internal("job panicked"))
.map_err(try_to_extract_panic_info)
.and_then(|r| r);

if result.is_ok() {
storage::delete_successful_job(&conn, job_id)?;
} else {
storage::update_failed_job(&conn, job_id);
match result {
Ok(_) => storage::delete_successful_job(&conn, job_id)?,
Err(e) => {
eprintln!("Job {} failed to run: {}", job_id, e);
storage::update_failed_job(&conn, job_id);
}
}
Ok(())
}).expect("Could not retrieve or update job")
@@ -86,19 +108,43 @@ impl<Env> Runner<Env> {
self.connection_pool.get().map_err(Into::into)
}

#[cfg(test)]
pub fn assert_no_failed_jobs(&self) -> CargoResult<()> {
self.wait_for_jobs();
let failed_jobs = storage::failed_job_count(&*self.connection()?)?;
assert_eq!(0, failed_jobs);
Ok(())
}

fn wait_for_jobs(&self) {
self.thread_pool.join();
assert_eq!(0, self.thread_pool.panic_count());
}
}

/// Try to figure out what's in the box, and print it if we can.
///
/// The actual error type we will get from `panic::catch_unwind` is really poorly documented.
/// However, the `panic::set_hook` functions deal with a `PanicInfo` type, and its payload is
/// documented as "commonly but not always `&'static str` or `String`". So we can try all of those,
/// and give up if we didn't get one of those three types.
fn try_to_extract_panic_info(info: Box<dyn Any + Send + 'static>) -> Box<dyn CargoError> {
if let Some(x) = info.downcast_ref::<PanicInfo>() {
internal(&format_args!("job panicked: {}", x))
} else if let Some(x) = info.downcast_ref::<&'static str>() {
internal(&format_args!("job panicked: {}", x))
} else if let Some(x) = info.downcast_ref::<String>() {
internal(&format_args!("job panicked: {}", x))
} else {
internal("job panicked")
}
}

#[cfg(test)]
mod tests {
use diesel::prelude::*;
use diesel::r2d2;

use schema::background_jobs::dsl::*;
use crate::schema::background_jobs::dsl::*;
use std::sync::{Mutex, MutexGuard, Barrier, Arc};
use std::panic::AssertUnwindSafe;
use super::*;
50 changes: 38 additions & 12 deletions src/background/storage.rs
@@ -1,12 +1,13 @@
use diesel::dsl::now;
use diesel::pg::Pg;
use diesel::prelude::*;
use diesel::{delete, insert_into, update};
use diesel::sql_types::Integer;
use diesel::sql_types::{Bool, Integer, Interval};
use serde_json;

use schema::background_jobs;
use crate::schema::background_jobs;
use super::Job;
use util::CargoResult;
use crate::util::CargoResult;

#[derive(Queryable, Identifiable, Debug, Clone)]
pub struct BackgroundJob {
@@ -17,7 +18,7 @@ pub struct BackgroundJob {

/// Enqueues a job to be run as soon as possible.
pub fn enqueue_job<T: Job>(conn: &PgConnection, job: T) -> CargoResult<()> {
use schema::background_jobs::dsl::*;
use crate::schema::background_jobs::dsl::*;

let job_data = serde_json::to_value(job)?;
insert_into(background_jobs)
@@ -29,27 +30,52 @@ pub fn enqueue_job<T: Job>(conn: &PgConnection, job: T) -> CargoResult<()> {
Ok(())
}

/// Finds the next job that is unlocked, and ready to be retried. If a row is
/// found, it will be locked.
pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult<BackgroundJob> {
use schema::background_jobs::dsl::*;
fn retriable() -> Box<dyn BoxableExpression<background_jobs::table, Pg, SqlType = Bool>> {
use crate::schema::background_jobs::dsl::*;
use diesel::dsl::*;
use diesel::sql_types::Interval;

sql_function!(power, power_t, (x: Integer, y: Integer) -> Integer);

Box::new(last_retry.lt(now - 1.minute().into_sql::<Interval>() * power(2, retries)))
}

/// Finds the next job that is unlocked, and ready to be retried. If a row is
/// found, it will be locked.
pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult<BackgroundJob> {
use crate::schema::background_jobs::dsl::*;

background_jobs
.select((id, job_type, data))
.filter(last_retry.lt(now - 1.minute().into_sql::<Interval>() * power(2, retries)))
.filter(retriable())
.order(id)
.for_update()
.skip_locked()
.first::<BackgroundJob>(conn)
}

/// The number of jobs that have failed at least once
pub fn failed_job_count(conn: &PgConnection) -> QueryResult<i64> {
use crate::schema::background_jobs::dsl::*;

background_jobs
.count()
.filter(retries.gt(0))
.get_result(conn)
}

/// The number of jobs available to be run
pub fn available_job_count(conn: &PgConnection) -> QueryResult<i64> {
use crate::schema::background_jobs::dsl::*;

background_jobs
.count()
.filter(retriable())
.get_result(conn)
}

/// Deletes a job that has successfully completed running
pub fn delete_successful_job(conn: &PgConnection, job_id: i64) -> QueryResult<()> {
use schema::background_jobs::dsl::*;
use crate::schema::background_jobs::dsl::*;

delete(background_jobs.find(job_id)).execute(conn)?;
Ok(())
@@ -60,7 +86,7 @@ pub fn delete_successful_job(conn: &PgConnection, job_id: i64) -> QueryResult<()
/// Ignores any database errors that may have occurred. If the DB has gone away,
/// we assume that just trying again with a new connection will succeed.
pub fn update_failed_job(conn: &PgConnection, job_id: i64) {
use schema::background_jobs::dsl::*;
use crate::schema::background_jobs::dsl::*;

let _ = update(background_jobs.find(job_id))
.set((
23 changes: 23 additions & 0 deletions src/background_jobs.rs
@@ -0,0 +1,23 @@
use url::Url;

use crate::background::{Runner, Builder};
use crate::git::{AddCrate, Yank};

pub fn job_runner(config: Builder<Environment>) -> Runner<Environment> {
config
.register::<AddCrate>()
.register::<Yank>()
.build()
}

#[allow(missing_debug_implementations)]
pub struct Environment {
pub index_location: Url,
pub credentials: Option<(String, String)>,
}

impl Environment {
pub fn credentials(&self) -> Option<(&str, &str)> {
self.credentials.as_ref().map(|(u, p)| (u.as_str(), p.as_str()))
}
}
35 changes: 4 additions & 31 deletions src/bin/server.rs
@@ -1,10 +1,10 @@
#![deny(warnings)]

use cargo_registry::{boot, build_handler, env, git, App, Config, Env};
use cargo_registry::{boot, App, Env};
use jemalloc_ctl;
use std::{
env,
fs::{self, File},
fs::File,
sync::{mpsc::channel, Arc},
};

@@ -23,37 +23,10 @@ fn main() {

// Initialize logging
env_logger::init();
let config = Config::default();

// If there isn't a git checkout containing the crate index repo at the path specified
// by `GIT_REPO_CHECKOUT`, delete that directory and clone the repo specified by `GIT_REPO_URL`
// into that directory instead. Uses the credentials specified in `GIT_HTTP_USER` and
// `GIT_HTTP_PWD` via the `cargo_registry::git::credentials` function.
let url = env("GIT_REPO_URL");
let repo = match git2::Repository::open(&config.git_repo_checkout) {
Ok(r) => r,
Err(..) => {
let _ = fs::remove_dir_all(&config.git_repo_checkout);
fs::create_dir_all(&config.git_repo_checkout).unwrap();
let mut cb = git2::RemoteCallbacks::new();
cb.credentials(git::credentials);
let mut opts = git2::FetchOptions::new();
opts.remote_callbacks(cb);
git2::build::RepoBuilder::new()
.fetch_options(opts)
.clone(&url, &config.git_repo_checkout)
.unwrap()
}
};

// All commits to the index registry made through crates.io will be made by bors, the Rust
// community's friendly GitHub bot.
let mut cfg = repo.config().unwrap();
cfg.set_str("user.name", "bors").unwrap();
cfg.set_str("user.email", "bors@rust-lang.org").unwrap();

let config = cargo_registry::Config::default();
let app = App::new(&config);
let app = build_handler(Arc::new(app));
let app = cargo_registry::build_handler(Arc::new(app));

// On every server restart, ensure the categories available in the database match
// the information in *src/categories.toml*.
3 changes: 3 additions & 0 deletions src/config.rs
@@ -1,11 +1,13 @@
use crate::{env, uploaders::Uploader, Env, Replica};
use std::{env, path::PathBuf};
use url::Url;

#[derive(Clone, Debug)]
pub struct Config {
pub uploader: Uploader,
pub session_key: String,
pub git_repo_checkout: PathBuf,
pub index_location: Url,
pub gh_client_id: String,
pub gh_client_secret: String,
pub db_url: String,
@@ -124,6 +126,7 @@ impl Default for Config {
uploader,
session_key: env("SESSION_KEY"),
git_repo_checkout: checkout,
index_location: Url::parse(&env("GIT_REPO_URL")).unwrap(),
gh_client_id: env("GH_CLIENT_ID"),
gh_client_secret: env("GH_CLIENT_SECRET"),
db_url: env("DATABASE_URL"),
2 changes: 1 addition & 1 deletion src/controllers/krate/publish.rs
@@ -196,7 +196,7 @@ pub fn publish(req: &mut dyn Request) -> CargoResult<Response> {
yanked: Some(false),
links,
};
git::add_crate(&**req.app(), &git_crate).chain_error(|| {
git::add_crate(&conn, git_crate).chain_error(|| {
internal(&format_args!(
"could not add crate `{}` to the git repo",
name