Skip to content

Commit 60d7388

Browse files
author
Joshua Nelson
committed
Revert "Updated async dependencies"
This reverts commit 10a44bc.
1 parent 10a44bc commit 60d7388

File tree

10 files changed

+653
-437
lines changed

10 files changed

+653
-437
lines changed

Cargo.lock

+568-273
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+6-10
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ log = "0.4"
1313
regex = "1"
1414
structopt = "0.3"
1515
crates-index-diff = "7"
16-
reqwest = { version = "0.10", features = ["blocking"] }
16+
reqwest = "0.9"
1717
semver = "0.9"
1818
slug = "=0.1.1"
1919
env_logger = "0.7"
@@ -27,6 +27,11 @@ toml = "0.5"
2727
html5ever = "0.22"
2828
schemamama = "0.3"
2929
schemamama_postgres = "0.2"
30+
rusoto_s3 = "0.40"
31+
rusoto_core = "0.40"
32+
rusoto_credential = "0.40"
33+
futures = "0.1"
34+
tokio = "0.1"
3035
systemstat = "0.1.4"
3136
prometheus = { version = "0.7.0", default-features = false }
3237
lazy_static = "1.0.0"
@@ -57,15 +62,6 @@ notify = "4.0.15"
5762
chrono = { version = "0.4.11", features = ["serde"] }
5863
time = "0.1" # TODO: Remove once `iron` is removed
5964

60-
# Communicating with S3
61-
rusoto_s3 = "0.43"
62-
rusoto_core = "0.43"
63-
rusoto_credential = "0.43"
64-
65-
# Async
66-
futures-util = "0.3"
67-
tokio = { version = "0.2", features = ["rt-threaded"] }
68-
6965
[target.'cfg(not(windows))'.dependencies]
7066
libc = "0.2"
7167

src/db/delete_crate.rs

+24-29
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use crate::storage::s3::{s3_client, S3Backend, S3_BUCKET_NAME};
1+
use crate::storage::s3::{s3_client, S3_BUCKET_NAME};
22
use failure::{Error, Fail};
33
use postgres::Connection;
4-
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3};
4+
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3Client, S3};
55

66
/// List of directories in docs.rs's underlying storage (either the database or S3) containing a
77
/// subdirectory named after the crate. Those subdirectories will be deleted.
@@ -22,9 +22,8 @@ pub fn delete_crate(conn: &Connection, name: &str) -> Result<(), Error> {
2222
};
2323

2424
delete_from_database(conn, name, crate_id)?;
25-
if let Some(client) = s3_client() {
26-
let mut backend = S3Backend::new(client, S3_BUCKET_NAME);
27-
delete_from_s3(&mut backend, name)?
25+
if let Some(s3) = s3_client() {
26+
delete_from_s3(&s3, name)?;
2827
}
2928

3029
Ok(())
@@ -69,25 +68,24 @@ fn delete_from_database(conn: &Connection, name: &str, crate_id: i32) -> Result<
6968
Ok(())
7069
}
7170

72-
fn delete_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error> {
71+
fn delete_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
7372
for prefix in STORAGE_PATHS_TO_DELETE {
7473
delete_prefix_from_s3(s3, &format!("{}/{}/", prefix, name))?;
7574
}
76-
7775
Ok(())
7876
}
7977

80-
fn delete_prefix_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error> {
78+
fn delete_prefix_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
8179
let mut continuation_token = None;
8280
loop {
83-
let list =
84-
s3.runtime_handle()
85-
.block_on(s3.client().list_objects_v2(ListObjectsV2Request {
86-
bucket: S3_BUCKET_NAME.into(),
87-
prefix: Some(name.into()),
88-
continuation_token,
89-
..ListObjectsV2Request::default()
90-
}))?;
81+
let list = s3
82+
.list_objects_v2(ListObjectsV2Request {
83+
bucket: S3_BUCKET_NAME.into(),
84+
prefix: Some(name.into()),
85+
continuation_token,
86+
..ListObjectsV2Request::default()
87+
})
88+
.sync()?;
9189

9290
let to_delete = list
9391
.contents
@@ -99,23 +97,20 @@ fn delete_prefix_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error
9997
version_id: None,
10098
})
10199
.collect::<Vec<_>>();
102-
103-
let resp =
104-
s3.runtime_handle()
105-
.block_on(s3.client().delete_objects(DeleteObjectsRequest {
106-
bucket: S3_BUCKET_NAME.into(),
107-
delete: rusoto_s3::Delete {
108-
objects: to_delete,
109-
quiet: None,
110-
},
111-
..DeleteObjectsRequest::default()
112-
}))?;
113-
100+
let resp = s3
101+
.delete_objects(DeleteObjectsRequest {
102+
bucket: S3_BUCKET_NAME.into(),
103+
delete: rusoto_s3::Delete {
104+
objects: to_delete,
105+
quiet: None,
106+
},
107+
..DeleteObjectsRequest::default()
108+
})
109+
.sync()?;
114110
if let Some(errs) = resp.errors {
115111
for err in &errs {
116112
log::error!("error deleting file from s3: {:?}", err);
117113
}
118-
119114
failure::bail!("uploading to s3 failed");
120115
}
121116

src/index/api.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{error::Result, utils::MetadataPackage};
22
use chrono::{DateTime, Utc};
33
use failure::err_msg;
4-
use reqwest::{blocking::Client, header::ACCEPT};
4+
use reqwest::{header::ACCEPT, Client};
55
use semver::Version;
66
use serde_json::Value;
77
use std::io::Read;

src/storage/mod.rs

+6-15
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub fn get_file_list<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>, Error> {
5959

6060
pub(crate) enum Storage<'a> {
6161
Database(DatabaseBackend<'a>),
62-
S3(Box<S3Backend<'a>>),
62+
S3(S3Backend<'a>),
6363
}
6464

6565
impl<'a> Storage<'a> {
@@ -70,17 +70,16 @@ impl<'a> Storage<'a> {
7070
DatabaseBackend::new(conn).into()
7171
}
7272
}
73-
74-
pub(crate) fn get(&mut self, path: &str) -> Result<Blob, Error> {
73+
pub(crate) fn get(&self, path: &str) -> Result<Blob, Error> {
7574
match self {
7675
Self::Database(db) => db.get(path),
7776
Self::S3(s3) => s3.get(path),
7877
}
7978
}
8079

81-
fn store_batch(&mut self, batch: Vec<Blob>, trans: &Transaction) -> Result<(), Error> {
80+
fn store_batch(&mut self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> {
8281
match self {
83-
Self::Database(db) => db.store_batch(&batch, trans),
82+
Self::Database(db) => db.store_batch(batch, trans),
8483
Self::S3(s3) => s3.store_batch(batch),
8584
}
8685
}
@@ -132,18 +131,15 @@ impl<'a> Storage<'a> {
132131
date_updated: Utc::now(),
133132
})
134133
});
135-
136134
loop {
137135
let batch: Vec<_> = blobs
138136
.by_ref()
139137
.take(MAX_CONCURRENT_UPLOADS)
140138
.collect::<Result<_, Error>>()?;
141-
142139
if batch.is_empty() {
143140
break;
144141
}
145-
146-
self.store_batch(batch, &trans)?;
142+
self.store_batch(&batch, &trans)?;
147143
}
148144

149145
trans.commit()?;
@@ -156,7 +152,6 @@ fn detect_mime(file_path: &Path) -> Result<&'static str, Error> {
156152
.first_raw()
157153
.map(|m| m)
158154
.unwrap_or("text/plain");
159-
160155
Ok(match mime {
161156
"text/plain" | "text/troff" | "text/x-markdown" | "text/x-rust" | "text/x-toml" => {
162157
match file_path.extension().and_then(OsStr::to_str) {
@@ -183,7 +178,7 @@ impl<'a> From<DatabaseBackend<'a>> for Storage<'a> {
183178

184179
impl<'a> From<S3Backend<'a>> for Storage<'a> {
185180
fn from(db: S3Backend<'a>) -> Self {
186-
Self::S3(Box::new(db))
181+
Self::S3(db)
187182
}
188183
}
189184

@@ -205,23 +200,19 @@ mod test {
205200
.prefix("docs.rs-upload-test")
206201
.tempdir()
207202
.unwrap();
208-
209203
for blob in blobs {
210204
let path = dir.path().join(&blob.path);
211205
if let Some(parent) = path.parent() {
212206
fs::create_dir_all(parent).unwrap();
213207
}
214-
215208
fs::write(path, &blob.content).expect("failed to write to file");
216209
}
217-
218210
wrapper(|env| {
219211
let db = env.db();
220212
let conn = db.conn();
221213
let mut backend = Storage::Database(DatabaseBackend::new(&conn));
222214
let stored_files = backend.store_all(&conn, "", dir.path()).unwrap();
223215
assert_eq!(stored_files.len(), blobs.len());
224-
225216
for blob in blobs {
226217
let name = Path::new(&blob.path);
227218
assert!(stored_files.contains_key(name));

src/storage/s3.rs

+29-67
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
use super::Blob;
22
use chrono::{DateTime, NaiveDateTime, Utc};
33
use failure::Error;
4-
use futures_util::{
5-
future::FutureExt,
6-
stream::{FuturesUnordered, StreamExt},
7-
};
4+
use futures::Future;
85
use log::{error, warn};
96
use rusoto_core::region::Region;
107
use rusoto_credential::DefaultCredentialsProvider;
118
use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
12-
use std::{convert::TryInto, io::Read};
13-
use tokio::runtime::{Handle, Runtime};
9+
use std::convert::TryInto;
10+
use std::io::Read;
11+
use tokio::runtime::Runtime;
1412

1513
#[cfg(test)]
1614
mod test;
@@ -34,23 +32,15 @@ impl<'a> S3Backend<'a> {
3432
}
3533
}
3634

37-
#[cfg(test)]
38-
pub(crate) fn with_runtime(client: S3Client, bucket: &'a str, runtime: Runtime) -> Self {
39-
Self {
40-
client,
41-
bucket,
42-
runtime,
43-
}
44-
}
45-
46-
pub(super) fn get(&mut self, path: &str) -> Result<Blob, Error> {
35+
pub(super) fn get(&self, path: &str) -> Result<Blob, Error> {
4736
let res = self
48-
.runtime
49-
.block_on(self.client.get_object(GetObjectRequest {
37+
.client
38+
.get_object(GetObjectRequest {
5039
bucket: self.bucket.to_string(),
5140
key: path.into(),
5241
..Default::default()
53-
}))?;
42+
})
43+
.sync()?;
5444

5545
let mut b = res.body.unwrap().into_blocking_read();
5646
let mut content = Vec::with_capacity(
@@ -70,16 +60,14 @@ impl<'a> S3Backend<'a> {
7060
})
7161
}
7262

73-
pub(super) fn store_batch(&mut self, mut uploads: Vec<Blob>) -> Result<(), Error> {
63+
pub(super) fn store_batch(&mut self, batch: &[Blob]) -> Result<(), Error> {
64+
use futures::stream::FuturesUnordered;
65+
use futures::stream::Stream;
7466
let mut attempts = 0;
7567

7668
loop {
77-
// `FuturesUnordered` is used because the order of execution doesn't
78-
// matter, we just want things to execute as fast as possible
79-
let futures = FuturesUnordered::new();
80-
81-
// Drain uploads, filling `futures` with upload requests
82-
for blob in uploads.drain(..) {
69+
let mut futures = FuturesUnordered::new();
70+
for blob in batch {
8371
futures.push(
8472
self.client
8573
.put_object(PutObjectRequest {
@@ -89,53 +77,27 @@ impl<'a> S3Backend<'a> {
8977
content_type: Some(blob.mime.clone()),
9078
..Default::default()
9179
})
92-
// Drop the value returned by `put_object` because we don't need it,
93-
// emit an error and replace the error values with the blob that failed
94-
// to upload so that we can retry failed uploads
95-
.map(|resp| match resp {
96-
Ok(..) => {
97-
// Increment the total uploaded files when a file is uploaded
98-
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
99-
100-
Ok(())
101-
}
102-
Err(err) => {
103-
error!("failed to upload file to s3: {:?}", err);
104-
Err(blob)
105-
}
80+
.inspect(|_| {
81+
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
10682
}),
10783
);
10884
}
10985
attempts += 1;
11086

111-
// Collect all the failed uploads so that we can retry them
112-
uploads = self.runtime.block_on(
113-
futures
114-
.filter_map(|resp| async move { resp.err() })
115-
.collect(),
116-
);
117-
118-
// If there are no further uploads we were successful and can return
119-
if uploads.is_empty() {
120-
break;
121-
122-
// If more than three attempts to upload fail, return an error
123-
} else if attempts >= 3 {
124-
error!("failed to upload to s3, abandoning");
125-
failure::bail!("Failed to upload to s3 three times, abandoning");
87+
match self.runtime.block_on(futures.map(drop).collect()) {
88+
// this batch was successful, start another batch if there are still more files
89+
Ok(_) => break,
90+
Err(err) => {
91+
error!("failed to upload to s3: {:?}", err);
92+
// if a futures error occurs, retry the batch
93+
if attempts > 2 {
94+
panic!("failed to upload 3 times, exiting");
95+
}
96+
}
12697
}
12798
}
128-
12999
Ok(())
130100
}
131-
132-
pub fn runtime_handle(&self) -> Handle {
133-
self.runtime.handle().clone()
134-
}
135-
136-
pub fn client(&self) -> &S3Client {
137-
&self.client
138-
}
139101
}
140102

141103
fn parse_timespec(mut raw: &str) -> Result<DateTime<Utc>, Error> {
@@ -180,6 +142,7 @@ pub(crate) mod tests {
180142
use super::*;
181143
use crate::test::*;
182144
use chrono::TimeZone;
145+
use std::slice;
183146

184147
#[test]
185148
fn test_parse_timespec() {
@@ -209,7 +172,7 @@ pub(crate) mod tests {
209172

210173
// Add a test file to the database
211174
let s3 = env.s3();
212-
s3.upload(vec![blob.clone()]).unwrap();
175+
s3.upload(slice::from_ref(&blob)).unwrap();
213176

214177
// Test that the proper file was returned
215178
s3.assert_blob(&blob, "dir/foo.txt");
@@ -244,11 +207,10 @@ pub(crate) mod tests {
244207
})
245208
.collect();
246209

247-
s3.upload(blobs.clone()).unwrap();
210+
s3.upload(&blobs).unwrap();
248211
for blob in &blobs {
249212
s3.assert_blob(blob, &blob.path);
250213
}
251-
252214
Ok(())
253215
})
254216
}

0 commit comments

Comments
 (0)