Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(prometheus_scrape source): run requests in parallel with timeouts #18021

Merged
merged 16 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ pub struct HttpClientConfig {
#[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
pub endpoint: String,

/// The interval between calls.
/// The interval between scrapes. Requests run concurrently.
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
pub interval: Duration,

/// The timeout for each scrape request, in seconds.
/// The timeout for each scrape request.
#[serde(default = "default_target_timeout")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_target_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Target Timeout"))]
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
pub target_timeout: Duration,

/// Custom parameters for the HTTP request query string.
Expand Down Expand Up @@ -202,6 +202,14 @@ impl SourceConfig for HttpClientConfig {
log_namespace,
};

if self.target_timeout > self.interval {
nullren marked this conversation as resolved.
Show resolved Hide resolved
warn!(
interval_secs = %self.interval.as_secs_f64(),
target_timeout_secs = %self.target_timeout.as_secs_f64(),
message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
);
}

let inputs = GenericHttpClientInputs {
urls,
interval: self.interval,
Expand Down
18 changes: 13 additions & 5 deletions src/sources/prometheus/scrape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,18 @@ pub struct PrometheusScrapeConfig {
#[serde(alias = "hosts")]
endpoints: Vec<String>,

/// The interval between scrapes, in seconds.
/// The interval between scrapes. Requests run concurrently.
nullren marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
interval: Duration,

/// The timeout for each scrape request, in seconds.
/// The timeout for each scrape request.
#[serde(default = "default_target_timeout")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_target_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Target Timeout"))]
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
target_timeout: Duration,

/// The tag name added to each event representing the scraped instance's `host:port`.
Expand Down Expand Up @@ -152,6 +152,14 @@ impl SourceConfig for PrometheusScrapeConfig {
endpoint_tag: self.endpoint_tag.clone(),
};

if self.target_timeout > self.interval {
warn!(
interval_secs = %self.interval.as_secs_f64(),
target_timeout_secs = %self.target_timeout.as_secs_f64(),
message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
);
}
nullren marked this conversation as resolved.
Show resolved Hide resolved

let inputs = GenericHttpClientInputs {
urls,
interval: self.interval,
Expand Down
5 changes: 2 additions & 3 deletions src/sources/util/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,14 @@ pub(crate) async fn call<
}

let start = Instant::now();
let timeout = std::cmp::min(inputs.target_timeout, inputs.interval);
tokio::time::timeout(timeout, client.send(request))
tokio::time::timeout(inputs.target_timeout, client.send(request))
.then(move |result| async move {
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error.into()),
Err(_) => Err(format!(
"Timeout error: request exceeded {}s",
timeout.as_secs_f32()
inputs.target_timeout.as_secs_f64()
)
.into()),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,21 @@ base: components: sources: http_client: configuration: {
}
}
scrape_interval_secs: {
description: "The interval between calls."
description: "The interval between scrapes. Requests run concurrently."
required: false
type: uint: {
default: 15
unit: "seconds"
}
}
scrape_timeout_secs: {
description: "The timeout for each scrape request."
required: false
type: float: {
default: 5.0
unit: "seconds"
}
}
tls: {
description: "TLS configuration."
required: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,21 @@ base: components: sources: prometheus_scrape: configuration: {
}
}
scrape_interval_secs: {
description: "The interval between scrapes, in seconds."
description: "The interval between scrapes. Requests run concurrently."
required: false
type: uint: {
default: 15
unit: "seconds"
}
}
scrape_timeout_secs: {
description: "The timeout for each scrape request."
required: false
type: float: {
default: 5.0
unit: "seconds"
}
}
tls: {
description: "TLS configuration."
required: false
Expand Down