enhancement(prometheus_scrape source): run requests in parallel with timeouts #18021

Merged · 16 commits · Jul 24, 2023
16 changes: 14 additions & 2 deletions src/sources/http_client/client.rs
@@ -20,7 +20,8 @@ use crate::{
sources::util::{
http::HttpMethod,
http_client::{
build_url, call, default_interval, GenericHttpClientInputs, HttpClientBuilder,
build_url, call, default_interval, default_timeout, warn_if_interval_too_low,
GenericHttpClientInputs, HttpClientBuilder,
},
},
tls::{TlsConfig, TlsSettings},
@@ -51,13 +52,20 @@ pub struct HttpClientConfig {
#[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
pub endpoint: String,

/// The interval between calls.
/// The interval between scrapes. Requests run concurrently.
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
pub interval: Duration,

/// The timeout for each scrape request.
#[serde(default = "default_timeout")]
#[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
pub timeout: Duration,

/// Custom parameters for the HTTP request query string.
///
/// One or more values for the same parameter key can be provided.
@@ -153,6 +161,7 @@ impl Default for HttpClientConfig {
endpoint: "http://localhost:9898/logs".to_string(),
query: HashMap::new(),
interval: default_interval(),
timeout: default_timeout(),
decoding: default_decoding(),
framing: default_framing_message_based(),
headers: HashMap::new(),
@@ -193,9 +202,12 @@ impl SourceConfig for HttpClientConfig {
log_namespace,
};

warn_if_interval_too_low(self.timeout, self.interval);

let inputs = GenericHttpClientInputs {
urls,
interval: self.interval,
timeout: self.timeout,
headers: self.headers.clone(),
content_type,
auth: self.auth.clone(),
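The `warn_if_interval_too_low` call added above fires whenever the configured timeout exceeds the scrape interval, since in that case a new scrape of an endpoint can begin before the previous one has been cut off. A minimal standalone sketch of that check and the pileup arithmetic (the `should_warn` helper and the numbers are illustrative, not part of this PR):

```rust
use std::time::Duration;

/// Mirrors the condition inside `warn_if_interval_too_low`: the warning is
/// emitted only when a request may outlive the tick that spawned it.
fn should_warn(timeout: Duration, interval: Duration) -> bool {
    timeout > interval
}

fn main() {
    // Defaults from this PR: 15s interval, 5s timeout -> no warning.
    assert!(!should_warn(Duration::from_secs(5), Duration::from_secs(15)));

    // A 40s timeout against a 15s interval warns: up to
    // ceil(40 / 15) = 3 scrapes of one endpoint could be in flight at once,
    // which is the "excessive resource consumption" the log message refers to.
    assert!(should_warn(Duration::from_secs(40), Duration::from_secs(15)));
}
```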
8 changes: 8 additions & 0 deletions src/sources/http_client/tests.rs
@@ -16,6 +16,8 @@ use crate::test_util::{

pub(crate) const INTERVAL: Duration = Duration::from_secs(1);

pub(crate) const TIMEOUT: Duration = Duration::from_secs(1);

/// The happy path should yield at least one event and must emit the required internal events for sources.
pub(crate) async fn run_compliance(config: HttpClientConfig) -> Vec<Event> {
let events =
@@ -47,6 +49,7 @@ async fn bytes_decoding() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: default_decoding(),
framing: default_framing_message_based(),
@@ -75,6 +78,7 @@ async fn json_decoding_newline_delimited() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: FramingConfig::NewlineDelimited(Default::default()),
@@ -103,6 +107,7 @@ async fn json_decoding_character_delimited() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
@@ -135,6 +140,7 @@ async fn request_query_applied() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint?key1=val1", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::from([
("key1".to_string(), vec!["val2".to_string()]),
(
@@ -203,6 +209,7 @@ async fn headers_applied() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: default_decoding(),
framing: default_framing_message_based(),
@@ -234,6 +241,7 @@ async fn accept_header_override() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Bytes,
framing: default_framing_message_based(),
20 changes: 19 additions & 1 deletion src/sources/prometheus/scrape.rs
@@ -11,6 +11,7 @@ use vector_core::{config::LogNamespace, event::Event};

use super::parser;
use crate::sources::util::http::HttpMethod;
use crate::sources::util::http_client::{default_timeout, warn_if_interval_too_low};
use crate::{
config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
http::Auth,
@@ -53,13 +54,20 @@ pub struct PrometheusScrapeConfig {
#[serde(alias = "hosts")]
endpoints: Vec<String>,

/// The interval between scrapes, in seconds.
/// The interval between scrapes. Requests run concurrently.
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
interval: Duration,

/// The timeout for each scrape request.
#[serde(default = "default_timeout")]
#[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
timeout: Duration,

/// The tag name added to each event representing the scraped instance's `host:port`.
///
/// The tag value is the host and port of the scraped instance.
@@ -114,6 +122,7 @@ impl GenerateConfig for PrometheusScrapeConfig {
toml::Value::try_from(Self {
endpoints: vec!["http://localhost:9090/metrics".to_string()],
interval: default_interval(),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
@@ -143,9 +152,12 @@ impl SourceConfig for PrometheusScrapeConfig {
endpoint_tag: self.endpoint_tag.clone(),
};

warn_if_interval_too_low(self.timeout, self.interval);

let inputs = GenericHttpClientInputs {
urls,
interval: self.interval,
timeout: self.timeout,
headers: HashMap::new(),
content_type: "text/plain".to_string(),
auth: self.auth.clone(),
@@ -351,6 +363,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
@@ -384,6 +397,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
@@ -435,6 +449,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
@@ -500,6 +515,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
@@ -555,6 +571,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
@@ -668,6 +685,7 @@ mod test {
honor_labels: false,
query: HashMap::new(),
interval: Duration::from_secs(1),
timeout: default_timeout(),
tls: None,
auth: None,
},
47 changes: 38 additions & 9 deletions src/sources/util/http_client.rs
@@ -25,7 +25,7 @@ use crate::{
},
sources::util::http::HttpMethod,
tls::TlsSettings,
Error, SourceSender,
SourceSender,
};
use vector_common::shutdown::ShutdownSignal;
use vector_core::{config::proxy::ProxyConfig, event::Event, EstimatedJsonEncodedSizeOf};
@@ -36,6 +36,8 @@ pub(crate) struct GenericHttpClientInputs {
pub urls: Vec<Uri>,
/// Interval between calls.
pub interval: Duration,
/// Timeout for the HTTP request.
pub timeout: Duration,
/// Map of Header+Value to apply to HTTP request.
pub headers: HashMap<String, Vec<String>>,
/// Content type of the HTTP request, determined by the source.
@@ -51,6 +53,11 @@ pub(crate) const fn default_interval() -> Duration {
Duration::from_secs(15)
}

/// The default timeout for the HTTP request if none is configured.
pub(crate) const fn default_timeout() -> Duration {
Duration::from_secs(5)
}

/// Builds the context, allowing the source-specific implementation to leverage data from the
/// config and the current HTTP request.
pub(crate) trait HttpClientBuilder {
@@ -101,6 +108,17 @@ pub(crate) fn build_url(uri: &Uri, query: &HashMap<String, Vec<String>>) -> Uri
.expect("Failed to build URI from parsed arguments")
}

/// Warns if the scrape timeout is greater than the scrape interval.
pub(crate) fn warn_if_interval_too_low(timeout: Duration, interval: Duration) {
if timeout > interval {
warn!(
interval_secs = %interval.as_secs_f64(),
timeout_secs = %timeout.as_secs_f64(),
message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
);
}
}

/// Calls one or more urls at an interval.
/// - The HTTP request is built per the options in provided generic inputs.
/// - The HTTP response is decoded/parsed into events by the specific context.
@@ -114,15 +132,16 @@ pub(crate) async fn call<
mut out: SourceSender,
http_method: HttpMethod,
) -> Result<(), ()> {
// Building the HttpClient should not fail as it is just setting up the client with the
// proxy and tls settings.
let client =
HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed");
let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval))
.take_until(inputs.shutdown)
.map(move |_| stream::iter(inputs.urls.clone()))
.flatten()
.map(move |url| {
// Building the HttpClient should not fail as it is just setting up the client with the
// proxy and tls settings.
let client = HttpClient::new(inputs.tls.clone(), &inputs.proxy)
.expect("Building HTTP client failed");
let client = client.clone();
let endpoint = url.to_string();

let context_builder = context_builder.clone();
@@ -157,9 +176,18 @@ pub(crate) async fn call<
}

let start = Instant::now();
client
.send(request)
.map_err(Error::from)
tokio::time::timeout(inputs.timeout, client.send(request))
.then(move |result| async move {
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error.into()),
Err(_) => Err(format!(
"Timeout error: request exceeded {}s",
inputs.timeout.as_secs_f64()
)
.into()),
}
})
.and_then(|response| async move {
let (header, body) = response.into_parts();
let body = hyper::body::to_bytes(body).await?;
@@ -224,8 +252,9 @@ pub(crate) async fn call<
})
})
.flatten()
.boxed()
})
.flatten()
.flatten_unordered(None)
.boxed();

match out.send_event_stream(&mut stream).await {
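Taken together, the two changes in this file are: each request future is capped with `tokio::time::timeout`, and the per-URL streams are merged with `flatten_unordered(None)` instead of `flatten()`, so one slow endpoint no longer serializes the others. A self-contained sketch of that pattern (the `fake_fetch` helper, the URLs, and the delays are made up for illustration; assumed dependencies are `tokio` and `futures`):

```rust
use std::time::Duration;
use futures::{stream, StreamExt};

// Stand-in for `client.send(request)`: resolves after a fixed delay.
async fn fake_fetch(url: &'static str, delay: Duration) -> Result<String, String> {
    tokio::time::sleep(delay).await;
    Ok(format!("response from {url}"))
}

#[tokio::main]
async fn main() {
    let timeout = Duration::from_millis(150);
    let endpoints = [
        ("http://a/metrics", Duration::from_millis(50)),
        ("http://b/metrics", Duration::from_millis(500)), // will time out
        ("http://c/metrics", Duration::from_millis(100)),
    ];

    let results: Vec<Result<String, String>> = stream::iter(endpoints)
        .map(|(url, delay)| {
            // One single-item stream per endpoint, boxed so the stream type
            // is `Unpin` as `flatten_unordered` requires (the same reason the
            // PR adds `.boxed()` inside the map closure).
            stream::once(async move {
                match tokio::time::timeout(timeout, fake_fetch(url, delay)).await {
                    Ok(Ok(body)) => Ok(body),
                    Ok(Err(error)) => Err(error),
                    // `timeout` yields `Err(Elapsed)` when the deadline passes.
                    Err(_) => Err(format!(
                        "Timeout error: request exceeded {}s",
                        timeout.as_secs_f64()
                    )),
                }
            })
            .boxed()
        })
        // `None` = no concurrency cap; the slow endpoint "b" times out without
        // delaying "a" or "c". Plain `flatten()` would poll them one at a time.
        .flatten_unordered(None)
        .collect()
        .await;

    println!("{results:#?}");
}
```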
website/cue/reference/components/sources/base/http_client.cue
@@ -309,13 +309,21 @@ base: components: sources: http_client: configuration: {
}
}
scrape_interval_secs: {
description: "The interval between calls."
description: "The interval between scrapes. Requests run concurrently."
required: false
type: uint: {
default: 15
unit: "seconds"
}
}
scrape_timeout_secs: {
description: "The timeout for each scrape request."
required: false
type: float: {
default: 5.0
unit: "seconds"
}
}
tls: {
description: "TLS configuration."
required: false
website/cue/reference/components/sources/base/prometheus_scrape.cue
@@ -104,13 +104,21 @@ base: components: sources: prometheus_scrape: configuration: {
}
}
scrape_interval_secs: {
description: "The interval between scrapes, in seconds."
description: "The interval between scrapes. Requests run concurrently."
required: false
type: uint: {
default: 15
unit: "seconds"
}
}
scrape_timeout_secs: {
description: "The timeout for each scrape request."
required: false
type: float: {
default: 5.0
unit: "seconds"
}
}
tls: {
description: "TLS configuration."
required: false
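The generated docs above type `scrape_interval_secs` as a uint but `scrape_timeout_secs` as a float because the PR serializes them with `serde_with::DurationSeconds<u64>` and `serde_with::DurationSecondsWithFrac<f64>` respectively, so sub-second timeouts are representable. A minimal sketch of the difference (the standalone `ScrapeTiming` struct is not a type from the PR; assumed dependencies are `serde`, `serde_with`, and `toml`):

```rust
use std::time::Duration;
use serde::Deserialize;
use serde_with::serde_as;

#[serde_as]
#[derive(Debug, Deserialize)]
struct ScrapeTiming {
    // Whole seconds only, hence the uint type in the generated docs.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    scrape_interval_secs: Duration,
    // Fractional seconds allowed, hence the float type and the 5.0 default.
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    scrape_timeout_secs: Duration,
}

fn main() {
    let timing: ScrapeTiming =
        toml::from_str("scrape_interval_secs = 15\nscrape_timeout_secs = 0.5")
            .expect("valid config");
    assert_eq!(timing.scrape_interval_secs, Duration::from_secs(15));
    assert_eq!(timing.scrape_timeout_secs, Duration::from_millis(500));
}
```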