Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

add basic context to every retried request #798

Merged
merged 7 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/agent/onefuzz-agent/src/tasks/merge/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License.

use crate::tasks::{config::CommonConfig, heartbeat::HeartbeatSender, utils};
use anyhow::Result;
use anyhow::{Context, Result};
use onefuzz::{
expand::Expand, fs::set_executable, http::ResponseExt, jitter::delay_with_jitter,
syncdir::SyncedDir,
Expand Down Expand Up @@ -117,16 +117,15 @@ async fn process_message(config: Arc<Config>, input_url: &Url, tmp_dir: &Path) -

async fn try_delete_blob(input_url: Url) -> Result<()> {
let http_client = reqwest::Client::new();
match http_client
http_client
.delete(input_url)
.send_retry_default()
.await?
.await
.context("try_delete_blob")?
.error_for_status_with_body()
.await
{
Ok(_) => Ok(()),
Err(err) => Err(err),
}
.context("try_delete_blob status body")?;
Ok(())
}

async fn merge(config: &Config, output_dir: impl AsRef<Path>) -> Result<()> {
Expand Down
14 changes: 7 additions & 7 deletions src/agent/onefuzz-agent/src/tasks/merge/libfuzzer_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::tasks::{
heartbeat::HeartbeatSender,
utils::{self, default_bool_true},
};
use anyhow::Result;
use anyhow::{Context, Result};
use onefuzz::{
http::ResponseExt,
jitter::delay_with_jitter,
Expand Down Expand Up @@ -171,14 +171,14 @@ pub async fn merge_inputs(

async fn try_delete_blob(input_url: Url) -> Result<()> {
let http_client = reqwest::Client::new();
match http_client
http_client
.delete(input_url)
.send_retry_default()
.await?
.await
.context("try_delete_blob")?
.error_for_status_with_body()
.await
{
Ok(_) => Ok(()),
Err(err) => Err(err),
}
.context("try_delete_blob status body")?;

Ok(())
}
8 changes: 5 additions & 3 deletions src/agent/onefuzz-agent/src/tasks/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use anyhow::Result;
use anyhow::{Context, Result};
use async_trait::async_trait;
use onefuzz::{http::ResponseExt, jitter::delay_with_jitter};
use reqwest::{Client, Url};
Expand All @@ -24,9 +24,11 @@ pub async fn download_input(input_url: Url, dst: impl AsRef<Path>) -> Result<Pat
let resp = Client::new()
.get(input_url)
.send_retry_default()
.await?
.await
.context("download_input")?
.error_for_status_with_body()
.await?;
.await
.context("download_input status body")?;

let body = resp.bytes().await?;
let mut body = body.as_ref();
Expand Down
9 changes: 6 additions & 3 deletions src/agent/onefuzz-supervisor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ impl Registration {
.bearer_auth(token.secret().expose_ref())
.body("")
.send_retry_default()
.await?;
.await
.context("Registration.create")?;

let status_code = response.status();

Expand Down Expand Up @@ -299,9 +300,11 @@ impl Registration {
.get(url)
.bearer_auth(token.secret().expose_ref())
.send_retry_default()
.await?
.await
.context("Registration.renew")?
.error_for_status_with_body()
.await?;
.await
.context("Registration.renew request body")?;

self.dynamic_config = response.json().await?;
self.dynamic_config.save().await?;
Expand Down
28 changes: 16 additions & 12 deletions src/agent/onefuzz-supervisor/src/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use anyhow::Result;
use anyhow::{Context, Result};
use downcast_rs::Downcast;
use onefuzz::{auth::AccessToken, http::ResponseExt, process::Output};
use reqwest::{Client, RequestBuilder, Response, StatusCode};
Expand Down Expand Up @@ -192,13 +192,13 @@ impl Coordinator {
/// If the request fails due to an expired access token, we will retry once
/// with a fresh one.
pub async fn poll_commands(&mut self) -> Result<Option<NodeCommand>> {
let response = self.send_with_auth_retry(RequestType::PollCommands).await?;
let response = self.send(RequestType::PollCommands).await?;
let data = response.bytes().await?;
let pending: PendingNodeCommand = serde_json::from_slice(&data)?;

if let Some(envelope) = pending.envelope {
let request = RequestType::ClaimCommand(envelope.message_id);
self.send_with_auth_retry(request).await?;
self.send(request).await?;

Ok(Some(envelope.command))
} else {
Expand All @@ -212,14 +212,14 @@ impl Coordinator {
machine_id: self.registration.machine_id,
};
let request = RequestType::EmitEvent(&envelope);
self.send_with_auth_retry(request).await?;
self.send(request).await?;

Ok(())
}

async fn can_schedule(&mut self, work_set: &WorkSet) -> Result<CanSchedule> {
let request = RequestType::CanSchedule(work_set);
let response = self.send_with_auth_retry(request).await?;
let response = self.send(request).await?;

let can_schedule: CanSchedule = response.json().await?;

Expand All @@ -229,10 +229,7 @@ impl Coordinator {
// The lifetime is needed by an argument type. We can't make it anonymous,
// as clippy suggests, because `'_` is not allowed in this binding site.
#[allow(clippy::needless_lifetimes)]
async fn send_with_auth_retry<'a>(
&mut self,
request_type: RequestType<'a>,
) -> Result<Response> {
async fn send<'a>(&mut self, request_type: RequestType<'a>) -> Result<Response> {
let request = self.get_request_builder(request_type.clone());
let mut response = request
.send_retry(
Expand All @@ -243,7 +240,8 @@ impl Coordinator {
DEFAULT_RETRY_PERIOD,
MAX_RETRY_ATTEMPTS,
)
.await?;
.await
.context("Coordinator.send")?;

if response.status() == StatusCode::UNAUTHORIZED {
debug!("access token expired, renewing");
Expand All @@ -255,12 +253,18 @@ impl Coordinator {

// And try one more time.
let request = self.get_request_builder(request_type);
response = request.send_retry_default().await?;
response = request
.send_retry_default()
.await
.context("Coordinator.send after refreshing access token")?;
};

// We've retried if we got a `401 Unauthorized`. If it happens again, we
// really want to bail this time.
let response = response.error_for_status_with_body().await?;
let response = response
.error_for_status_with_body()
.await
.context("Coordinator.send status body")?;

Ok(response)
}
Expand Down
14 changes: 9 additions & 5 deletions src/agent/onefuzz/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::fmt;

use crate::http::ResponseExt;
use anyhow::Result;
use anyhow::{Context, Result};
use reqwest_retry::SendRetry;
use url::Url;
use uuid::Uuid;
Expand Down Expand Up @@ -142,9 +142,11 @@ impl ClientCredentials {
("scope", format!("{}.default", resource)),
])
.send_retry_default()
.await?
.await
.context("access_token request")?
.error_for_status_with_body()
.await?;
.await
.context("access_token request body")?;

let body: ClientAccessTokenBody = response.json().await?;

Expand Down Expand Up @@ -208,9 +210,11 @@ impl ManagedIdentityCredentials {
.get(self.url())
.header("Metadata", "true")
.send_retry_default()
.await?
.await
.context("ManagedIdentityCredentials.access_token")?
.error_for_status_with_body()
.await?;
.await
.context("ManagedIdentityCredentials.access_token status body")?;

let body: ManagedIdentityAccessTokenBody = response.json().await?;

Expand Down
38 changes: 14 additions & 24 deletions src/agent/onefuzz/src/blob/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::path::{Path, PathBuf};

use anyhow::Result;
use anyhow::{Context, Result};
use futures::stream::TryStreamExt;
use reqwest::{Body, RequestBuilder, Response, Url};
use reqwest_retry::SendRetry;
Expand Down Expand Up @@ -36,8 +36,10 @@ impl BlobClient {
.client
.get(url)
.send_retry_default()
.await?
.error_for_status()?;
.await
.context("BlobClient.get")?
.error_for_status()
.context("BlobClient.get status")?;

Ok(r)
}
Expand All @@ -63,30 +65,22 @@ impl BlobClient {
}

pub async fn put_data(&self, url: Url, data: impl Into<Body>) -> Result<Response> {
let r = self
.client
.put(url)
.header("x-ms-blob-type", "BlockBlob")
self.put(url)
.body(data)
.send_retry_default()
.await?;

Ok(r)
.await
.context("BlobClient.put_data")
}

pub async fn put_json<I>(&self, url: Url, item: I) -> Result<Response>
where
I: Serialize,
{
let r = self
.client
.put(url)
.header("x-ms-blob-type", "BlockBlob")
self.put(url)
.json(&item)
.send_retry_default()
.await?;

Ok(r)
.await
.context("BlobClient.put_json")
}

pub async fn put_file(&self, file_url: Url, file_path: impl AsRef<Path>) -> Result<Response> {
Expand All @@ -103,15 +97,11 @@ impl BlobClient {
let body = reqwest::Body::wrap_stream(file_stream);
let content_length = format!("{}", file_len);

let resp = self
.client
.put(file_url)
self.put(file_url)
.header("Content-Length", &content_length)
.header("x-ms-blob-type", "BlockBlob")
.body(body)
.send_retry_default()
.await?;

Ok(resp)
.await
.context("BlobClient.put_file")
}
}
10 changes: 5 additions & 5 deletions src/agent/onefuzz/src/machine_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
// Licensed under the MIT License.

use crate::fs::{onefuzz_etc, write_file};
#[cfg(target_os = "linux")]
use anyhow::Context;
use anyhow::Result;
use anyhow::{Context, Result};
use reqwest_retry::SendRetry;
#[cfg(target_os = "linux")]
use std::path::Path;
Expand Down Expand Up @@ -33,7 +31,8 @@ pub async fn get_ims_id() -> Result<Uuid> {
.timeout(Duration::from_millis(500))
.header("Metadata", "true")
.send_retry_default()
.await?;
.await
.context("get_ims_id")?;
let body = resp.text().await?;
write_file(path, &body).await?;
body
Expand All @@ -54,7 +53,8 @@ pub async fn get_machine_name() -> Result<String> {
.timeout(Duration::from_millis(500))
.header("Metadata", "true")
.send_retry_default()
.await?;
.await
.context("get_machine_name")?;
let body = resp.text().await?;
write_file(path, &body).await?;
body
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz/src/syncdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl SyncedDir {
MAX_RETRY_ATTEMPTS,
)
.await
.context("Uploading blob")?;
.context("SyncedDir.upload")?;

Ok(result.status() == StatusCode::CREATED)
}
Expand Down
8 changes: 5 additions & 3 deletions src/agent/onefuzz/src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::path::Path;

use anyhow::Result;
use anyhow::{Context, Result};
use futures::stream::TryStreamExt;
use reqwest::{Body, Client, Response, StatusCode, Url};
use reqwest_retry::{
Expand Down Expand Up @@ -78,7 +78,8 @@ impl BlobUploader {

Ok(request_builder)
})
.await?;
.await
.context("BlobUploader.upload")?;

Ok(resp)
}
Expand All @@ -102,7 +103,8 @@ impl BlobUploader {
.header("x-ms-blob-type", "BlockBlob")
.json(&data)
.send_retry_default()
.await?;
.await
.context("BlobUploader.upload_json")?;

Ok(resp)
}
Expand Down
6 changes: 3 additions & 3 deletions src/agent/reqwest-retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use backoff::{self, future::retry_notify, ExponentialBackoff};
use onefuzz_telemetry::warn;
use onefuzz_telemetry::debug;
use reqwest::{Response, StatusCode};
use std::{
sync::atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -113,10 +113,10 @@ where
|err: Result<Response, anyhow::Error>, dur| match err {
Ok(response) => {
if let Err(err) = response.error_for_status() {
warn!("request attempt failed after {:?}: {:?}", dur, err)
debug!("request attempt failed after {:?}: {:?}", dur, err)
}
}
err => warn!("request attempt failed after {:?}: {:?}", dur, err),
err => debug!("request attempt failed after {:?}: {:?}", dur, err),
},
)
.await;
Expand Down
Loading