From 1e85b5bf151ab4ec6f0cab140d79b038aaa02844 Mon Sep 17 00:00:00 2001
From: Alexander Rodin
Date: Tue, 18 Feb 2020 00:42:28 +0300
Subject: [PATCH] enhancement(kafka source): Support advanced `librdkafka`
 options (#1830)

Signed-off-by: Alexander Rodin
---
 .meta/sources/kafka.toml                |  35 ++++++++
 config/vector.spec.toml                 |  35 ++++++++
 src/sources/kafka.rs                    |  37 ++++++--
 website/docs/reference/sources/kafka.md | 107 +++++++++++++++++++++++-
 4 files changed, 207 insertions(+), 7 deletions(-)

diff --git a/.meta/sources/kafka.toml b/.meta/sources/kafka.toml
index 87bf51b265087..1141a931dd9fc 100644
--- a/.meta/sources/kafka.toml
+++ b/.meta/sources/kafka.toml
@@ -70,6 +70,41 @@ description = """\
 The Kafka session timeout in milliseconds.
 """
 
+[sources.kafka.options.socket_timeout_ms]
+type = "int"
+examples = [30000, 60000]
+default = 60000
+unit = "milliseconds"
+description = """\
+Default timeout for network requests.
+"""
+
+[sources.kafka.options.fetch_wait_max_ms]
+type = "int"
+examples = [50, 100]
+default = 100
+unit = "milliseconds"
+description = """\
+Maximum time the broker may wait to fill the response.
+"""
+
+[sources.kafka.options.librdkafka_options]
+type = "table"
+category = "Advanced"
+description = """\
+Advanced consumer options. See [`librdkafka` documentation][urls.lib_rdkafka_config] for details.
+"""
+
+[sources.kafka.options.librdkafka_options.children."`[field-name]`"]
+type = "string"
+examples = [
+  {"client.id" = "${ENV_VAR}"},
+  {"fetch.error.backoff.ms" = "1000"},
+]
+description = """\
+The options and their values. Accepts `string` values.
+"""
+
 [sources.kafka.output.log.fields.message]
 type = "string"
 examples = ["Started GET / for 127.0.0.1 at 2012-03-10 14:28:14 +0100"]

diff --git a/config/vector.spec.toml b/config/vector.spec.toml
index 6a5d4ef807710..56ec5fee4f509 100644
--- a/config/vector.spec.toml
+++ b/config/vector.spec.toml
@@ -368,6 +368,10 @@ dns_servers = ["0.0.0.0:53"]
 
 # Ingests data through Kafka 0.9 or later and outputs `log` events.
 [sources.kafka]
+  #
+  # General
+  #
+
   # The component type. This is a required field that tells Vector which
   # component to use. The value _must_ be `#{name}`.
   #
@@ -411,6 +415,15 @@
   auto_offset_reset = "end"
   auto_offset_reset = "error"
 
+  # Maximum time the broker may wait to fill the response.
+  #
+  # * optional
+  # * default: 100
+  # * type: int
+  # * unit: milliseconds
+  fetch_wait_max_ms = 50
+  fetch_wait_max_ms = 100
+
   # The log field name to use for the topic key. If unspecified, the key would
   # not be added to the log event. If the message has null key, then this field
   # would not be added to the log event.
@@ -429,6 +442,28 @@
   session_timeout_ms = 5000
   session_timeout_ms = 10000
 
+  # Default timeout for network requests.
+  #
+  # * optional
+  # * default: 60000
+  # * type: int
+  # * unit: milliseconds
+  socket_timeout_ms = 30000
+  socket_timeout_ms = 60000
+
+  #
+  # Advanced
+  #
+
+  [sources.kafka.librdkafka_options]
+    # The options and their values. Accepts `string` values.
+    #
+    # * optional
+    # * no default
+    # * type: string
+    "client.id" = "${ENV_VAR}"
+    "fetch.error.backoff.ms" = "1000"
+
 # Ingests data through the Heroku Logplex HTTP Drain protocol and outputs `log` events.
 [sources.logplex]
   # The component type. This is a required field that tells Vector which
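For reviewers who want to try the new fields outside of Vector, here is a minimal, self-contained sketch, not part of this patch, of how the `#[serde(default = ...)]` attributes introduced in `src/sources/kafka.rs` below behave. It assumes the `serde` (with the `derive` feature) and `toml` crates; the `AdvancedKafkaOptions` struct name is illustrative only. Omitted timeouts fall back to the librdkafka defaults declared in the spec above.

```rust
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
struct AdvancedKafkaOptions {
    // Mirrors the defaulting strategy used by the patch: missing fields
    // take librdkafka's own defaults.
    #[serde(default = "default_socket_timeout_ms")]
    socket_timeout_ms: u64,
    #[serde(default = "default_fetch_wait_max_ms")]
    fetch_wait_max_ms: u64,
    librdkafka_options: Option<HashMap<String, String>>,
}

fn default_socket_timeout_ms() -> u64 {
    60000
}

fn default_fetch_wait_max_ms() -> u64 {
    100
}

fn main() {
    // Only the advanced table is set; both timeouts fall back to defaults.
    let config: AdvancedKafkaOptions = toml::from_str(
        r#"
        [librdkafka_options]
        "client.id" = "vector-consumer"
        "fetch.error.backoff.ms" = "1000"
        "#,
    )
    .unwrap();

    assert_eq!(config.socket_timeout_ms, 60000);
    assert_eq!(config.fetch_wait_max_ms, 100);
    let options = config.librdkafka_options.unwrap();
    assert_eq!(options["fetch.error.backoff.ms"], "1000");
}
```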
diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs
index 5ea1a6da4ece6..8499c48ef1b26 100644
--- a/src/sources/kafka.rs
+++ b/src/sources/kafka.rs
@@ -13,7 +13,7 @@ use rdkafka::{
 };
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 #[derive(Debug, Snafu)]
 enum BuildError {
@@ -33,16 +33,29 @@ pub struct KafkaSourceConfig {
     auto_offset_reset: String,
     #[serde(default = "default_session_timeout_ms")]
     session_timeout_ms: u64,
+    #[serde(default = "default_socket_timeout_ms")]
+    socket_timeout_ms: u64,
+    #[serde(default = "default_fetch_wait_max_ms")]
+    fetch_wait_max_ms: u64,
     #[serde(default = "default_commit_interval_ms")]
     commit_interval_ms: u64,
     host_key: Option<String>,
     key_field: Option<String>,
+    librdkafka_options: Option<HashMap<String, String>>,
 }
 
 fn default_session_timeout_ms() -> u64 {
     10000 // default in librdkafka
 }
 
+fn default_socket_timeout_ms() -> u64 {
+    60000 // default in librdkafka
+}
+
+fn default_fetch_wait_max_ms() -> u64 {
+    100 // default in librdkafka
+}
+
 fn default_commit_interval_ms() -> u64 {
     5000 // default in librdkafka
 }
@@ -130,11 +143,14 @@ fn kafka_source(
 }
 
 fn create_consumer(config: KafkaSourceConfig) -> crate::Result<StreamConsumer> {
-    let consumer: StreamConsumer = ClientConfig::new()
+    let mut client_config = ClientConfig::new();
+    client_config
         .set("group.id", &config.group_id)
         .set("bootstrap.servers", &config.bootstrap_servers)
         .set("auto.offset.reset", &config.auto_offset_reset)
         .set("session.timeout.ms", &config.session_timeout_ms.to_string())
+        .set("socket.timeout.ms", &config.socket_timeout_ms.to_string())
+        .set("fetch.wait.max.ms", &config.fetch_wait_max_ms.to_string())
         .set("enable.partition.eof", "false")
         .set("enable.auto.commit", "true")
         .set(
@@ -142,10 +158,15 @@
             &config.commit_interval_ms.to_string(),
         )
         .set("enable.auto.offset.store", "false")
-        .set("client.id", "vector")
-        .create()
-        .context(KafkaCreateError)?;
+        .set("client.id", "vector");
+
+    if let Some(librdkafka_options) = config.librdkafka_options {
+        for (key, value) in librdkafka_options.into_iter() {
+            client_config.set(key.as_str(), value.as_str());
+        }
+    }
+
+    let consumer: StreamConsumer = client_config.create().context(KafkaCreateError)?;
 
     let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
     consumer.subscribe(&topics).context(KafkaSubscribeError)?;
@@ -181,6 +202,9 @@ mod test {
             commit_interval_ms: 5000,
             host_key: None,
             key_field: Some("message_key".to_string()),
+            socket_timeout_ms: 60000,
+            fetch_wait_max_ms: 100,
+            librdkafka_options: None,
         }
     }
@@ -249,6 +273,9 @@ mod integration_test {
         commit_interval_ms: 5000,
         host_key: None,
         key_field: Some("message_key".to_string()),
+        socket_timeout_ms: 60000,
+        fetch_wait_max_ms: 100,
+        librdkafka_options: None,
    };

    let mut rt = runtime();
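One design note on `create_consumer` above, worth calling out for reviewers: the hard-coded `.set(...)` calls run first and the `librdkafka_options` loop runs last, so a user-supplied key such as `client.id` replaces the built-in value rather than the other way around. The sketch below illustrates that precedence with a plain `HashMap` standing in for rdkafka's `ClientConfig`; the `merged_settings` helper is hypothetical, not part of the patch.

```rust
use std::collections::HashMap;

// Built-in settings are inserted first; the advanced table is applied last,
// so on a key collision the user-supplied value wins.
fn merged_settings(
    librdkafka_options: Option<HashMap<String, String>>,
) -> HashMap<String, String> {
    let mut settings = HashMap::new();
    settings.insert("client.id".to_string(), "vector".to_string());
    settings.insert("enable.auto.commit".to_string(), "true".to_string());
    if let Some(options) = librdkafka_options {
        for (key, value) in options {
            settings.insert(key, value); // a later insert replaces a built-in
        }
    }
    settings
}

fn main() {
    let mut options = HashMap::new();
    options.insert("client.id".to_string(), "my-pipeline".to_string());

    let merged = merged_settings(Some(options));
    assert_eq!(merged["client.id"], "my-pipeline"); // override applied
    assert_eq!(merged["enable.auto.commit"], "true"); // built-in kept
}
```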
auto_offset_reset = "largest" # default + fetch_wait_max_ms = 100 # default, milliseconds key_field = "user_id" # example, no default session_timeout_ms = 10000 # default, milliseconds + socket_timeout_ms = 60000 # default, milliseconds + + # OPTIONAL - Advanced + [sources.my_source_id.librdkafka_options] + "client.id" = "${ENV_VAR}" # example + "fetch.error.backoff.ms" = "1000" # example ``` @@ -127,6 +134,29 @@ If offsets for consumer group do not exist, set them using this strategy. [librd A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself. + + + + + +### fetch_wait_max_ms + +Maximum time the broker may wait to fill the response. + + + @@ -175,6 +205,56 @@ The log field name to use for the topic key. If unspecified, the key would not b + + +### librdkafka_options + +Advanced consumer options. See [`librdkafka` documentation][urls.lib_rdkafka_config] for details. + + + + + + + +#### `[field-name]` + +The options and their values. Accepts `string` values. + + + + + + + + + + + + + + + +### socket_timeout_ms + +Default timeout for network requests. + + +