Skip to content

Commit

Permalink
Fix tikv#342: Async support for push_metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
adamchalmers committed Sep 3, 2020
1 parent b2f473d commit 9607ddf
Showing 1 changed file with 61 additions and 5 deletions.
66 changes: 61 additions & 5 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::time::Duration;

use reqwest::blocking::Client;
use reqwest::header::CONTENT_TYPE;
use reqwest::Client as AsyncClient;
use reqwest::{Method, StatusCode, Url};

use crate::encoder::{Encoder, ProtobufEncoder};
Expand All @@ -23,6 +24,10 @@ lazy_static! {
.timeout(REQWEST_TIMEOUT_SEC)
.build()
.unwrap();
static ref ASYNC_HTTP_CLIENT: AsyncClient = AsyncClient::builder()
.timeout(REQWEST_TIMEOUT_SEC)
.build()
.unwrap();
}

/// `BasicAuthentication` holder for supporting `push` to Pushgateway endpoints
Expand Down Expand Up @@ -60,6 +65,18 @@ pub fn push_metrics<S: BuildHasher>(
push(job, grouping, url, mfs, "PUT", basic_auth)
}

/// Functions just like `push_metrics`, except the metrics are pushed
/// asynchronously.
pub async fn push_metrics_async<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
push_async(job, grouping, url, mfs, "PUT", basic_auth).await
}

/// `push_add_metrics` works like `push_metrics`, but only previously pushed
/// metrics with the same name (and the same job and other grouping labels) will
/// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
Expand All @@ -75,14 +92,14 @@ pub fn push_add_metrics<S: BuildHasher>(

const LABEL_NAME_JOB: &str = "job";

fn push<S: BuildHasher>(
fn configure_push<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
basic_auth: &Option<BasicAuthentication>,
) -> Result<(String, impl Encoder, Vec<u8>)> {
// Suppress clippy warning needless_pass_by_value.
let grouping = grouping;

Expand Down Expand Up @@ -145,7 +162,18 @@ fn push<S: BuildHasher>(
// Ignore error, `no metrics` and `no name`.
let _ = encoder.encode(&[mf], &mut buf);
}
Ok((push_url, encoder, buf))
}

fn push<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs, method, &basic_auth)?;
let mut builder = HTTP_CLIENT
.request(
Method::from_str(method).unwrap(),
Expand All @@ -159,13 +187,41 @@ fn push<S: BuildHasher>(
}

let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
handle_push_response(response.status(), push_url)
}

async fn push_async<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
method: &str,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs, method, &basic_auth)?;
let mut builder = ASYNC_HTTP_CLIENT
.request(
Method::from_str(method).unwrap(),
Url::from_str(&push_url).unwrap(),
)
.header(CONTENT_TYPE, encoder.format_type())
.body(buf);

if let Some(BasicAuthentication { username, password }) = basic_auth {
builder = builder.basic_auth(username, Some(password));
}

let response = builder.send().await.map_err(|e| Error::Msg(format!("{}", e)))?;
handle_push_response(response.status(), push_url)
}

match response.status() {
fn handle_push_response(status: StatusCode, push_url: String) -> Result<()> {
match status {
StatusCode::ACCEPTED => Ok(()),
StatusCode::OK => Ok(()),
_ => Err(Error::Msg(format!(
"unexpected status code {} while pushing to {}",
response.status(),
status,
push_url
))),
}
Expand Down

0 comments on commit 9607ddf

Please sign in to comment.