Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Resilient connection (#153)
Browse files Browse the repository at this point in the history
  • Loading branch information
chkeita authored Oct 28, 2020
1 parent 1d2fb99 commit db85341
Show file tree
Hide file tree
Showing 19 changed files with 532 additions and 38 deletions.
267 changes: 267 additions & 0 deletions src/agent/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"onefuzz",
"onefuzz-agent",
"onefuzz-supervisor",
"reqwest-retry",
"storage-queue",
"win-util",
]
Expand Down
1 change: 1 addition & 0 deletions src/agent/onefuzz-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ url = { version = "2.1", features = ["serde"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
onefuzz = { path = "../onefuzz" }
storage-queue = { path = "../storage-queue" }
reqwest-retry = { path = "../reqwest-retry" }

[dev-dependencies]
tempfile = "3.1"
3 changes: 2 additions & 1 deletion src/agent/onefuzz-agent/src/tasks/merge/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use onefuzz::{
syncdir::SyncedDir,
};
use reqwest::Url;
use reqwest_retry::SendRetry;
use serde::Deserialize;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -117,7 +118,7 @@ async fn try_delete_blob(input_url: Url) -> Result<()> {
let http_client = reqwest::Client::new();
match http_client
.delete(input_url)
.send()
.send_retry_default()
.await?
.error_for_status_with_body()
.await
Expand Down
3 changes: 2 additions & 1 deletion src/agent/onefuzz-agent/src/tasks/merge/libfuzzer_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use onefuzz::{
syncdir::SyncedDir,
};
use reqwest::Url;
use reqwest_retry::SendRetry;
use serde::Deserialize;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -116,7 +117,7 @@ async fn try_delete_blob(input_url: Url) -> Result<()> {
let http_client = reqwest::Client::new();
match http_client
.delete(input_url)
.send()
.send_retry_default()
.await?
.error_for_status_with_body()
.await
Expand Down
8 changes: 5 additions & 3 deletions src/agent/onefuzz-agent/src/tasks/report/crash_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use onefuzz::{
syncdir::SyncedDir,
telemetry::Event::{new_report, new_unable_to_reproduce, new_unique_report},
};

use reqwest::StatusCode;
use reqwest_retry::SendRetry;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use uuid::Uuid;
Expand Down Expand Up @@ -65,7 +67,7 @@ async fn upload_deduped(report: &CrashReport, container: &BlobContainerUrl) -> R
.json(report)
// Conditional PUT, only if-not-exists.
.header("If-None-Match", "*")
.send()
.send_retry_default()
.await?;
if result.status() != StatusCode::NOT_MODIFIED {
event!(new_unique_report;);
Expand All @@ -77,15 +79,15 @@ async fn upload_report(report: &CrashReport, container: &BlobContainerUrl) -> Re
event!(new_report;);
let blob = BlobClient::new();
let url = container.blob(report.blob_name()).url();
blob.put(url).json(report).send().await?;
blob.put(url).json(report).send_retry_default().await?;
Ok(())
}

async fn upload_no_repro(report: &NoCrash, container: &BlobContainerUrl) -> Result<()> {
event!(new_unable_to_reproduce;);
let blob = BlobClient::new();
let url = container.blob(report.blob_name()).url();
blob.put(url).json(report).send().await?;
blob.put(url).json(report).send_retry_default().await?;
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions src/agent/onefuzz-supervisor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ tokio = { version = "0.2.13", features = ["full"] }
url = { version = "2.1.1", features = ["serde"] }
uuid = { version = "0.8.1", features = ["serde", "v4"] }
clap = "2.33"
reqwest-retry = { path = "../reqwest-retry" }
5 changes: 3 additions & 2 deletions src/agent/onefuzz-supervisor/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::fmt;

use anyhow::Result;
use onefuzz::http::ResponseExt;
use reqwest_retry::SendRetry;
use url::Url;
use uuid::Uuid;

Expand Down Expand Up @@ -112,7 +113,7 @@ impl ClientCredentials {
.post(url)
.header("Content-Length", "0")
.form(&self.form_data())
.send()
.send_retry_default()
.await?
.error_for_status_with_body()
.await?;
Expand Down Expand Up @@ -183,7 +184,7 @@ impl ManagedIdentityCredentials {
let response = reqwest::Client::new()
.get(self.url())
.header("Metadata", "true")
.send()
.send_retry_default()
.await?
.error_for_status_with_body()
.await?;
Expand Down
5 changes: 3 additions & 2 deletions src/agent/onefuzz-supervisor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use anyhow::Result;
use onefuzz::{http::ResponseExt, jitter::delay_with_jitter};
use reqwest::StatusCode;
use reqwest_retry::SendRetry;
use std::{
path::{Path, PathBuf},
time::{Duration, Instant},
Expand Down Expand Up @@ -159,7 +160,7 @@ impl Registration {
.header("Content-Length", "0")
.bearer_auth(token.secret().expose_ref())
.body("")
.send()
.send_retry_default()
.await?
.error_for_status();

Expand Down Expand Up @@ -219,7 +220,7 @@ impl Registration {
let response = reqwest::Client::new()
.get(url)
.bearer_auth(token.secret().expose_ref())
.send()
.send_retry_default()
.await?
.error_for_status_with_body()
.await?;
Expand Down
1 change: 1 addition & 0 deletions src/agent/onefuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ strum = "0.19"
strum_macros = "0.19"
tempfile = "3.1"
process_control = "2.0"
reqwest-retry = { path = "../reqwest-retry"}

[target.'cfg(target_os = "windows")'.dependencies]
winreg = "0.7"
Expand Down
14 changes: 10 additions & 4 deletions src/agent/onefuzz/src/blob/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::path::{Path, PathBuf};
use anyhow::Result;
use futures::stream::TryStreamExt;
use reqwest::{Body, RequestBuilder, Response, Url};
use reqwest_retry::SendRetry;
use serde::Serialize;
use tokio::{fs, io};
use tokio_util::codec;
Expand All @@ -31,7 +32,12 @@ impl BlobClient {
pub async fn get(&self, url: &Url) -> Result<Response> {
let url = url.clone();

let r = self.client.get(url).send().await?.error_for_status()?;
let r = self
.client
.get(url)
.send_retry_default()
.await?
.error_for_status()?;

Ok(r)
}
Expand Down Expand Up @@ -62,7 +68,7 @@ impl BlobClient {
.put(url)
.header("x-ms-blob-type", "BlockBlob")
.body(data)
.send()
.send_retry_default()
.await?;

Ok(r)
Expand All @@ -77,7 +83,7 @@ impl BlobClient {
.put(url)
.header("x-ms-blob-type", "BlockBlob")
.json(&item)
.send()
.send_retry_default()
.await?;

Ok(r)
Expand All @@ -103,7 +109,7 @@ impl BlobClient {
.header("Content-Length", &content_length)
.header("x-ms-blob-type", "BlockBlob")
.body(body)
.send()
.send_retry_default()
.await?;

Ok(resp)
Expand Down
7 changes: 4 additions & 3 deletions src/agent/onefuzz/src/machine_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::fs::{onefuzz_etc, write_file};
use anyhow::Result;
use reqwest_retry::SendRetry;
use std::time::Duration;
use tokio::fs;
use uuid::Uuid;
Expand All @@ -27,7 +28,7 @@ pub async fn get_ims_id() -> Result<Uuid> {
.get(IMS_ID_URL)
.timeout(Duration::from_millis(500))
.header("Metadata", "true")
.send()
.send_retry_default()
.await?;
let body = resp.text().await?;
write_file(path, &body).await?;
Expand All @@ -48,7 +49,7 @@ pub async fn get_machine_name() -> Result<String> {
.get(VM_NAME_URL)
.timeout(Duration::from_millis(500))
.header("Metadata", "true")
.send()
.send_retry_default()
.await?;
let body = resp.text().await?;
write_file(path, &body).await?;
Expand All @@ -68,7 +69,7 @@ pub async fn get_scaleset_name() -> Result<String> {
.get(VM_SCALESET_NAME)
.timeout(Duration::from_millis(500))
.header("Metadata", "true")
.send()
.send_retry_default()
.await?;
let body = resp.text().await?;
write_file(path, &body).await?;
Expand Down
36 changes: 19 additions & 17 deletions src/agent/onefuzz/src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::path::Path;
use anyhow::Result;
use futures::stream::TryStreamExt;
use reqwest as r;
use reqwest_retry::{send_retry_reqwest_default, SendRetry};
use serde::Serialize;
use tokio::{fs, io};
use tokio_util::codec;
Expand All @@ -31,13 +32,6 @@ impl BlobUploader {
let metadata = fs::metadata(file_path).await?;
let file_len = metadata.len();

let file = fs::File::open(file_path).await?;
let reader = io::BufReader::new(file);
let codec = codec::BytesCodec::new();
let file_stream = codec::FramedRead::new(reader, codec).map_ok(bytes::BytesMut::freeze);

let body = r::Body::wrap_stream(file_stream);

let url = {
let url_path = self.url.path();
let blob_path = format!("{}/{}", url_path, file_name);
Expand All @@ -47,21 +41,29 @@ impl BlobUploader {
};

// Check if the file already exists before uploading
let head = self.client.head(url.clone()).send().await?;
let head = self.client.head(url.clone()).send_retry_default().await?;
if head.status() == reqwest::StatusCode::OK {
return Ok(head);
}

let content_length = format!("{}", file_len);

let resp = self
.client
.put(url)
.header("Content-Length", &content_length)
.header("x-ms-blob-type", "BlockBlob")
.body(body)
.send()
.await?;
let resp = send_retry_reqwest_default(|| {
let file = fs::File::from_std(std::fs::File::open(file_path)?);
let reader = io::BufReader::new(file);
let codec = codec::BytesCodec::new();
let file_stream = codec::FramedRead::new(reader, codec).map_ok(bytes::BytesMut::freeze);

let request_builder = self
.client
.put(url.clone())
.header("Content-Length", &content_length)
.header("x-ms-blob-type", "BlockBlob")
.body(r::Body::wrap_stream(file_stream));

Ok(request_builder)
})
.await?;

Ok(resp)
}
Expand All @@ -84,7 +86,7 @@ impl BlobUploader {
.put(url)
.header("x-ms-blob-type", "BlockBlob")
.json(&data)
.send()
.send_retry_default()
.await?;

Ok(resp)
Expand Down
13 changes: 13 additions & 0 deletions src/agent/reqwest-retry/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "reqwest-retry"
version = "0.1.0"
authors = ["fuzzing@microsoft.com"]
edition = "2018"
license = "MIT"

[dependencies]
anyhow = "1.0"
async-trait = "0.1.36"
reqwest = { version = "0.10", features = ["json", "stream"] }
backoff = { version = "0.2.1", features = ["async-std"] }
log = "0.4"
Loading

0 comments on commit db85341

Please sign in to comment.