diff --git a/src/push.rs b/src/push.rs index 80a253a6..35ebdd31 100644 --- a/src/push.rs +++ b/src/push.rs @@ -8,6 +8,7 @@ use std::time::Duration; use reqwest::blocking::Client; use reqwest::header::CONTENT_TYPE; +use reqwest::Client as AsyncClient; use reqwest::{Method, StatusCode, Url}; use crate::encoder::{Encoder, ProtobufEncoder}; @@ -23,6 +24,10 @@ lazy_static! { .timeout(REQWEST_TIMEOUT_SEC) .build() .unwrap(); + static ref ASYNC_HTTP_CLIENT: AsyncClient = AsyncClient::builder() + .timeout(REQWEST_TIMEOUT_SEC) + .build() + .unwrap(); } /// `BasicAuthentication` holder for supporting `push` to Pushgateway endpoints @@ -60,6 +65,18 @@ pub fn push_metrics( push(job, grouping, url, mfs, "PUT", basic_auth) } +/// Functions just like `push_metrics`, except the metrics are pushed +/// asynchronously. +pub async fn push_metrics_async( + job: &str, + grouping: HashMap, + url: &str, + mfs: Vec, + basic_auth: Option, +) -> Result<()> { + push_async(job, grouping, url, mfs, "PUT", basic_auth).await +} + /// `push_add_metrics` works like `push_metrics`, but only previously pushed /// metrics with the same name (and the same job and other grouping labels) will /// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.) @@ -75,14 +92,14 @@ pub fn push_add_metrics( const LABEL_NAME_JOB: &str = "job"; -fn push( +fn configure_push( job: &str, grouping: HashMap, url: &str, mfs: Vec, method: &str, - basic_auth: Option, -) -> Result<()> { + basic_auth: &Option, +) -> Result<(String, impl Encoder, Vec)> { // Suppress clippy warning needless_pass_by_value. let grouping = grouping; @@ -145,7 +162,18 @@ fn push( // Ignore error, `no metrics` and `no name`. let _ = encoder.encode(&[mf], &mut buf); } + Ok((push_url, encoder, buf)) +} +fn push( + job: &str, + grouping: HashMap, + url: &str, + mfs: Vec, + method: &str, + basic_auth: Option, +) -> Result<()> { + let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs, method, &basic_auth)?; let mut builder = HTTP_CLIENT .request( Method::from_str(method).unwrap(), @@ -159,13 +187,41 @@ fn push( } let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?; + handle_push_response(response.status(), push_url) +} + +async fn push_async( + job: &str, + grouping: HashMap, + url: &str, + mfs: Vec, + method: &str, + basic_auth: Option, +) -> Result<()> { + let (push_url, encoder, buf) = configure_push(job, grouping, url, mfs, method, &basic_auth)?; + let mut builder = ASYNC_HTTP_CLIENT + .request( + Method::from_str(method).unwrap(), + Url::from_str(&push_url).unwrap(), + ) + .header(CONTENT_TYPE, encoder.format_type()) + .body(buf); + + if let Some(BasicAuthentication { username, password }) = basic_auth { + builder = builder.basic_auth(username, Some(password)); + } + + let response = builder.send().await.map_err(|e| Error::Msg(format!("{}", e)))?; + handle_push_response(response.status(), push_url) +} - match response.status() { +fn handle_push_response(status: StatusCode, push_url: String) -> Result<()> { + match status { StatusCode::ACCEPTED => Ok(()), StatusCode::OK => Ok(()), _ => Err(Error::Msg(format!( "unexpected status code {} while pushing to {}", - response.status(), + status, push_url ))), }