diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 91eb05c5ed3994..6bcba0de51682c 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1,7 +1,11 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, io::Cursor, - sync::{Arc, OnceLock}, + pin::Pin, + sync::{ + mpsc::{sync_channel, SyncSender}, + Arc, OnceLock, Weak, + }, time::Duration, }; @@ -13,14 +17,29 @@ use codecs::{ StreamDecodingError, }; use futures::{Stream, StreamExt}; +use futures_util::future::OptionFuture; use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath}; use rdkafka::{ - consumer::{CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer}, + consumer::{ + stream_consumer::StreamPartitionQueue, CommitMode, Consumer, ConsumerContext, Rebalance, + StreamConsumer, + }, + error::KafkaError, message::{BorrowedMessage, Headers as _, Message}, - ClientConfig, ClientContext, Statistics, + types::RDKafkaErrorCode, + ClientConfig, ClientContext, Statistics, TopicPartitionList, }; use serde_with::serde_as; use snafu::{ResultExt, Snafu}; +use tokio::{ + runtime::Handle, + sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + oneshot, + }, + task::JoinSet, + time::Sleep, +}; use tokio_util::codec::FramedRead; use vector_common::finalizer::OrderedFinalizer; @@ -50,10 +69,15 @@ use crate::{ #[derive(Debug, Snafu)] enum BuildError { + #[snafu(display("The drain_timeout_ms ({}) must be less than session_timeout_ms ({})", value, session_timeout_ms.as_millis()))] + InvalidDrainTimeout { + value: u64, + session_timeout_ms: Duration, + }, #[snafu(display("Could not create Kafka consumer: {}", source))] - KafkaCreateError { source: rdkafka::error::KafkaError }, + CreateError { source: rdkafka::error::KafkaError }, #[snafu(display("Could not subscribe to Kafka topics: {}", source))] - KafkaSubscribeError { source: rdkafka::error::KafkaError }, + SubscribeError { source: rdkafka::error::KafkaError }, } /// Metrics configuration. @@ -109,6 +133,21 @@ pub struct KafkaSourceConfig { #[configurable(metadata(docs::human_name = "Session Timeout"))] session_timeout_ms: Duration, + /// Timeout to drain pending acknowledgements during shutdown or a Kafka + /// consumer group rebalance. + /// + /// When Vector shuts down or the Kafka consumer group revokes partitions from this + /// consumer, wait a maximum of `drain_timeout_ms` for the source to + /// process pending acknowledgements. Must be less than `session_timeout_ms` + /// to ensure the consumer is not excluded from the group during a rebalance. + /// + /// Default value is half of `session_timeout_ms`. + #[serde(skip_serializing_if = "Option::is_none")] + #[configurable(metadata(docs::examples = 2500, docs::examples = 5000))] + #[configurable(metadata(docs::advanced))] + #[configurable(metadata(docs::human_name = "Drain Timeout"))] + drain_timeout_ms: Option, + /// Timeout for network requests. 
#[serde_as(as = "serde_with::DurationMilliSeconds")] #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))] @@ -289,19 +328,31 @@ impl SourceConfig for KafkaSourceConfig { async fn build(&self, cx: SourceContext) -> crate::Result { let log_namespace = cx.log_namespace(self.log_namespace); - let consumer = create_consumer(self)?; let decoder = DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace) .build()?; let acknowledgements = cx.do_acknowledgements(self.acknowledgements); + if let Some(d) = self.drain_timeout_ms { + snafu::ensure!( + Duration::from_millis(d) <= self.session_timeout_ms, + InvalidDrainTimeoutSnafu { + value: d, + session_timeout_ms: self.session_timeout_ms + } + ); + } + + let (consumer, callback_rx) = create_consumer(self, acknowledgements)?; + Ok(Box::pin(kafka_source( self.clone(), consumer, + callback_rx, decoder, - cx.shutdown, cx.out, - acknowledgements, + cx.shutdown, + false, log_namespace, ))) } @@ -368,107 +419,554 @@ impl SourceConfig for KafkaSourceConfig { } } +#[allow(clippy::too_many_arguments)] async fn kafka_source( config: KafkaSourceConfig, - consumer: StreamConsumer, + consumer: StreamConsumer, + callback_rx: UnboundedReceiver, decoder: Decoder, - mut shutdown: ShutdownSignal, - mut out: SourceSender, - acknowledgements: bool, + out: SourceSender, + shutdown: ShutdownSignal, + eof: bool, log_namespace: LogNamespace, ) -> Result<(), ()> { + let span = info_span!("kafka_source"); let consumer = Arc::new(consumer); - let (finalizer, mut ack_stream) = - OrderedFinalizer::::maybe_new(acknowledgements, Some(shutdown.clone())); - let finalizer = finalizer.map(Arc::new); - if let Some(finalizer) = &finalizer { - consumer - .context() - .finalizer - .set(Arc::clone(finalizer)) - .expect("Finalizer is only set once"); + + consumer + .context() + .consumer + .set(Arc::downgrade(&consumer)) + .expect("Error setting up consumer context."); + + // EOF signal allowing the coordination task to tell the kafka client task when all partitions have reached EOF + let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip(); + + let coordination_task = { + let span = span.clone(); + let consumer = Arc::clone(&consumer); + let drain_timeout_ms = config + .drain_timeout_ms + .map_or(config.session_timeout_ms / 2, Duration::from_millis); + let consumer_state = + ConsumerStateInner::::new(config, decoder, out, log_namespace); + tokio::spawn(async move { + let _enter = span.enter(); + coordinate_kafka_callbacks( + consumer, + callback_rx, + consumer_state, + drain_timeout_ms, + eof_tx, + ) + .await; + }) + }; + + let client_task = { + let consumer = Arc::clone(&consumer); + tokio::task::spawn_blocking(move || { + let _enter = span.enter(); + drive_kafka_consumer(consumer, shutdown, eof_rx); + }) + }; + + _ = tokio::join!(client_task, coordination_task); + consumer.context().commit_consumer_state(); + + Ok(()) +} + +/// ConsumerStateInner implements a small struct/enum-based state machine. +/// +/// With a ConsumerStateInner, the client is able to spawn new tasks +/// when partitions are assigned. When a shutdown signal is received, or +/// partitions are being revoked, the Consuming state is traded for a Draining +/// state (and associated drain deadline future) via the `begin_drain` method +/// +/// A ConsumerStateInner keeps track of partitions that are expected +/// to complete, and also owns the signal that, when dropped, indicates to the +/// client driver task that it is safe to proceed with the rebalance or shutdown. 
+/// When draining is complete, or the deadline is reached, Draining is traded in for +/// either a Consuming (after a revoke) or Complete (in the case of shutdown) state, +/// via the `finish_drain` method. +/// +/// A ConsumerStateInner is the final state, reached after a shutdown +/// signal is received. This can not be traded for another state, and the +/// coordination task should exit when this state is reached. +struct ConsumerStateInner { + config: KafkaSourceConfig, + decoder: Decoder, + out: SourceSender, + log_namespace: LogNamespace, + consumer_state: S, +} +struct Consuming; +struct Complete; +struct Draining { + /// The rendezvous channel sender from the revoke or shutdown callback. Sending on this channel + /// indicates to the kafka client task that one or more partitions have been drained, while + /// closing this channel indicates that all expected partitions have drained, or the drain + /// timeout has been reached. + signal: SyncSender<()>, + + /// The set of topic-partition tasks that are required to complete during + /// the draining phase, populated at the beginning of a rebalance or shutdown. + /// Partitions that are being revoked, but not being actively consumed + /// (e.g. due to the consumer task exiting early) should not be included. + /// The draining phase is considered complete when this set is empty. + expect_drain: HashSet, + + /// Whether the client is shutting down after draining. If set to true, + /// the `finish_drain` method will return a Complete state, otherwise + /// a Consuming state. + shutdown: bool, +} +type OptionDeadline = OptionFuture>>; +enum ConsumerState { + Consuming(ConsumerStateInner), + Draining(ConsumerStateInner), + Complete(ConsumerStateInner), +} +impl Draining { + fn new(signal: SyncSender<()>, shutdown: bool) -> Self { + Self { + signal, + shutdown, + expect_drain: HashSet::new(), + } } - let mut stream = consumer.stream(); + fn is_complete(&self) -> bool { + self.expect_drain.is_empty() + } +} - loop { - tokio::select! { - biased; - _ = &mut shutdown => break, - entry = ack_stream.next() => if let Some((status, entry)) = entry { - if status == BatchStatus::Delivered { - if let Err(error) = - consumer.store_offset(&entry.topic, entry.partition, entry.offset) - { - emit!(KafkaOffsetUpdateError { error }); +impl ConsumerStateInner { + fn complete(self, _deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) { + ( + None.into(), + ConsumerState::Complete(ConsumerStateInner { + config: self.config, + decoder: self.decoder, + out: self.out, + log_namespace: self.log_namespace, + consumer_state: Complete, + }), + ) + } +} + +impl ConsumerStateInner { + const fn new( + config: KafkaSourceConfig, + decoder: Decoder, + out: SourceSender, + log_namespace: LogNamespace, + ) -> Self { + Self { + config, + decoder, + out, + log_namespace, + consumer_state: Consuming, + } + } + + /// Spawn a task on the provided JoinSet to consume the kafka StreamPartitionQueue, and handle acknowledgements for the messages consumed + /// Returns a channel sender that can be used to signal that the consumer should stop and drain pending acknowledgements, + /// and an AbortHandle that can be used to forcefully end the task. 
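As a rough illustration of the contract described in the comment above (hypothetical names and simplified types; the real logic lives in `coordinate_kafka_callbacks` further down), the returned `oneshot::Sender` asks a partition task to drain its pending acknowledgements, while the `AbortHandle` forcibly stops it if the drain deadline fires first. A minimal, self-contained sketch of that coordination might look like:

use std::collections::HashMap;
use tokio::{
    sync::oneshot,
    task::{AbortHandle, JoinSet},
};

type TopicPartition = (String, i32);

// Hypothetical helper, not part of this patch: signal every tracked partition
// task to drain, then abort any stragglers once the deadline expires.
async fn drain_or_abort(
    mut tasks: JoinSet<TopicPartition>,
    mut end_signals: HashMap<TopicPartition, oneshot::Sender<()>>,
    mut abort_handles: HashMap<TopicPartition, AbortHandle>,
    deadline: std::time::Duration,
) {
    // Ask each task to stop consuming and flush its pending acknowledgements.
    for (_tp, end) in end_signals.drain() {
        let _ = end.send(());
    }

    let timeout = tokio::time::sleep(deadline);
    tokio::pin!(timeout);

    loop {
        if tasks.is_empty() {
            break; // everything drained before the deadline
        }
        tokio::select! {
            Some(result) = tasks.join_next() => {
                // The task drained its ack stream and exited on its own;
                // join errors (panicked/aborted tasks) are ignored in this sketch.
                if let Ok(tp) = result {
                    abort_handles.remove(&tp);
                }
            }
            _ = &mut timeout => {
                // Deadline reached: forcibly end whatever is still running.
                for (_tp, handle) in abort_handles.drain() {
                    handle.abort();
                }
                break;
            }
        }
    }
}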
+ fn consume_partition( + &self, + join_set: &mut JoinSet<(TopicPartition, PartitionConsumerStatus)>, + tp: TopicPartition, + consumer: Arc>, + p: StreamPartitionQueue, + acknowledgements: bool, + exit_eof: bool, + ) -> (oneshot::Sender<()>, tokio::task::AbortHandle) { + let keys = self.config.keys(); + let decoder = self.decoder.clone(); + let log_namespace = self.log_namespace; + let mut out = self.out.clone(); + + let (end_tx, mut end_signal) = oneshot::channel::<()>(); + + let handle = join_set.spawn(async move { + let mut messages = p.stream(); + let (finalizer, mut ack_stream) = OrderedFinalizer::::new(None); + + // finalizer is the entry point for new pending acknowledgements; + // when it is dropped, no new messages will be consumed, and the + // task will end when it reaches the end of ack_stream + let mut finalizer = Some(finalizer); + + let mut status = PartitionConsumerStatus::NormalExit; + + loop { + tokio::select!( + // is_some() checks prevent polling end_signal after it completes + _ = &mut end_signal, if finalizer.is_some() => { + finalizer.take(); + }, + message = messages.next(), if finalizer.is_some() => match message { + None => unreachable!("MessageStream never calls Ready(None)"), + Some(Err(error)) => match error { + rdkafka::error::KafkaError::PartitionEOF(partition) if exit_eof => { + debug!("EOF for partition {}.", partition); + status = PartitionConsumerStatus::PartitionEOF; + finalizer.take(); + }, + _ => emit!(KafkaReadError { error }), + }, + Some(Ok(msg)) => { + emit!(KafkaBytesReceived { + byte_size: msg.payload_len(), + protocol: "tcp", + topic: msg.topic(), + partition: msg.partition(), + }); + parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await; + } + }, + + ack = ack_stream.next() => match ack { + Some((status, entry)) => { + if status == BatchStatus::Delivered { + if let Err(error) = consumer.store_offset(&entry.topic, entry.partition, entry.offset) { + emit!(KafkaOffsetUpdateError { error }); + } + } + } + None if finalizer.is_none() => { + debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1); + break + } + None => { + debug!("Acknowledgement stream empty for {}:{}", &tp.0, tp.1); + } } + ) + } + (tp, status) + }); + (end_tx, handle) + } + + /// Consume self, and return a "Draining" ConsumerState, along with a Future + /// representing a drain deadline, based on max_drain_ms + fn begin_drain( + self, + max_drain_ms: Duration, + sig: SyncSender<()>, + shutdown: bool, + ) -> (OptionDeadline, ConsumerStateInner) { + let deadline = Box::pin(tokio::time::sleep(max_drain_ms)); + + let draining = ConsumerStateInner { + config: self.config, + decoder: self.decoder, + out: self.out, + log_namespace: self.log_namespace, + consumer_state: Draining::new(sig, shutdown), + }; + + (Some(deadline).into(), draining) + } + + pub const fn keep_consuming(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) { + (deadline, ConsumerState::Consuming(self)) + } +} + +impl ConsumerStateInner { + /// Mark the given TopicPartition as being revoked, adding it to the set of + /// partitions expected to drain + fn revoke_partition(&mut self, tp: TopicPartition, end_signal: oneshot::Sender<()>) { + // Note that if this send() returns Err, it means the task has already + // ended, but the completion has not been processed yet (otherwise we wouldn't have access to the end_signal), + // so we should still add it to the "expect to drain" set + _ = end_signal.send(()); + 
self.consumer_state.expect_drain.insert(tp); + } + + /// Add the given TopicPartition to the set of known "drained" partitions, + /// i.e. the consumer has drained the acknowledgement channel. A signal is + /// sent on the signal channel, indicating to the client that offsets may be committed + fn partition_drained(&mut self, tp: TopicPartition) { + // This send() will only return Err if the receiver has already been disconnected (i.e. the + // kafka client task is no longer running) + _ = self.consumer_state.signal.send(()); + self.consumer_state.expect_drain.remove(&tp); + } + + /// Return true if all expected partitions have drained + fn is_drain_complete(&self) -> bool { + self.consumer_state.is_complete() + } + + /// Finish partition drain mode. Consumes self and the drain deadline + /// future, and returns a "Consuming" or "Complete" ConsumerState + fn finish_drain(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) { + if self.consumer_state.shutdown { + self.complete(deadline) + } else { + ( + None.into(), + ConsumerState::Consuming(ConsumerStateInner { + config: self.config, + decoder: self.decoder, + out: self.out, + log_namespace: self.log_namespace, + consumer_state: Consuming, + }), + ) + } + } + + pub const fn keep_draining(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) { + (deadline, ConsumerState::Draining(self)) + } +} + +async fn coordinate_kafka_callbacks( + consumer: Arc>, + mut callbacks: UnboundedReceiver, + consumer_state: ConsumerStateInner, + max_drain_ms: Duration, + mut eof: Option>, +) { + let mut drain_deadline: OptionFuture<_> = None.into(); + let mut consumer_state = ConsumerState::Consuming(consumer_state); + + // A oneshot channel is used for each consumed partition, so that we can + // signal to that task to stop consuming, drain pending acks, and exit + let mut end_signals: HashMap> = HashMap::new(); + + // The set of consumer tasks, each consuming a specific partition. The task + // is both consuming the messages (passing them to the output stream) _and_ + // processing the corresponding acknowledgement stream. A consumer task + // should completely drain its acknowledgement stream after receiving an end signal + let mut partition_consumers: JoinSet<(TopicPartition, PartitionConsumerStatus)> = + Default::default(); + + // Handles that will let us end any consumer task that exceeds a drain deadline + let mut abort_handles: HashMap = HashMap::new(); + + let exit_eof = eof.is_some(); + + while let ConsumerState::Consuming(_) | ConsumerState::Draining(_) = consumer_state { + tokio::select! { + Some(Ok((finished_partition, status))) = partition_consumers.join_next(), if !partition_consumers.is_empty() => { + debug!("Partition consumer finished for {}:{}", &finished_partition.0, finished_partition.1); + // If this task ended on its own, the end_signal for it will still be in here. 
+ end_signals.remove(&finished_partition); + abort_handles.remove(&finished_partition); + + (drain_deadline, consumer_state) = match consumer_state { + ConsumerState::Complete(_) => unreachable!("Partition consumer finished after completion."), + ConsumerState::Draining(mut state) => { + state.partition_drained(finished_partition); + + if state.is_drain_complete() { + debug!("All expected partitions have drained."); + state.finish_drain(drain_deadline) + } else { + state.keep_draining(drain_deadline) + } + }, + ConsumerState::Consuming(state) => { + // If we are here, it is likely because the consumer + // tasks are set up to exit upon reaching the end of the + // partition. + if !exit_eof { + debug!("Partition consumer task finished, while not in draining mode."); + } + state.keep_consuming(drain_deadline) + }, + }; + + // PartitionConsumerStatus differentiates between a task that exited after + // being signaled to end, and one that reached the end of its partition and + // was configured to exit. After the last such task ends, we signal the kafka + // driver task to shut down the main consumer too. Note this is only used in tests. + if exit_eof && status == PartitionConsumerStatus::PartitionEOF && partition_consumers.is_empty() { + debug!("All partitions have exited or reached EOF."); + let _ = eof.take().map(|e| e.send(())); } }, - message = stream.next() => match message { - None => break, // WHY? - Some(Err(error)) => emit!(KafkaReadError { error }), - Some(Ok(msg)) => { - emit!(KafkaBytesReceived { - byte_size: msg.payload_len(), - protocol: "tcp", - topic: msg.topic(), - partition: msg.partition(), - }); + Some(callback) = callbacks.recv() => match callback { + KafkaCallback::PartitionsAssigned(mut assigned_partitions, done) => match consumer_state { + ConsumerState::Complete(_) => unreachable!("Partition assignment received after completion."), + ConsumerState::Draining(_) => error!("Partition assignment received while draining revoked partitions, maybe an invalid assignment."), + ConsumerState::Consuming(ref consumer_state) => { + let acks = consumer.context().acknowledgements; + for tp in assigned_partitions.drain(0..) { + let topic = tp.0.as_str(); + let partition = tp.1; + if let Some(pq) = consumer.split_partition_queue(topic, partition) { + debug!("Consuming partition {}:{}.", &tp.0, tp.1); + let (end_tx, handle) = consumer_state.consume_partition(&mut partition_consumers, tp.clone(), Arc::clone(&consumer), pq, acks, exit_eof); + abort_handles.insert(tp.clone(), handle); + end_signals.insert(tp, end_tx); + } else { + warn!("Failed to get queue for assigned partition {}:{}.", &tp.0, tp.1); + } + } + // ensure this is retained until all individual queues are set up + drop(done); + } + }, + KafkaCallback::PartitionsRevoked(mut revoked_partitions, drain) => (drain_deadline, consumer_state) = match consumer_state { + ConsumerState::Complete(_) => unreachable!("Partitions revoked after completion."), + ConsumerState::Draining(d) => { + // NB: This would only happen if the task driving the kafka client (i.e. rebalance handlers) + // is not handling shutdown signals, and a revoke happens during a shutdown drain; otherwise + // this is unreachable code. + warn!("Kafka client is already draining revoked partitions."); + d.keep_draining(drain_deadline) + }, + ConsumerState::Consuming(state) => { + let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, false); + + for tp in revoked_partitions.drain(0..) 
{ + if let Some(end) = end_signals.remove(&tp) { + debug!("Revoking partition {}:{}", &tp.0, tp.1); + state.revoke_partition(tp, end); + } else { + debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1); + } + } + + state.keep_draining(deadline) + } + }, + KafkaCallback::ShuttingDown(drain) => (drain_deadline, consumer_state) = match consumer_state { + ConsumerState::Complete(_) => unreachable!("Shutdown received after completion."), + // Shutting down is just like a full assignment revoke, but we also close the + // callback channels, since we don't expect additional assignments or rebalances + ConsumerState::Draining(state) => { + // NB: This would only happen if the task driving the kafka client is + // not handling shutdown signals; otherwise this is unreachable code + error!("Kafka client handled a shutdown signal while a rebalance was in progress."); + callbacks.close(); + state.keep_draining(drain_deadline) + }, + ConsumerState::Consuming(state) => { + callbacks.close(); + let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, true); + if let Ok(tpl) = consumer.assignment() { + tpl.elements() + .iter() + .for_each(|el| { + + let tp: TopicPartition = (el.topic().into(), el.partition()); + if let Some(end) = end_signals.remove(&tp) { + debug!("Shutting down and revoking partition {}:{}", &tp.0, tp.1); + state.revoke_partition(tp, end); + } else { + debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1); + } + }); + } + // If shutdown was initiated by partition EOF mode, the drain phase + // will already be complete and would time out if not accounted for here + if state.is_drain_complete() { + state.finish_drain(deadline) + } else { + state.keep_draining(deadline) + } + } + }, + }, - parse_message(msg, decoder.clone(), config.keys(), &finalizer, &mut out, &consumer, log_namespace).await; + Some(_) = &mut drain_deadline => (drain_deadline, consumer_state) = match consumer_state { + ConsumerState::Complete(_) => unreachable!("Drain deadline received after completion."), + ConsumerState::Consuming(state) => { + warn!("A drain deadline fired outside of draining mode."); + state.keep_consuming(None.into()) + }, + ConsumerState::Draining(mut draining) => { + debug!("Acknowledgement drain deadline reached. Dropping any pending ack streams for revoked partitions."); + for tp in draining.consumer_state.expect_drain.drain() { + if let Some(handle) = abort_handles.remove(&tp) { + handle.abort(); + } + } + draining.finish_drain(drain_deadline) } }, } } +} - // Since commits are async internally, we try one last sync commit inside the interval - // in case there have been acks. - if let Ok(current_assignment) = consumer.assignment() { - // not logging on error because it will error if there are no offsets stored for a partition, - // and this is best-effort cleanup anyway - _ = consumer.commit(¤t_assignment, CommitMode::Sync); - } - Ok(()) +fn drive_kafka_consumer( + consumer: Arc>, + mut shutdown: ShutdownSignal, + eof: Option>, +) { + Handle::current().block_on(async move { + let mut eof: OptionFuture<_> = eof.into(); + let mut stream = consumer.stream(); + loop { + tokio::select! 
{ + _ = &mut shutdown => { + consumer.context().shutdown(); + break + }, + + Some(_) = &mut eof => { + consumer.context().shutdown(); + break + }, + + // NB: messages are not received on this thread, however we poll + // the consumer to serve client callbacks, such as rebalance notifications + message = stream.next() => match message { + None => unreachable!("MessageStream never returns Ready(None)"), + Some(Err(error)) => emit!(KafkaReadError { error }), + Some(Ok(_msg)) => { + unreachable!("Messages are consumed in dedicated tasks for each partition.") + } + }, + } + } + }); } async fn parse_message( msg: BorrowedMessage<'_>, decoder: Decoder, - keys: Keys<'_>, - finalizer: &Option>>, + keys: &'_ Keys, out: &mut SourceSender, - consumer: &Arc>, + acknowledgements: bool, + finalizer: &Option>, log_namespace: LogNamespace, ) { - if let Some((count, mut stream)) = parse_stream(&msg, decoder, keys, log_namespace) { - match finalizer { - Some(finalizer) => { - let (batch, receiver) = BatchNotifier::new_with_receiver(); - let mut stream = stream.map(|event| event.with_batch_notifier(&batch)); - match out.send_event_stream(&mut stream).await { - Err(_) => { - emit!(StreamClosedError { count }); - } - Ok(_) => { - // Drop stream to avoid borrowing `msg`: "[...] borrow might be used - // here, when `stream` is dropped and runs the destructor [...]". - drop(stream); - finalizer.add(msg.into(), receiver); - } - } + if let Some((count, stream)) = parse_stream(&msg, decoder, keys, log_namespace) { + let (batch, receiver) = BatchNotifier::new_with_receiver(); + let mut stream = stream.map(|event| { + // All acknowledgements flow through the normal Finalizer stream so + // that they can be handled in one place, but are only tied to the + // batch when acknowledgements are enabled + if acknowledgements { + event.with_batch_notifier(&batch) + } else { + event } - None => match out.send_event_stream(&mut stream).await { - Err(_) => { - emit!(StreamClosedError { count }); - } - Ok(_) => { - if let Err(error) = - consumer.store_offset(msg.topic(), msg.partition(), msg.offset()) - { - emit!(KafkaOffsetUpdateError { error }); - } + }); + match out.send_event_stream(&mut stream).await { + Err(_) => { + emit!(StreamClosedError { count }); + } + Ok(_) => { + // Drop stream to avoid borrowing `msg`: "[...] borrow might be used + // here, when `stream` is dropped and runs the destructor [...]". 
+ drop(stream); + if let Some(f) = finalizer.as_ref() { + f.add(msg.into(), receiver) } - }, + } } } } @@ -477,7 +975,7 @@ async fn parse_message( fn parse_stream<'a>( msg: &BorrowedMessage<'a>, decoder: Decoder, - keys: Keys<'a>, + keys: &'a Keys, log_namespace: LogNamespace, ) -> Option<(usize, impl Stream + 'a)> { let payload = msg.payload()?; // skip messages with empty payload @@ -499,7 +997,7 @@ fn parse_stream<'a>( partition: rmsg.partition, }); for mut event in events { - rmsg.apply(&keys, &mut event, log_namespace); + rmsg.apply(keys, &mut event, log_namespace); yield event; } }, @@ -518,24 +1016,24 @@ fn parse_stream<'a>( } #[derive(Clone, Debug)] -struct Keys<'a> { +struct Keys { timestamp: Option, - key_field: &'a Option, - topic: &'a Option, - partition: &'a Option, - offset: &'a Option, - headers: &'a Option, + key_field: Option, + topic: Option, + partition: Option, + offset: Option, + headers: Option, } -impl<'a> Keys<'a> { - fn from(schema: &'a LogSchema, config: &'a KafkaSourceConfig) -> Self { +impl Keys { + fn from(schema: &LogSchema, config: &KafkaSourceConfig) -> Self { Self { timestamp: schema.timestamp_key().cloned(), - key_field: &config.key_field.path, - topic: &config.topic_key.path, - partition: &config.partition_key.path, - offset: &config.offset_key.path, - headers: &config.headers_key.path, + key_field: config.key_field.path.clone(), + topic: config.topic_key.path.clone(), + partition: config.partition_key.path.clone(), + offset: config.offset_key.path.clone(), + headers: config.headers_key.path.clone(), } } } @@ -584,7 +1082,7 @@ impl ReceivedMessage { } } - fn apply(&self, keys: &Keys<'_>, event: &mut Event, log_namespace: LogNamespace) { + fn apply(&self, keys: &Keys, event: &mut Event, log_namespace: LogNamespace) { if let Event::Log(ref mut log) = event { match log_namespace { LogNamespace::Vector => { @@ -656,7 +1154,7 @@ impl ReceivedMessage { } } -#[derive(Debug)] +#[derive(Debug, Eq, PartialEq, Hash)] struct FinalizerEntry { topic: String, partition: i32, @@ -673,7 +1171,13 @@ impl<'a> From> for FinalizerEntry { } } -fn create_consumer(config: &KafkaSourceConfig) -> crate::Result> { +fn create_consumer( + config: &KafkaSourceConfig, + acknowledgements: bool, +) -> crate::Result<( + StreamConsumer, + UnboundedReceiver, +)> { let mut client_config = ClientConfig::new(); client_config .set("group.id", &config.group_id) @@ -709,43 +1213,149 @@ fn create_consumer(config: &KafkaSourceConfig) -> crate::Result>(CustomContext::new( + .create_with_context::<_, StreamConsumer<_>>(KafkaSourceContext::new( config.metrics.topic_lag_metric, + acknowledgements, + callbacks, )) - .context(KafkaCreateSnafu)?; + .context(CreateSnafu)?; let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect(); - consumer.subscribe(&topics).context(KafkaSubscribeSnafu)?; + consumer.subscribe(&topics).context(SubscribeSnafu)?; - Ok(consumer) + Ok((consumer, callback_rx)) } -#[derive(Default)] -struct CustomContext { +type TopicPartition = (String, i32); + +/// Status returned by partition consumer tasks, allowing the coordination task +/// to differentiate between a consumer exiting normally (after receiving an end +/// signal) and exiting when it reaches the end of a partition +#[derive(PartialEq)] +enum PartitionConsumerStatus { + NormalExit, + PartitionEOF, +} + +enum KafkaCallback { + PartitionsAssigned(Vec, SyncSender<()>), + PartitionsRevoked(Vec, SyncSender<()>), + ShuttingDown(SyncSender<()>), +} + +struct KafkaSourceContext { + acknowledgements: bool, stats: 
kafka::KafkaStatisticsContext, - finalizer: OnceLock>>, + + /// A callback channel used to coordinate between the main consumer task and the acknowledgement task + callbacks: UnboundedSender, + + /// A weak reference to the consumer, so that we can commit offsets during a rebalance operation + consumer: OnceLock>>, } -impl CustomContext { - fn new(expose_lag_metrics: bool) -> Self { +impl KafkaSourceContext { + fn new( + expose_lag_metrics: bool, + acknowledgements: bool, + callbacks: UnboundedSender, + ) -> Self { Self { stats: kafka::KafkaStatisticsContext { expose_lag_metrics }, - ..Default::default() + acknowledgements, + consumer: OnceLock::default(), + callbacks, + } + } + + fn shutdown(&self) { + let (send, rendezvous) = sync_channel(0); + if self + .callbacks + .send(KafkaCallback::ShuttingDown(send)) + .is_ok() + { + while rendezvous.recv().is_ok() { + self.commit_consumer_state(); + } + } + } + + /// Emit a PartitionsAssigned callback with the topic-partitions to be consumed, + /// and block until confirmation is received that a stream and consumer for + /// each topic-partition has been set up. This function blocks until the + /// rendezvous channel sender is dropped by the callback handler. + fn consume_partitions(&self, tpl: &TopicPartitionList) { + let (send, rendezvous) = sync_channel(0); + let _ = self.callbacks.send(KafkaCallback::PartitionsAssigned( + tpl.elements() + .iter() + .map(|tp| (tp.topic().into(), tp.partition())) + .collect(), + send, + )); + + while rendezvous.recv().is_ok() { + // no-op: wait for partition assignment handler to complete + } + } + + /// Emit a PartitionsRevoked callback and block until confirmation is + /// received that acknowledgements have been processed for each of them. + /// The rendezvous channel used in the callback can send multiple times to + /// signal individual partitions completing. This function blocks until the + /// sender is dropped by the callback handler. 
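The blocking behaviour described above relies on a zero-capacity `std::sync::mpsc::sync_channel`, where every send is a rendezvous between the rdkafka callback thread and the tokio coordination task. A minimal sketch of that pattern in isolation (illustrative names only, with plain threads standing in for the callback thread and the async drain task) might look like:

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Capacity 0 makes every send a rendezvous: the sender blocks until the
    // receiver is actively receiving, and vice versa.
    let (signal, rendezvous) = sync_channel::<()>(0);

    // Stand-in for the coordination task: report progress once per drained
    // partition, then drop the sender to release the callback thread.
    let drainer = thread::spawn(move || {
        for _partition in 0..3 {
            // ... drain acknowledgements for this partition ...
            let _ = signal.send(()); // "one more partition is done"
        }
        // Dropping `signal` closes the channel and ends the loop below.
    });

    // Stand-in for the rdkafka callback thread: commit after each progress
    // signal, and return only once the drain side has finished.
    while rendezvous.recv().is_ok() {
        // e.g. commit_consumer_state();
    }

    drainer.join().unwrap();
}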
+ fn revoke_partitions(&self, tpl: &TopicPartitionList) { + let (send, rendezvous) = sync_channel(0); + let _ = self.callbacks.send(KafkaCallback::PartitionsRevoked( + tpl.elements() + .iter() + .map(|tp| (tp.topic().into(), tp.partition())) + .collect(), + send, + )); + + while rendezvous.recv().is_ok() { + self.commit_consumer_state(); + } + } + + fn commit_consumer_state(&self) { + if let Some(consumer) = self + .consumer + .get() + .expect("Consumer reference was not initialized.") + .upgrade() + { + match consumer.commit_consumer_state(CommitMode::Sync) { + Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => { + /* Success, or nothing to do - yay \0/ */ + } + Err(error) => emit!(KafkaOffsetUpdateError { error }), + } } } } -impl ClientContext for CustomContext { +impl ClientContext for KafkaSourceContext { fn stats(&self, statistics: Statistics) { self.stats.stats(statistics) } } -impl ConsumerContext for CustomContext { - fn post_rebalance(&self, rebalance: &Rebalance) { - if matches!(rebalance, Rebalance::Revoke(_)) { - if let Some(finalizer) = self.finalizer.get() { - finalizer.flush(); +impl ConsumerContext for KafkaSourceContext { + fn pre_rebalance(&self, rebalance: &Rebalance) { + match rebalance { + Rebalance::Assign(tpl) => self.consume_partitions(tpl), + + Rebalance::Revoke(tpl) => { + self.revoke_partitions(tpl); + self.commit_consumer_state(); + } + + Rebalance::Error(message) => { + error!("Error during Kafka consumer group rebalance: {}.", message); } } } @@ -761,9 +1371,13 @@ mod test { pub fn kafka_host() -> String { std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into()) } + pub fn kafka_port() -> u16 { + let port = std::env::var("KAFKA_PORT").unwrap_or_else(|_| "9091".into()); + port.parse().expect("Invalid port number") + } - pub fn kafka_address(port: u16) -> String { - format!("{}:{}", kafka_host(), port) + pub fn kafka_address() -> String { + format!("{}:{}", kafka_host(), kafka_port()) } #[test] @@ -775,14 +1389,16 @@ mod test { topic: &str, group: &str, log_namespace: LogNamespace, + librdkafka_options: Option>, ) -> KafkaSourceConfig { KafkaSourceConfig { - bootstrap_servers: kafka_address(9091), + bootstrap_servers: kafka_address(), topics: vec![topic.into()], group_id: group.into(), auto_offset_reset: "beginning".into(), session_timeout_ms: Duration::from_millis(6000), commit_interval_ms: Duration::from_millis(1), + librdkafka_options, key_field: default_key_field(), topic_key: default_topic_key(), partition_key: default_partition_key(), @@ -797,7 +1413,7 @@ mod test { #[test] fn test_output_schema_definition_vector_namespace() { - let definitions = make_config("topic", "group", LogNamespace::Vector) + let definitions = make_config("topic", "group", LogNamespace::Vector, None) .outputs(LogNamespace::Vector) .remove(0) .schema_definition(true); @@ -845,7 +1461,7 @@ mod test { #[test] fn test_output_schema_definition_legacy_namespace() { - let definitions = make_config("topic", "group", LogNamespace::Legacy) + let definitions = make_config("topic", "group", LogNamespace::Legacy, None) .outputs(LogNamespace::Legacy) .remove(0) .schema_definition(true); @@ -881,17 +1497,17 @@ mod test { #[tokio::test] async fn consumer_create_ok() { - let config = make_config("topic", "group", LogNamespace::Legacy); - assert!(create_consumer(&config).is_ok()); + let config = make_config("topic", "group", LogNamespace::Legacy, None); + assert!(create_consumer(&config, true).is_ok()); } #[tokio::test] async fn 
consumer_create_incorrect_auto_offset_reset() { let config = KafkaSourceConfig { auto_offset_reset: "incorrect-auto-offset-reset".to_string(), - ..make_config("topic", "group", LogNamespace::Legacy) + ..make_config("topic", "group", LogNamespace::Legacy, None) }; - assert!(create_consumer(&config).is_err()); + assert!(create_consumer(&config, true).is_err()); } } @@ -902,6 +1518,7 @@ mod integration_test { use chrono::{DateTime, SubsecRound, Utc}; use futures::Stream; + use futures_util::stream::FuturesUnordered; use rdkafka::{ admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}, client::DefaultClientContext, @@ -931,9 +1548,17 @@ mod integration_test { const HEADER_KEY: &str = "my header"; const HEADER_VALUE: &str = "my header value"; + fn kafka_test_topic() -> String { + std::env::var("KAFKA_TEST_TOPIC") + .unwrap_or_else(|_| format!("test-topic-{}", random_string(10))) + } + fn kafka_max_bytes() -> String { + std::env::var("KAFKA_MAX_BYTES").unwrap_or_else(|_| "1024".into()) + } + fn client_config(group: Option<&str>) -> T { let mut client = ClientConfig::new(); - client.set("bootstrap.servers", kafka_address(9091)); + client.set("bootstrap.servers", kafka_address()); client.set("produce.offset.report", "true"); client.set("message.timeout.ms", "5000"); client.set("auto.commit.interval.ms", "1"); @@ -943,32 +1568,47 @@ mod integration_test { client.create().expect("Producer creation error") } - async fn send_events(topic: String, count: usize) -> DateTime { + async fn send_events(topic: String, partitions: i32, count: usize) -> DateTime { let now = Utc::now(); let timestamp = now.timestamp_millis(); - let producer: FutureProducer = client_config(None); - - for i in 0..count { - let text = format!("{} {:03}", TEXT, i); - let key = format!("{} {}", KEY, i); - let record = FutureRecord::to(&topic) - .payload(&text) - .key(&key) - .timestamp(timestamp) - .headers(OwnedHeaders::new().insert(Header { - key: HEADER_KEY, - value: Some(HEADER_VALUE), - })); - - if let Err(error) = producer.send(record, Timeout::Never).await { - panic!("Cannot send event to Kafka: {:?}", error); - } - } + let producer: &FutureProducer = &client_config(None); + let topic_name = topic.as_ref(); + + create_topic(topic_name, partitions).await; + + (0..count) + .map(|i| async move { + let text = format!("{} {:03}", TEXT, i); + let key = format!("{} {}", KEY, i); + let record = FutureRecord::to(topic_name) + .payload(&text) + .key(&key) + .timestamp(timestamp) + .headers(OwnedHeaders::new().insert(Header { + key: HEADER_KEY, + value: Some(HEADER_VALUE), + })); + if let Err(error) = producer.send(record, Timeout::Never).await { + panic!("Cannot send event to Kafka: {:?}", error); + } + }) + .collect::>() + .collect::>() + .await; now } + async fn send_to_test_topic(partitions: i32, count: usize) -> (String, String, DateTime) { + let topic = kafka_test_topic(); + let group_id = format!("test-group-{}", random_string(10)); + + let sent_at = send_events(topic.clone(), partitions, count).await; + + (topic, group_id, sent_at) + } + #[tokio::test] async fn consumes_event_with_acknowledgements() { send_receive(true, |_| false, 10, LogNamespace::Legacy).await; @@ -1019,14 +1659,14 @@ mod integration_test { let topic = format!("test-topic-{}", random_string(10)); let group_id = format!("test-group-{}", random_string(10)); - let config = make_config(&topic, &group_id, log_namespace); + let config = make_config(&topic, &group_id, log_namespace, None); - let now = send_events(topic.clone(), 10).await; + let now = 
send_events(topic.clone(), 1, 10).await; let events = assert_source_compliance(&["protocol", "topic", "partition"], async move { let (tx, rx) = SourceSender::new_test_errors(error_at); let (trigger_shutdown, shutdown_done) = - spawn_kafka(tx, config, acknowledgements, log_namespace); + spawn_kafka(tx, config, acknowledgements, false, log_namespace); let events = collect_n(rx, SEND_COUNT).await; // Yield to the finalization task to let it collect the // batch status receivers before signalling the shutdown. @@ -1111,7 +1751,7 @@ mod integration_test { fn make_rand_config() -> (String, String, KafkaSourceConfig) { let topic = format!("test-topic-{}", random_string(10)); let group_id = format!("test-group-{}", random_string(10)); - let config = make_config(&topic, &group_id, LogNamespace::Legacy); + let config = make_config(&topic, &group_id, LogNamespace::Legacy, None); (topic, group_id, config) } @@ -1138,13 +1778,13 @@ mod integration_test { } fn spawn_kafka( - tx: SourceSender, + out: SourceSender, config: KafkaSourceConfig, acknowledgements: bool, + eof: bool, log_namespace: LogNamespace, ) -> (Trigger, Tripwire) { let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired(); - let consumer = create_consumer(&config).unwrap(); let decoder = DecodingConfig::new( config.framing.clone(), @@ -1154,13 +1794,16 @@ mod integration_test { .build() .unwrap(); + let (consumer, callback_rx) = create_consumer(&config, acknowledgements).unwrap(); + tokio::spawn(kafka_source( config, consumer, + callback_rx, decoder, + out, shutdown, - tx, - acknowledgements, + eof, log_namespace, )); (trigger_shutdown, shutdown_done) @@ -1180,9 +1823,9 @@ mod integration_test { .offset() } - async fn create_topic(group_id: &str, topic: &str, partitions: i32) { - let client: AdminClient = client_config(Some(group_id)); - for result in client + async fn create_topic(topic: &str, partitions: i32) { + let client: AdminClient = client_config(None); + let topic_results = client .create_topics( [&NewTopic { name: topic, @@ -1193,9 +1836,14 @@ mod integration_test { &AdminOptions::default(), ) .await - .expect("create_topics failed") - { - result.expect("Creating a topic failed"); + .expect("create_topics failed"); + + for result in topic_results { + if let Err((topic, err)) = result { + if err != rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists { + panic!("Creating a topic failed: {:?}", (topic, err)) + } + } } } @@ -1228,20 +1876,20 @@ mod integration_test { const DELAY: u64 = 100; let (topic, group_id, config) = make_rand_config(); - create_topic(&group_id, &topic, 2).await; + create_topic(&topic, 2).await; - let _send_start = send_events(topic.clone(), NEVENTS).await; + let _send_start = send_events(topic.clone(), 1, NEVENTS).await; let (tx, rx1) = delay_pipeline(1, Duration::from_millis(200), EventStatus::Delivered); let (trigger_shutdown1, shutdown_done1) = - spawn_kafka(tx, config.clone(), true, LogNamespace::Legacy); + spawn_kafka(tx, config.clone(), true, false, LogNamespace::Legacy); let events1 = tokio::spawn(collect_n(rx1, NEVENTS)); sleep(Duration::from_secs(1)).await; let (tx, rx2) = delay_pipeline(2, Duration::from_millis(DELAY), EventStatus::Delivered); let (trigger_shutdown2, shutdown_done2) = - spawn_kafka(tx, config, true, LogNamespace::Legacy); + spawn_kafka(tx, config, true, false, LogNamespace::Legacy); let events2 = tokio::spawn(collect_n(rx2, NEVENTS)); sleep(Duration::from_secs(5)).await; @@ -1285,6 +1933,203 @@ mod integration_test { // Assert they are all in sequential order 
and no dupes, TODO } + #[tokio::test] + async fn drains_acknowledgements_at_shutdown() { + // 1. Send N events (if running against a pre-populated kafka topic, use send_count=0 and expect_count=expected number of messages; otherwise just set send_count) + let send_count: usize = std::env::var("KAFKA_SEND_COUNT") + .unwrap_or_else(|_| "125000".into()) + .parse() + .expect("Number of messages to send to kafka."); + let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT") + .unwrap_or_else(|_| format!("{}", send_count)) + .parse() + .expect("Number of messages to expect consumers to process."); + let delay_ms: u64 = std::env::var("KAFKA_SHUTDOWN_DELAY") + .unwrap_or_else(|_| "2000".into()) + .parse() + .expect("Number of milliseconds before shutting down first consumer."); + + let (topic, group_id, _) = send_to_test_topic(1, send_count).await; + + // 2. Run the kafka source to read some of the events + // 3. Send a shutdown signal (at some point before all events are read) + let mut opts = HashMap::new(); + // Set options to get partition EOF notifications, and fetch data in small/configurable size chunks + opts.insert("enable.partition.eof".into(), "true".into()); + opts.insert("fetch.message.max.bytes".into(), kafka_max_bytes()); + let events1 = { + let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts.clone())); + let (tx, rx) = SourceSender::new_test_errors(|_| false); + let (trigger_shutdown, shutdown_done) = + spawn_kafka(tx, config, true, false, LogNamespace::Legacy); + let (events, _) = tokio::join!(rx.collect::>(), async move { + sleep(Duration::from_millis(delay_ms)).await; + drop(trigger_shutdown); + }); + shutdown_done.await; + events + }; + + debug!("Consumer group.id: {}", &group_id); + debug!( + "First consumer read {} of {} messages.", + events1.len(), + expect_count + ); + + // 4. Run the kafka source again to finish reading the events + let events2 = { + let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts)); + let (tx, rx) = SourceSender::new_test_errors(|_| false); + let (trigger_shutdown, shutdown_done) = + spawn_kafka(tx, config, true, true, LogNamespace::Legacy); + let events = rx.collect::>().await; + drop(trigger_shutdown); + shutdown_done.await; + events + }; + + debug!( + "Second consumer read {} of {} messages.", + events2.len(), + expect_count + ); + + // 5. Total number of events processed should equal the number sent + let total = events1.len() + events2.len(); + assert_ne!( + events1.len(), + 0, + "First batch of events should be non-zero (increase KAFKA_SHUTDOWN_DELAY?)" + ); + assert_ne!(events2.len(), 0, "Second batch of events should be non-zero (decrease KAFKA_SHUTDOWN_DELAY or increase KAFKA_SEND_COUNT?) "); + assert_eq!(total, expect_count); + } + + async fn consume_with_rebalance(rebalance_strategy: String) { + // 1. 
Send N events (if running against a pre-populated kafka topic, use send_count=0 and expect_count=expected number of messages; otherwise just set send_count) + let send_count: usize = std::env::var("KAFKA_SEND_COUNT") + .unwrap_or_else(|_| "125000".into()) + .parse() + .expect("Number of messages to send to kafka."); + let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT") + .unwrap_or_else(|_| format!("{}", send_count)) + .parse() + .expect("Number of messages to expect consumers to process."); + let delay_ms: u64 = std::env::var("KAFKA_CONSUMER_DELAY") + .unwrap_or_else(|_| "2000".into()) + .parse() + .expect("Number of milliseconds before shutting down first consumer."); + + let (topic, group_id, _) = send_to_test_topic(6, send_count).await; + debug!("Topic: {}", &topic); + debug!("Consumer group.id: {}", &group_id); + + // 2. Run the kafka source to read some of the events + // 3. Start 2nd & 3rd consumers using the same group.id, triggering rebalance events + let mut kafka_options = HashMap::new(); + kafka_options.insert("enable.partition.eof".into(), "true".into()); + kafka_options.insert("fetch.message.max.bytes".into(), kafka_max_bytes()); + kafka_options.insert("partition.assignment.strategy".into(), rebalance_strategy); + let config1 = make_config( + &topic, + &group_id, + LogNamespace::Legacy, + Some(kafka_options.clone()), + ); + let config2 = config1.clone(); + let config3 = config1.clone(); + let config4 = config1.clone(); + + let (events1, events2, events3) = tokio::join!( + async move { + let (tx, rx) = SourceSender::new_test_errors(|_| false); + let (_trigger_shutdown, _shutdown_done) = + spawn_kafka(tx, config1, true, true, LogNamespace::Legacy); + + rx.collect::>().await + }, + async move { + sleep(Duration::from_millis(delay_ms)).await; + let (tx, rx) = SourceSender::new_test_errors(|_| false); + let (_trigger_shutdown, _shutdown_done) = + spawn_kafka(tx, config2, true, true, LogNamespace::Legacy); + + rx.collect::>().await + }, + async move { + sleep(Duration::from_millis(delay_ms * 2)).await; + let (tx, rx) = SourceSender::new_test_errors(|_| false); + let (_trigger_shutdown, _shutdown_done) = + spawn_kafka(tx, config3, true, true, LogNamespace::Legacy); + + rx.collect::>().await + } + ); + + let unconsumed = async move { + let (tx, rx) = SourceSender::new_test_errors(|_| false); + let (_trigger_shutdown, _shutdown_done) = + spawn_kafka(tx, config4, true, true, LogNamespace::Legacy); + + rx.collect::>().await + } + .await; + + debug!( + "First consumer read {} of {} messages.", + events1.len(), + expect_count + ); + + debug!( + "Second consumer read {} of {} messages.", + events2.len(), + expect_count + ); + debug!( + "Third consumer read {} of {} messages.", + events3.len(), + expect_count + ); + + // 5. Total number of events processed should equal the number sent + let total = events1.len() + events2.len() + events3.len(); + assert_ne!( + events1.len(), + 0, + "First batch of events should be non-zero (increase delay?)" + ); + assert_ne!( + events2.len(), + 0, + "Second batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) " + ); + assert_ne!( + events3.len(), + 0, + "Third batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) " + ); + assert_eq!( + unconsumed.len(), + 0, + "The first set of consumers should consume and ack all messages." 
+ ); + assert_eq!(total, expect_count); + } + + #[tokio::test] + async fn drains_acknowledgements_during_rebalance_default_assignments() { + // the default, eager rebalance strategies generally result in more revocations + consume_with_rebalance("range,roundrobin".into()).await; + } + #[tokio::test] + async fn drains_acknowledgements_during_rebalance_sticky_assignments() { + // Cooperative rebalance strategies generally result in fewer revokes, + // as only reassigned partitions are revoked + consume_with_rebalance("cooperative-sticky".into()).await; + } + fn map_logs(events: EventArray) -> impl Iterator { events.into_events().map(|event| { let log = event.into_log(); diff --git a/website/cue/reference/components/sources/base/kafka.cue b/website/cue/reference/components/sources/base/kafka.cue index 35e538f2764a8f..2f84df38358934 100644 --- a/website/cue/reference/components/sources/base/kafka.cue +++ b/website/cue/reference/components/sources/base/kafka.cue @@ -192,6 +192,21 @@ base: components: sources: kafka: configuration: { } } } + drain_timeout_ms: { + description: """ + Timeout to drain pending acknowledgements during shutdown or a Kafka + consumer group rebalance. + + When Vector shuts down or the Kafka consumer group revokes partitions from this + consumer, wait a maximum of `drain_timeout_ms` for the source to + process pending acknowledgements. Must be less than `session_timeout_ms` + to ensure the consumer is not excluded from the group during a rebalance. + + Default value is half of `session_timeout_ms`. + """ + required: false + type: uint: examples: [2500, 5000] + } fetch_wait_max_ms: { description: "Maximum time the broker may wait to fill the response." required: false