Skip to content

Commit

Permalink
enhancement(prometheus_scrape source): run requests in parallel with …
Browse files Browse the repository at this point in the history
…timeouts (vectordotdev#18021)

<!--
**Your PR title must conform to the conventional commit spec!**

  <type>(<scope>)!: <description>

  * `type` = chore, enhancement, feat, fix, docs
  * `!` = OPTIONAL: signals a breaking change
* `scope` = Optional when `type` is "chore" or "docs", available scopes
https://github.com/vectordotdev/vector/blob/master/.github/semantic.yml#L20
  * `description` = short description of the change

Examples:

  * enhancement(file source): Add `sort` option to sort discovered files
  * feat(new source): Initial `statsd` source
  * fix(file source): Fix a bug discovering new files
  * chore(external docs): Clarify `batch_size` option
-->

fixes vectordotdev#14087 
fixes vectordotdev#14132 
fixes vectordotdev#17659

- [x] make target timeout configurable

this builds on what @wjordan did in
vectordotdev#17660

### what's changed
- prometheus scrapes happen concurrently
- requests to targets can timeout
- the timeout can be configured (user facing change)
- small change in how the http was instantiated

---------

Co-authored-by: Doug Smith <dsmith3197@users.noreply.github.com>
Co-authored-by: Stephen Wakely <stephen@lisp.space>
  • Loading branch information
3 people authored Jul 24, 2023
1 parent 684e43f commit a9df958
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 17 deletions.
18 changes: 16 additions & 2 deletions src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::{
sources::util::{
http::HttpMethod,
http_client::{
build_url, call, default_interval, GenericHttpClientInputs, HttpClientBuilder,
build_url, call, default_interval, default_timeout, warn_if_interval_too_low,
GenericHttpClientInputs, HttpClientBuilder,
},
},
tls::{TlsConfig, TlsSettings},
Expand Down Expand Up @@ -51,13 +52,22 @@ pub struct HttpClientConfig {
#[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
pub endpoint: String,

/// The interval between calls.
/// The interval between scrapes. Requests are run concurrently so if a scrape takes longer
/// than the interval a new scrape will be started. This can take extra resources, set the timeout
/// to a value lower than the scrape interval to prevent this from happening.
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
pub interval: Duration,

/// The timeout for each scrape request.
#[serde(default = "default_timeout")]
#[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
pub timeout: Duration,

/// Custom parameters for the HTTP request query string.
///
/// One or more values for the same parameter key can be provided.
Expand Down Expand Up @@ -153,6 +163,7 @@ impl Default for HttpClientConfig {
endpoint: "http://localhost:9898/logs".to_string(),
query: HashMap::new(),
interval: default_interval(),
timeout: default_timeout(),
decoding: default_decoding(),
framing: default_framing_message_based(),
headers: HashMap::new(),
Expand Down Expand Up @@ -193,9 +204,12 @@ impl SourceConfig for HttpClientConfig {
log_namespace,
};

warn_if_interval_too_low(self.timeout, self.interval);

let inputs = GenericHttpClientInputs {
urls,
interval: self.interval,
timeout: self.timeout,
headers: self.headers.clone(),
content_type,
auth: self.auth.clone(),
Expand Down
13 changes: 12 additions & 1 deletion src/sources/http_client/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use codecs::decoding::DeserializerConfig;
use vector_core::config::log_schema;

use super::{
tests::{run_compliance, INTERVAL},
tests::{run_compliance, INTERVAL, TIMEOUT},
HttpClientConfig,
};

Expand Down Expand Up @@ -53,6 +53,7 @@ async fn invalid_endpoint() {
run_error(HttpClientConfig {
endpoint: "http://nope".to_string(),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: default_decoding(),
framing: default_framing_message_based(),
Expand All @@ -71,6 +72,7 @@ async fn collected_logs_bytes() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("{}/logs/bytes", dufs_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Bytes,
framing: default_framing_message_based(),
Expand All @@ -95,6 +97,7 @@ async fn collected_logs_json() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -119,6 +122,7 @@ async fn collected_metrics_native_json() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("{}/metrics/native.json", dufs_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::NativeJson(Default::default()),
framing: default_framing_message_based(),
Expand Down Expand Up @@ -148,6 +152,7 @@ async fn collected_trace_native_json() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("{}/traces/native.json", dufs_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::NativeJson(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -172,6 +177,7 @@ async fn unauthorized_no_auth() {
run_error(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_auth_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -190,6 +196,7 @@ async fn unauthorized_wrong_auth() {
run_error(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_auth_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -211,6 +218,7 @@ async fn authorized() {
run_compliance(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_auth_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -232,6 +240,7 @@ async fn tls_invalid_ca() {
run_error(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_https_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -253,6 +262,7 @@ async fn tls_valid() {
run_compliance(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_https_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -275,6 +285,7 @@ async fn shutdown() {
let source = HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand Down
8 changes: 8 additions & 0 deletions src/sources/http_client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::test_util::{

pub(crate) const INTERVAL: Duration = Duration::from_secs(1);

pub(crate) const TIMEOUT: Duration = Duration::from_secs(1);

/// The happy path should yield at least one event and must emit the required internal events for sources.
pub(crate) async fn run_compliance(config: HttpClientConfig) -> Vec<Event> {
let events =
Expand Down Expand Up @@ -47,6 +49,7 @@ async fn bytes_decoding() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: default_decoding(),
framing: default_framing_message_based(),
Expand Down Expand Up @@ -75,6 +78,7 @@ async fn json_decoding_newline_delimited() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: FramingConfig::NewlineDelimited(Default::default()),
Expand Down Expand Up @@ -103,6 +107,7 @@ async fn json_decoding_character_delimited() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
Expand Down Expand Up @@ -135,6 +140,7 @@ async fn request_query_applied() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint?key1=val1", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::from([
("key1".to_string(), vec!["val2".to_string()]),
(
Expand Down Expand Up @@ -203,6 +209,7 @@ async fn headers_applied() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: default_decoding(),
framing: default_framing_message_based(),
Expand Down Expand Up @@ -234,6 +241,7 @@ async fn accept_header_override() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Bytes,
framing: default_framing_message_based(),
Expand Down
23 changes: 22 additions & 1 deletion src/sources/prometheus/scrape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vector_core::{config::LogNamespace, event::Event};

use super::parser;
use crate::sources::util::http::HttpMethod;
use crate::sources::util::http_client::{default_timeout, warn_if_interval_too_low};
use crate::{
config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
http::Auth,
Expand Down Expand Up @@ -53,13 +54,22 @@ pub struct PrometheusScrapeConfig {
#[serde(alias = "hosts")]
endpoints: Vec<String>,

/// The interval between scrapes, in seconds.
/// The interval between scrapes. Requests are run concurrently so if a scrape takes longer
/// than the interval a new scrape will be started. This can take extra resources, set the timeout
/// to a value lower than the scrape interval to prevent this from happening.
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
interval: Duration,

/// The timeout for each scrape request.
#[serde(default = "default_timeout")]
#[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
timeout: Duration,

/// The tag name added to each event representing the scraped instance's `host:port`.
///
/// The tag value is the host and port of the scraped instance.
Expand Down Expand Up @@ -114,6 +124,7 @@ impl GenerateConfig for PrometheusScrapeConfig {
toml::Value::try_from(Self {
endpoints: vec!["http://localhost:9090/metrics".to_string()],
interval: default_interval(),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down Expand Up @@ -143,9 +154,12 @@ impl SourceConfig for PrometheusScrapeConfig {
endpoint_tag: self.endpoint_tag.clone(),
};

warn_if_interval_too_low(self.timeout, self.interval);

let inputs = GenericHttpClientInputs {
urls,
interval: self.interval,
timeout: self.timeout,
headers: HashMap::new(),
content_type: "text/plain".to_string(),
auth: self.auth.clone(),
Expand Down Expand Up @@ -351,6 +365,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
Expand Down Expand Up @@ -384,6 +399,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
Expand Down Expand Up @@ -435,6 +451,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down Expand Up @@ -500,6 +517,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
Expand Down Expand Up @@ -555,6 +573,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down Expand Up @@ -668,6 +687,7 @@ mod test {
honor_labels: false,
query: HashMap::new(),
interval: Duration::from_secs(1),
timeout: default_timeout(),
tls: None,
auth: None,
},
Expand Down Expand Up @@ -753,6 +773,7 @@ mod integration_tests {
let config = PrometheusScrapeConfig {
endpoints: vec!["http://prometheus:9090/metrics".into()],
interval: Duration::from_secs(1),
timeout: Duration::from_secs(1),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down
Loading

0 comments on commit a9df958

Please sign in to comment.