fix(kafka sink): performance improvements and fix memory leak #18634

Merged: 3 commits, Sep 22, 2023
124 changes: 58 additions & 66 deletions src/sinks/kafka/config.rs
@@ -17,8 +17,6 @@ use crate::{
},
};

pub(crate) const QUEUED_MIN_MESSAGES: u64 = 100000;

/// Configuration for the `kafka` sink.
#[serde_as]
#[configurable_component(sink(
@@ -159,79 +157,73 @@ impl KafkaSinkConfig {

self.auth.apply(&mut client_config)?;

match kafka_role {
Author comment: Note that I only removed the KafkaRole::Consumer branch in this diff. The body of the KafkaRole::Producer branch remains unchanged.
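A minimal before/after sketch of that control-flow change, with a stand-in enum and function (the real producer branch body is unchanged by this PR):

```rust
// Stand-in sketch of the refactor: the Consumer arm is gone, so the
// remaining Producer-only logic can live behind a plain `if`.
#[derive(PartialEq)]
enum KafkaRole {
    Producer,
    Consumer,
}

fn apply_role_specific_options(kafka_role: &KafkaRole) {
    // Previously:
    //   match kafka_role {
    //       KafkaRole::Producer => { /* producer-only batch options */ }
    //       KafkaRole::Consumer => { /* set queued.min.messages */ }
    //   }
    // Now only the producer needs special handling:
    if *kafka_role == KafkaRole::Producer {
        // ... apply producer-only batch options (body unchanged) ...
    }
}

fn main() {
    apply_role_specific_options(&KafkaRole::Producer);
    apply_role_specific_options(&KafkaRole::Consumer);
}
```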

// All batch options are producer only.
KafkaRole::Producer => {
client_config
.set("compression.codec", &to_string(self.compression))
.set(
"message.timeout.ms",
&self.message_timeout_ms.as_millis().to_string(),
);

if let Some(value) = self.batch.timeout_secs {
// Delay in milliseconds to wait for messages in the producer queue to accumulate before
// constructing message batches (MessageSets) to transmit to brokers. A higher value
// allows larger and more effective (less overhead, improved compression) batches of
// messages to accumulate at the expense of increased message delivery latency.
// Type: float
let key = "queue.buffering.max.ms";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\
// All batch options are producer only.
if kafka_role == KafkaRole::Producer {
client_config
.set("compression.codec", &to_string(self.compression))
.set(
"message.timeout.ms",
&self.message_timeout_ms.as_millis().to_string(),
);

if let Some(value) = self.batch.timeout_secs {
// Delay in milliseconds to wait for messages in the producer queue to accumulate before
// constructing message batches (MessageSets) to transmit to brokers. A higher value
// allows larger and more effective (less overhead, improved compression) batches of
// messages to accumulate at the expense of increased message delivery latency.
// Type: float
let key = "queue.buffering.max.ms";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.queue.buffering.max.ms={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "timeout_secs",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &((value * 1000.0).round().to_string()));
}
if let Some(value) = self.batch.max_events {
// Maximum number of messages batched in one MessageSet. The total MessageSet size is
// also limited by batch.size and message.max.bytes.
// Type: integer
let key = "batch.num.messages";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\
debug!(
librdkafka_option = key,
batch_option = "timeout_secs",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &((value * 1000.0).round().to_string()));
}
if let Some(value) = self.batch.max_events {
// Maximum number of messages batched in one MessageSet. The total MessageSet size is
// also limited by batch.size and message.max.bytes.
// Type: integer
let key = "batch.num.messages";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.batch.num.messages={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "max_events",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}
if let Some(value) = self.batch.max_bytes {
// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
// framing overhead. This limit is applied after the first message has been added to the
// batch, regardless of the first message's size, this is to ensure that messages that
// exceed batch.size are produced. The total MessageSet size is also limited by
// batch.num.messages and message.max.bytes.
// Type: integer
let key = "batch.size";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\
debug!(
librdkafka_option = key,
batch_option = "max_events",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}
if let Some(value) = self.batch.max_bytes {
// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
// framing overhead. This limit is applied after the first message has been added to the
// batch, regardless of the first message's size, this is to ensure that messages that
// exceed batch.size are produced. The total MessageSet size is also limited by
// batch.num.messages and message.max.bytes.
// Type: integer
let key = "batch.size";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.batch.size={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "max_bytes",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}
}

KafkaRole::Consumer => {
client_config.set("queued.min.messages", QUEUED_MIN_MESSAGES.to_string());
Author comment: This is unnecessary as it is the default value (ref).

debug!(
librdkafka_option = key,
batch_option = "max_bytes",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}
}

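As a worked example of the batch-option mapping in the hunk above (a standalone sketch, not code from the PR): `batch.timeout_secs` is a float in seconds and becomes librdkafka's `queue.buffering.max.ms`, while `batch.max_events` and `batch.max_bytes` map directly to `batch.num.messages` and `batch.size`.

```rust
// Standalone sketch of the conversion performed in config.rs above.
fn queue_buffering_max_ms(batch_timeout_secs: f64) -> String {
    // Same arithmetic as the sink: seconds -> milliseconds, rounded, stringified.
    (batch_timeout_secs * 1000.0).round().to_string()
}

fn main() {
    // e.g. `batch.timeout_secs = 0.5` becomes `queue.buffering.max.ms = "500"`.
    assert_eq!(queue_buffering_max_ms(0.5), "500");
    // The other two batch settings are passed through directly as strings:
    //   batch.max_events = 1000      -> batch.num.messages = "1000"
    //   batch.max_bytes  = 1_000_000 -> batch.size         = "1000000"
}
```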
77 changes: 38 additions & 39 deletions src/sinks/kafka/request_builder.rs
@@ -1,42 +1,43 @@
use std::num::NonZeroUsize;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use lookup::OwnedTargetPath;
use rdkafka::message::{Header, OwnedHeaders};
use tokio_util::codec::Encoder as _;
use vrl::path::OwnedTargetPath;

use crate::{
codecs::{Encoder, Transformer},
event::{Event, Finalizable, Value},
internal_events::{KafkaHeaderExtractionError, TemplateRenderingError},
internal_events::KafkaHeaderExtractionError,
sinks::{
kafka::service::{KafkaRequest, KafkaRequestMetadata},
util::metadata::RequestMetadataBuilder,
prelude::*,
},
template::Template,
};

pub struct KafkaRequestBuilder {
pub key_field: Option<OwnedTargetPath>,
pub headers_key: Option<OwnedTargetPath>,
pub topic_template: Template,
pub transformer: Transformer,
pub encoder: Encoder<()>,
pub encoder: (Transformer, Encoder<()>),
}
Author comment on lines 13 to 17: Modified KafkaRequestBuilder to implement RequestBuilder rather than being a one-off implementation.
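For context, the shared trait being implemented here has roughly the following shape, reconstructed from this impl (the real definition lives in vector's sink prelude; associated items, bounds, and default methods may differ):

```rust
// Approximate shape only; `Compression`, `RequestMetadata`,
// `RequestMetadataBuilder`, and `EncodeResult` come from the sink prelude
// and are not redefined here.
pub trait RequestBuilder<Input> {
    type Metadata;
    type Events;
    type Encoder;
    type Payload;
    type Request;
    type Error;

    fn compression(&self) -> Compression;
    fn encoder(&self) -> &Self::Encoder;
    fn split_input(&self, input: Input) -> (Self::Metadata, RequestMetadataBuilder, Self::Events);
    fn build_request(
        &self,
        metadata: Self::Metadata,
        request_metadata: RequestMetadata,
        payload: EncodeResult<Self::Payload>,
    ) -> Self::Request;
}
```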


impl KafkaRequestBuilder {
pub fn build_request(&mut self, mut event: Event) -> Option<KafkaRequest> {
let topic = self
.topic_template
.render_string(&event)
.map_err(|error| {
emit!(TemplateRenderingError {
field: None,
drop_event: true,
error,
});
})
.ok()?;
impl RequestBuilder<(String, Event)> for KafkaRequestBuilder {
type Metadata = KafkaRequestMetadata;
type Events = Event;
type Encoder = (Transformer, Encoder<()>);
type Payload = Bytes;
type Request = KafkaRequest;
type Error = std::io::Error;

fn compression(&self) -> Compression {
Compression::None
}

fn encoder(&self) -> &Self::Encoder {
&self.encoder
}

fn split_input(
&self,
input: (String, Event),
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let (topic, mut event) = input;
let builder = RequestMetadataBuilder::from_event(&event);

let metadata = KafkaRequestMetadata {
finalizers: event.take_finalizers(),
@@ -45,23 +46,21 @@ impl KafkaRequestBuilder {
headers: get_headers(&event, self.headers_key.as_ref()),
topic,
};
self.transformer.transform(&mut event);
let mut body = BytesMut::new();

// Ensure the metadata builder is built after transforming the event so we have the event
// size taking into account any dropped fields.
let metadata_builder = RequestMetadataBuilder::from_event(&event);
self.encoder.encode(event, &mut body).ok()?;
let body = body.freeze();

let bytes_len = NonZeroUsize::new(body.len()).expect("payload should never be zero length");
let request_metadata = metadata_builder.with_request_size(bytes_len);
(metadata, builder, event)
}

Some(KafkaRequest {
body,
fn build_request(
&self,
metadata: Self::Metadata,
request_metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
KafkaRequest {
body: payload.into_payload(),
metadata,
request_metadata,
})
}
}
}

28 changes: 15 additions & 13 deletions src/sinks/kafka/service.rs
@@ -7,9 +7,6 @@ use rdkafka::{
producer::{FutureProducer, FutureRecord},
util::Timeout,
};
use vector_core::internal_event::{
ByteSize, BytesSent, InternalEventHandle as _, Protocol, Registered,
};

use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*};

@@ -29,6 +26,7 @@ pub struct KafkaRequestMetadata {

pub struct KafkaResponse {
event_byte_size: GroupedCountByteSize,
raw_byte_size: usize,
}
Author comment on lines 27 to 30: Delegated the bytes sent logic into the driver, rather than managing it here.
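A small self-contained sketch of the contract this change leans on (the trait and types below are stand-ins, not vector's real ones): the shared driver reads `bytes_sent()` from each successful response and emits the byte counter itself, which is why the manual `BytesSent` registration in `KafkaService` can be removed.

```rust
// Stand-in types to illustrate the `bytes_sent()` contract; vector's real
// driver and internal-event plumbing are more involved.
trait DriverResponseLike {
    fn bytes_sent(&self) -> Option<usize>;
}

struct KafkaResponseLike {
    raw_byte_size: usize,
}

impl DriverResponseLike for KafkaResponseLike {
    fn bytes_sent(&self) -> Option<usize> {
        Some(self.raw_byte_size)
    }
}

// Conceptually what the shared driver does after a successful send:
fn emit_bytes_sent(response: &impl DriverResponseLike) {
    if let Some(byte_size) = response.bytes_sent() {
        // In vector this goes through a registered `BytesSent` internal event
        // (protocol "kafka", set via `.protocol("kafka")` on the driver);
        // a print stands in for that here.
        println!("bytes_sent: {byte_size}");
    }
}

fn main() {
    emit_bytes_sent(&KafkaResponseLike { raw_byte_size: 512 });
}
```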


impl DriverResponse for KafkaResponse {
@@ -39,6 +37,10 @@ impl DriverResponse for KafkaResponse {
fn events_sent(&self) -> &GroupedCountByteSize {
&self.event_byte_size
}

fn bytes_sent(&self) -> Option<usize> {
Some(self.raw_byte_size)
}
}

impl Finalizable for KafkaRequest {
@@ -60,15 +62,13 @@ impl MetaDescriptive for KafkaRequest {
#[derive(Clone)]
pub struct KafkaService {
kafka_producer: FutureProducer<KafkaStatisticsContext>,
bytes_sent: Registered<BytesSent>,
}

impl KafkaService {
pub(crate) fn new(kafka_producer: FutureProducer<KafkaStatisticsContext>) -> KafkaService {
KafkaService {
kafka_producer,
bytes_sent: register!(BytesSent::from(Protocol("kafka".into()))),
}
pub(crate) const fn new(
kafka_producer: FutureProducer<KafkaStatisticsContext>,
) -> KafkaService {
KafkaService { kafka_producer }
}
}

@@ -104,10 +104,12 @@ impl Service<KafkaRequest> for KafkaService {
// rdkafka will internally retry forever if the queue is full
match this.kafka_producer.send(record, Timeout::Never).await {
Ok((_partition, _offset)) => {
this.bytes_sent.emit(ByteSize(
request.body.len() + request.metadata.key.map(|x| x.len()).unwrap_or(0),
));
Ok(KafkaResponse { event_byte_size })
let raw_byte_size =
request.body.len() + request.metadata.key.map_or(0, |x| x.len());
Ok(KafkaResponse {
event_byte_size,
raw_byte_size,
})
}
Err((kafka_err, _original_record)) => Err(kafka_err),
}
52 changes: 38 additions & 14 deletions src/sinks/kafka/sink.rs
@@ -1,4 +1,5 @@
use futures::future;
use std::num::NonZeroUsize;

use rdkafka::{
consumer::{BaseConsumer, Consumer},
error::KafkaError,
@@ -13,9 +14,7 @@ use vrl::path::OwnedTargetPath;
use super::config::{KafkaRole, KafkaSinkConfig};
use crate::{
kafka::KafkaStatisticsContext,
sinks::kafka::{
config::QUEUED_MIN_MESSAGES, request_builder::KafkaRequestBuilder, service::KafkaService,
},
sinks::kafka::{request_builder::KafkaRequestBuilder, service::KafkaService},
sinks::prelude::*,
};

@@ -65,22 +64,47 @@ impl KafkaSink {
}

async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
// rdkafka will internally retry forever, so we need some limit to prevent this from overflowing
let service = ConcurrencyLimit::new(self.service, QUEUED_MIN_MESSAGES as usize);
let mut request_builder = KafkaRequestBuilder {
// rdkafka will internally retry forever, so we need some limit to prevent this from overflowing.
// 64 should be plenty concurrency here, as a rdkafka send operation does not block until its underlying
// buffer is full.
let service = ConcurrencyLimit::new(self.service.clone(), 64);
let builder_limit = NonZeroUsize::new(64);

let request_builder = KafkaRequestBuilder {
key_field: self.key_field,
headers_key: self.headers_key,
topic_template: self.topic,
transformer: self.transformer,
encoder: self.encoder,
encoder: (self.transformer, self.encoder),
};

input
.filter_map(|event|
// request_builder is fallible but the places it can fail are emitting
// `Error` and `DroppedEvent` internal events appropriately so no need to here.
future::ready(request_builder.build_request(event)))
.filter_map(|event| {
// Compute the topic.
future::ready(
self.topic
.render_string(&event)
.map_err(|error| {
emit!(TemplateRenderingError {
field: None,
drop_event: true,
error,
});
})
.ok()
.map(|topic| (topic, event)),
)
Author comment on lines +81 to +94: Extracted this from the request builder because the builder methods are infallible.

})
.request_builder(builder_limit, request_builder)
.filter_map(|request| async {
match request {
Err(error) => {
emit!(SinkRequestBuildError { error });
None
}
Ok(req) => Some(req),
}
})
.into_driver(service)
.protocol("kafka")
.run()
.await
}