Revert "Update Async Dependencies 2" #814

Merged
merged 1 commit on Jun 5, 2020
841 changes: 568 additions & 273 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 6 additions & 10 deletions Cargo.toml
@@ -13,7 +13,7 @@ log = "0.4"
regex = "1"
structopt = "0.3"
crates-index-diff = "7"
reqwest = { version = "0.10", features = ["blocking"] }
reqwest = "0.9"
semver = "0.9"
slug = "=0.1.1"
env_logger = "0.7"
@@ -27,6 +27,11 @@ toml = "0.5"
html5ever = "0.22"
schemamama = "0.3"
schemamama_postgres = "0.2"
rusoto_s3 = "0.40"
rusoto_core = "0.40"
rusoto_credential = "0.40"
futures = "0.1"
tokio = "0.1"
systemstat = "0.1.4"
prometheus = { version = "0.7.0", default-features = false }
lazy_static = "1.0.0"
@@ -57,15 +62,6 @@ notify = "4.0.15"
chrono = { version = "0.4.11", features = ["serde"] }
time = "0.1" # TODO: Remove once `iron` is removed

# Communicating with S3
rusoto_s3 = "0.43"
rusoto_core = "0.43"
rusoto_credential = "0.43"

# Async
futures-util = "0.3"
tokio = { version = "0.2", features = ["rt-threaded"] }

[target.'cfg(not(windows))'.dependencies]
libc = "0.2"

53 changes: 24 additions & 29 deletions src/db/delete_crate.rs
@@ -1,7 +1,7 @@
use crate::storage::s3::{s3_client, S3Backend, S3_BUCKET_NAME};
use crate::storage::s3::{s3_client, S3_BUCKET_NAME};
use failure::{Error, Fail};
use postgres::Connection;
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3};
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3Client, S3};

/// List of directories in docs.rs's underlying storage (either the database or S3) containing a
/// subdirectory named after the crate. Those subdirectories will be deleted.
@@ -22,9 +22,8 @@ pub fn delete_crate(conn: &Connection, name: &str) -> Result<(), Error> {
};

delete_from_database(conn, name, crate_id)?;
if let Some(client) = s3_client() {
let mut backend = S3Backend::new(client, S3_BUCKET_NAME);
delete_from_s3(&mut backend, name)?
if let Some(s3) = s3_client() {
delete_from_s3(&s3, name)?;
}

Ok(())
@@ -69,25 +68,24 @@ fn delete_from_database(conn: &Connection, name: &str, crate_id: i32) -> Result<
Ok(())
}

fn delete_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error> {
fn delete_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
for prefix in STORAGE_PATHS_TO_DELETE {
delete_prefix_from_s3(s3, &format!("{}/{}/", prefix, name))?;
}

Ok(())
}

fn delete_prefix_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error> {
fn delete_prefix_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
let mut continuation_token = None;
loop {
let list =
s3.runtime_handle()
.block_on(s3.client().list_objects_v2(ListObjectsV2Request {
bucket: S3_BUCKET_NAME.into(),
prefix: Some(name.into()),
continuation_token,
..ListObjectsV2Request::default()
}))?;
let list = s3
.list_objects_v2(ListObjectsV2Request {
bucket: S3_BUCKET_NAME.into(),
prefix: Some(name.into()),
continuation_token,
..ListObjectsV2Request::default()
})
.sync()?;

let to_delete = list
.contents
@@ -99,23 +97,20 @@ fn delete_prefix_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error
version_id: None,
})
.collect::<Vec<_>>();

let resp =
s3.runtime_handle()
.block_on(s3.client().delete_objects(DeleteObjectsRequest {
bucket: S3_BUCKET_NAME.into(),
delete: rusoto_s3::Delete {
objects: to_delete,
quiet: None,
},
..DeleteObjectsRequest::default()
}))?;

let resp = s3
.delete_objects(DeleteObjectsRequest {
bucket: S3_BUCKET_NAME.into(),
delete: rusoto_s3::Delete {
objects: to_delete,
quiet: None,
},
..DeleteObjectsRequest::default()
})
.sync()?;
if let Some(errs) = resp.errors {
for err in &errs {
log::error!("error deleting file from s3: {:?}", err);
}

failure::bail!("uploading to s3 failed");
}

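For context on the change above: rusoto 0.40's request futures expose a blocking `.sync()` method, so the S3 calls no longer need a shared tokio runtime. A minimal sketch of the paginated listing pattern this code returns to (the `list_prefix` helper, bucket, and prefix are illustrative assumptions, not part of this PR):

```rust
use failure::Error;
use rusoto_s3::{ListObjectsV2Request, S3, S3Client};

// Collect every key under `prefix`, following continuation tokens until the
// listing is no longer truncated.
fn list_prefix(s3: &S3Client, bucket: &str, prefix: &str) -> Result<Vec<String>, Error> {
    let mut keys = Vec::new();
    let mut continuation_token = None;
    loop {
        let list = s3
            .list_objects_v2(ListObjectsV2Request {
                bucket: bucket.into(),
                prefix: Some(prefix.into()),
                continuation_token,
                ..ListObjectsV2Request::default()
            })
            .sync()?; // rusoto 0.40: block the calling thread until the request finishes
        keys.extend(
            list.contents
                .unwrap_or_default()
                .into_iter()
                .filter_map(|object| object.key),
        );
        continuation_token = list.next_continuation_token;
        if continuation_token.is_none() {
            break;
        }
    }
    Ok(keys)
}
```

`delete_prefix_from_s3` follows the same loop, but issues a `DeleteObjectsRequest` for each page instead of collecting keys.
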
2 changes: 1 addition & 1 deletion src/index/api.rs
@@ -1,7 +1,7 @@
use crate::{error::Result, utils::MetadataPackage};
use chrono::{DateTime, Utc};
use failure::err_msg;
use reqwest::{blocking::Client, header::ACCEPT};
use reqwest::{header::ACCEPT, Client};
use semver::Version;
use serde_json::Value;
use std::io::Read;
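The import change above works because reqwest 0.9's default `Client` is synchronous; the `reqwest::blocking` module only exists in 0.10. A minimal sketch of the blocking usage, assuming a hypothetical `fetch_json` helper rather than the actual registry call in this file:

```rust
use failure::Error;
use reqwest::{header::ACCEPT, Client};

// Fetch a URL and deserialize the JSON body; everything here blocks the
// calling thread, so no async runtime is involved.
fn fetch_json(url: &str) -> Result<serde_json::Value, Error> {
    let client = Client::new();
    let mut response = client
        .get(url)
        .header(ACCEPT, "application/json")
        .send()?; // synchronous in reqwest 0.9
    Ok(response.json()?)
}
```
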
21 changes: 6 additions & 15 deletions src/storage/mod.rs
@@ -59,7 +59,7 @@ pub fn get_file_list<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>, Error> {

pub(crate) enum Storage<'a> {
Database(DatabaseBackend<'a>),
S3(Box<S3Backend<'a>>),
S3(S3Backend<'a>),
}

impl<'a> Storage<'a> {
@@ -70,17 +70,16 @@ impl<'a> Storage<'a> {
DatabaseBackend::new(conn).into()
}
}

pub(crate) fn get(&mut self, path: &str) -> Result<Blob, Error> {
pub(crate) fn get(&self, path: &str) -> Result<Blob, Error> {
match self {
Self::Database(db) => db.get(path),
Self::S3(s3) => s3.get(path),
}
}

fn store_batch(&mut self, batch: Vec<Blob>, trans: &Transaction) -> Result<(), Error> {
fn store_batch(&mut self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> {
match self {
Self::Database(db) => db.store_batch(&batch, trans),
Self::Database(db) => db.store_batch(batch, trans),
Self::S3(s3) => s3.store_batch(batch),
}
}
@@ -132,18 +131,15 @@ impl<'a> Storage<'a> {
date_updated: Utc::now(),
})
});

loop {
let batch: Vec<_> = blobs
.by_ref()
.take(MAX_CONCURRENT_UPLOADS)
.collect::<Result<_, Error>>()?;

if batch.is_empty() {
break;
}

self.store_batch(batch, &trans)?;
self.store_batch(&batch, &trans)?;
}

trans.commit()?;
@@ -156,7 +152,6 @@ fn detect_mime(file_path: &Path) -> Result<&'static str, Error> {
.first_raw()
.map(|m| m)
.unwrap_or("text/plain");

Ok(match mime {
"text/plain" | "text/troff" | "text/x-markdown" | "text/x-rust" | "text/x-toml" => {
match file_path.extension().and_then(OsStr::to_str) {
@@ -183,7 +178,7 @@ impl<'a> From<DatabaseBackend<'a>> for Storage<'a> {

impl<'a> From<S3Backend<'a>> for Storage<'a> {
fn from(db: S3Backend<'a>) -> Self {
Self::S3(Box::new(db))
Self::S3(db)
}
}

@@ -205,23 +200,19 @@ mod test {
.prefix("docs.rs-upload-test")
.tempdir()
.unwrap();

for blob in blobs {
let path = dir.path().join(&blob.path);
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).unwrap();
}

fs::write(path, &blob.content).expect("failed to write to file");
}

wrapper(|env| {
let db = env.db();
let conn = db.conn();
let mut backend = Storage::Database(DatabaseBackend::new(&conn));
let stored_files = backend.store_all(&conn, "", dir.path()).unwrap();
assert_eq!(stored_files.len(), blobs.len());

for blob in blobs {
let name = Path::new(&blob.path);
assert!(stored_files.contains_key(name));
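The `store_all` loop above batches a fallible blob iterator with `by_ref().take(MAX_CONCURRENT_UPLOADS)` and now passes each batch by reference. A small standalone sketch of the batching pattern itself (the item type and `MAX` constant are placeholders):

```rust
use failure::Error;

fn process_in_batches<I>(items: I) -> Result<(), Error>
where
    I: IntoIterator<Item = Result<u32, Error>>,
{
    const MAX: usize = 1000; // stand-in for MAX_CONCURRENT_UPLOADS
    let mut iter = items.into_iter();
    loop {
        // `by_ref` takes a batch without consuming the iterator itself, and
        // collecting into `Result` stops at the first error.
        let batch: Vec<u32> = iter.by_ref().take(MAX).collect::<Result<_, _>>()?;
        if batch.is_empty() {
            break;
        }
        // ... hand `batch` off to the storage backend here ...
    }
    Ok(())
}
```
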
96 changes: 29 additions & 67 deletions src/storage/s3.rs
@@ -1,16 +1,14 @@
use super::Blob;
use chrono::{DateTime, NaiveDateTime, Utc};
use failure::Error;
use futures_util::{
future::FutureExt,
stream::{FuturesUnordered, StreamExt},
};
use futures::Future;
use log::{error, warn};
use rusoto_core::region::Region;
use rusoto_credential::DefaultCredentialsProvider;
use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
use std::{convert::TryInto, io::Read};
use tokio::runtime::{Handle, Runtime};
use std::convert::TryInto;
use std::io::Read;
use tokio::runtime::Runtime;

#[cfg(test)]
mod test;
@@ -34,23 +32,15 @@ impl<'a> S3Backend<'a> {
}
}

#[cfg(test)]
pub(crate) fn with_runtime(client: S3Client, bucket: &'a str, runtime: Runtime) -> Self {
Self {
client,
bucket,
runtime,
}
}

pub(super) fn get(&mut self, path: &str) -> Result<Blob, Error> {
pub(super) fn get(&self, path: &str) -> Result<Blob, Error> {
let res = self
.runtime
.block_on(self.client.get_object(GetObjectRequest {
.client
.get_object(GetObjectRequest {
bucket: self.bucket.to_string(),
key: path.into(),
..Default::default()
}))?;
})
.sync()?;

let mut b = res.body.unwrap().into_blocking_read();
let mut content = Vec::with_capacity(
@@ -70,16 +60,14 @@ impl<'a> S3Backend<'a> {
})
}

pub(super) fn store_batch(&mut self, mut uploads: Vec<Blob>) -> Result<(), Error> {
pub(super) fn store_batch(&mut self, batch: &[Blob]) -> Result<(), Error> {
use futures::stream::FuturesUnordered;
use futures::stream::Stream;
let mut attempts = 0;

loop {
// `FuturesUnordered` is used because the order of execution doesn't
// matter, we just want things to execute as fast as possible
let futures = FuturesUnordered::new();

// Drain uploads, filling `futures` with upload requests
for blob in uploads.drain(..) {
let mut futures = FuturesUnordered::new();
for blob in batch {
futures.push(
self.client
.put_object(PutObjectRequest {
@@ -89,53 +77,27 @@ impl<'a> S3Backend<'a> {
content_type: Some(blob.mime.clone()),
..Default::default()
})
// Drop the value returned by `put_object` because we don't need it,
// emit an error and replace the error values with the blob that failed
// to upload so that we can retry failed uploads
.map(|resp| match resp {
Ok(..) => {
// Increment the total uploaded files when a file is uploaded
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);

Ok(())
}
Err(err) => {
error!("failed to upload file to s3: {:?}", err);
Err(blob)
}
.inspect(|_| {
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
}),
);
}
attempts += 1;

// Collect all the failed uploads so that we can retry them
uploads = self.runtime.block_on(
futures
.filter_map(|resp| async move { resp.err() })
.collect(),
);

// If there are no further uploads we were successful and can return
if uploads.is_empty() {
break;

// If more than three attempts to upload fail, return an error
} else if attempts >= 3 {
error!("failed to upload to s3, abandoning");
failure::bail!("Failed to upload to s3 three times, abandoning");
match self.runtime.block_on(futures.map(drop).collect()) {
// this batch was successful, start another batch if there are still more files
Ok(_) => break,
Err(err) => {
error!("failed to upload to s3: {:?}", err);
// if a futures error occurs, retry the batch
if attempts > 2 {
panic!("failed to upload 3 times, exiting");
}
}
}
}

Ok(())
}

pub fn runtime_handle(&self) -> Handle {
self.runtime.handle().clone()
}

pub fn client(&self) -> &S3Client {
&self.client
}
}

fn parse_timespec(mut raw: &str) -> Result<DateTime<Utc>, Error> {
@@ -180,6 +142,7 @@ pub(crate) mod tests {
use super::*;
use crate::test::*;
use chrono::TimeZone;
use std::slice;

#[test]
fn test_parse_timespec() {
Expand Down Expand Up @@ -209,7 +172,7 @@ pub(crate) mod tests {

// Add a test file to the database
let s3 = env.s3();
s3.upload(vec![blob.clone()]).unwrap();
s3.upload(slice::from_ref(&blob)).unwrap();

// Test that the proper file was returned
s3.assert_blob(&blob, "dir/foo.txt");
Expand Down Expand Up @@ -244,11 +207,10 @@ pub(crate) mod tests {
})
.collect();

s3.upload(blobs.clone()).unwrap();
s3.upload(&blobs).unwrap();
for blob in &blobs {
s3.assert_blob(blob, &blob.path);
}

Ok(())
})
}
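With the revert, `store_batch` drives the uploads on futures 0.1 and tokio 0.1: each `put_object` call yields a `RusotoFuture`, the futures are queued on a `FuturesUnordered`, and the whole batch is blocked on with the runtime. A hedged sketch of that pattern (the `upload_batch` helper, bucket, and key/body pairs are illustrative, not code from this PR):

```rust
use failure::Error;
use futures::{stream::FuturesUnordered, Future, Stream};
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::runtime::Runtime;

fn upload_batch(
    runtime: &mut Runtime,
    client: &S3Client,
    bucket: &str,
    files: Vec<(String, Vec<u8>)>,
) -> Result<(), Error> {
    // The order the uploads finish in doesn't matter, so FuturesUnordered
    // lets them run concurrently on the runtime.
    let mut futures = FuturesUnordered::new();
    for (key, body) in files {
        futures.push(
            client
                .put_object(PutObjectRequest {
                    bucket: bucket.into(),
                    key,
                    body: Some(body.into()),
                    ..Default::default()
                })
                .map(drop), // only success or failure matters, not the response
        );
    }
    // Collect the stream of unit results into one future and block until the
    // whole batch has completed (or the first error surfaces).
    if let Err(err) = runtime.block_on(futures.map(drop).collect()) {
        failure::bail!("failed to upload batch to S3: {:?}", err);
    }
    Ok(())
}
```

The real implementation in the diff above retries a failed batch, giving up after three attempts.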