Skip to content

Commit

Permalink
enhancement(kafka source): Support advanced librdkafka options (#1830)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Rodin <rodin.alexander@gmail.com>
  • Loading branch information
a-rodin authored Feb 17, 2020
1 parent 3210595 commit 1e85b5b
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 7 deletions.
35 changes: 35 additions & 0 deletions .meta/sources/kafka.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,41 @@ description = """\
The Kafka session timeout in milliseconds.
"""

[sources.kafka.options.socket_timeout_ms]
type = "int"
examples = [30000, 60000]
default = 60000
unit = "milliseconds"
description = """\
Default timeout for network requests.
"""

[sources.kafka.options.fetch_wait_max_ms]
type = "int"
examples = [50, 100]
default = 100
unit = "milliseconds"
description = """\
Maximum time the broker may wait to fill the response.
"""

[sources.kafka.options.librdkafka_options]
type = "table"
category = "Advanced"
description = """\
Advanced consumer options. See [`librdkafka` documentation][urls.lib_rdkafka_config] for details.
"""

[sources.kafka.options.librdkafka_options.children."`[field-name]`"]
type = "string"
examples = [
{"client.id" = "${ENV_VAR}"},
{"fetch.error.backoff.ms" = "1000"},
]
description = """\
The options and their values. Accepts `string` values.
"""

[sources.kafka.output.log.fields.message]
type = "string"
examples = ["Started GET / for 127.0.0.1 at 2012-03-10 14:28:14 +0100"]
Expand Down
35 changes: 35 additions & 0 deletions config/vector.spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ dns_servers = ["0.0.0.0:53"]

# Ingests data through Kafka 0.9 or later and outputs `log` events.
[sources.kafka]
#
# General
#

# The component type. This is a required field that tells Vector which
# component to use. The value _must_ be `#{name}`.
#
Expand Down Expand Up @@ -411,6 +415,15 @@ dns_servers = ["0.0.0.0:53"]
auto_offset_reset = "end"
auto_offset_reset = "error"

# Maximum time the broker may wait to fill the response.
#
# * optional
# * default: 100
# * type: int
# * unit: milliseconds
fetch_wait_max_ms = 50
fetch_wait_max_ms = 100

# The log field name to use for the topic key. If unspecified, the key would
# not be added to the log event. If the message has null key, then this field
# would not be added to the log event.
Expand All @@ -429,6 +442,28 @@ dns_servers = ["0.0.0.0:53"]
session_timeout_ms = 5000
session_timeout_ms = 10000

# Default timeout for network requests.
#
# * optional
# * default: 60000
# * type: int
# * unit: milliseconds
socket_timeout_ms = 30000
socket_timeout_ms = 60000

#
# Advanced
#

[sources.kafka.librdkafka_options]
# The options and their values. Accepts `string` values.
#
# * optional
# * no default
# * type: string
"client.id" = "${ENV_VAR}"
"fetch.error.backoff.ms" = "1000"

# Ingests data through the Heroku Logplex HTTP Drain protocol and outputs `log` events.
[sources.logplex]
# The component type. This is a required field that tells Vector which
Expand Down
37 changes: 32 additions & 5 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use rdkafka::{
};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

#[derive(Debug, Snafu)]
enum BuildError {
Expand All @@ -33,16 +33,29 @@ pub struct KafkaSourceConfig {
auto_offset_reset: String,
#[serde(default = "default_session_timeout_ms")]
session_timeout_ms: u64,
#[serde(default = "default_socket_timeout_ms")]
socket_timeout_ms: u64,
#[serde(default = "default_fetch_wait_max_ms")]
fetch_wait_max_ms: u64,
#[serde(default = "default_commit_interval_ms")]
commit_interval_ms: u64,
host_key: Option<String>,
key_field: Option<String>,
librdkafka_options: Option<HashMap<String, String>>,
}

/// Fallback for `session_timeout_ms` when the user omits it; mirrors
/// librdkafka's built-in default for `session.timeout.ms` (10 seconds).
fn default_session_timeout_ms() -> u64 {
    10_000
}

/// Fallback for `socket_timeout_ms` when the user omits it; mirrors
/// librdkafka's built-in default for `socket.timeout.ms` (60 seconds).
fn default_socket_timeout_ms() -> u64 {
    60_000
}

/// Fallback for `fetch_wait_max_ms` when the user omits it; mirrors
/// librdkafka's built-in default for `fetch.wait.max.ms` (100 ms).
fn default_fetch_wait_max_ms() -> u64 {
    100
}

/// Fallback for `commit_interval_ms` when the user omits it; mirrors
/// librdkafka's built-in default for `auto.commit.interval.ms` (5 seconds).
fn default_commit_interval_ms() -> u64 {
    5_000
}
Expand Down Expand Up @@ -130,22 +143,30 @@ fn kafka_source(
}

fn create_consumer(config: KafkaSourceConfig) -> crate::Result<StreamConsumer> {
let consumer: StreamConsumer = ClientConfig::new()
let mut client_config = ClientConfig::new();
client_config
.set("group.id", &config.group_id)
.set("bootstrap.servers", &config.bootstrap_servers)
.set("auto.offset.reset", &config.auto_offset_reset)
.set("session.timeout.ms", &config.session_timeout_ms.to_string())
.set("socket.timeout.ms", &config.socket_timeout_ms.to_string())
.set("fetch.wait.max.ms", &config.fetch_wait_max_ms.to_string())
.set("enable.partition.eof", "false")
.set("enable.auto.commit", "true")
.set(
"auto.commit.interval.ms",
&config.commit_interval_ms.to_string(),
)
.set("enable.auto.offset.store", "false")
.set("client.id", "vector")
.create()
.context(KafkaCreateError)?;
.set("client.id", "vector");

if let Some(librdkafka_options) = config.librdkafka_options {
for (key, value) in librdkafka_options.into_iter() {
client_config.set(key.as_str(), value.as_str());
}
}

let consumer: StreamConsumer = client_config.create().context(KafkaCreateError)?;
let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
consumer.subscribe(&topics).context(KafkaSubscribeError)?;

Expand Down Expand Up @@ -181,6 +202,9 @@ mod test {
commit_interval_ms: 5000,
host_key: None,
key_field: Some("message_key".to_string()),
socket_timeout_ms: 60000,
fetch_wait_max_ms: 100,
librdkafka_options: None,
}
}

Expand Down Expand Up @@ -249,6 +273,9 @@ mod integration_test {
commit_interval_ms: 5000,
host_key: None,
key_field: Some("message_key".to_string()),
socket_timeout_ms: 60000,
fetch_wait_max_ms: 100,
librdkafka_options: None,
};

let mut rt = runtime();
Expand Down
107 changes: 105 additions & 2 deletions website/docs/reference/sources/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,23 @@ import CodeHeader from '@site/src/components/CodeHeader';

```toml
[sources.my_source_id]
# REQUIRED
# REQUIRED - General
type = "kafka" # must be: "kafka"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092" # example
group_id = "consumer-group-name" # example
topics = ["^(prefix1|prefix2)-.+", "topic-1", "topic-2"] # example

# OPTIONAL
# OPTIONAL - General
auto_offset_reset = "largest" # default
fetch_wait_max_ms = 100 # default, milliseconds
key_field = "user_id" # example, no default
session_timeout_ms = 10000 # default, milliseconds
socket_timeout_ms = 60000 # default, milliseconds

# OPTIONAL - Advanced
[sources.my_source_id.librdkafka_options]
"client.id" = "${ENV_VAR}" # example
"fetch.error.backoff.ms" = "1000" # example
```

</TabItem>
Expand Down Expand Up @@ -127,6 +134,29 @@ If offsets for consumer group do not exist, set them using this strategy. [librd
A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.


</Field>


<Field
common={false}
defaultValue={100}
enumValues={null}
examples={[50,100]}
name={"fetch_wait_max_ms"}
path={null}
relevantWhen={null}
required={false}
templateable={false}
type={"int"}
unit={"milliseconds"}
>
### fetch_wait_max_ms

Maximum time the broker may wait to fill the response.



</Field>


Expand Down Expand Up @@ -175,6 +205,56 @@ The log field name to use for the topic key. If unspecified, the key would not b
</Field>


<Field
common={false}
defaultValue={null}
enumValues={null}
examples={[]}
name={"librdkafka_options"}
path={null}
relevantWhen={null}
required={false}
templateable={false}
type={"table"}
unit={null}
>
### librdkafka_options

Advanced consumer options. See [`librdkafka` documentation][urls.lib_rdkafka_config] for details.


<Fields filters={false}>


<Field
common={false}
defaultValue={null}
enumValues={null}
examples={[{"client.id":"${ENV_VAR}"},{"fetch.error.backoff.ms":"1000"}]}
name={"`[field-name]`"}
path={"librdkafka_options"}
relevantWhen={null}
required={false}
templateable={false}
type={"string"}
unit={null}
>
#### `[field-name]`

The options and their values. Accepts `string` values.



</Field>


</Fields>

</Field>


<Field
common={false}
defaultValue={10000}
Expand All @@ -195,6 +275,29 @@ The Kafka session timeout in milliseconds.



</Field>


<Field
common={false}
defaultValue={60000}
enumValues={null}
examples={[30000,60000]}
name={"socket_timeout_ms"}
path={null}
relevantWhen={null}
required={false}
templateable={false}
type={"int"}
unit={"milliseconds"}
>
### socket_timeout_ms

Default timeout for network requests.



</Field>


Expand Down

0 comments on commit 1e85b5b

Please sign in to comment.