Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafka source): fix acknowledgement handling during shutdown and rebalance events #17497

Merged
Merged
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
488aa63
test(kafka source): integration tests for acknowledgement handling du…
jches Apr 27, 2023
2b81df3
fix(kafka source): drain pending acknowledgements on shutdown and reb…
jches May 16, 2023
03aedbd
fix(kafka source): performance improvements for acknowledgement handl…
jches May 25, 2023
0191497
clippy fixes, and remove unnecessary last_offset tracking
jches May 25, 2023
bf4a5c3
cargo fmt again
jches May 26, 2023
9d32a82
fmt
jches May 26, 2023
3582940
clean up handle_messages loop and add a tracing span for metrics coll…
jches Jun 29, 2023
1707266
Merge branch 'master' into kafka-shutdown-rebalance-pending-acks
jches Jun 29, 2023
c21e174
fixup changes lost after merging master
jches Jun 29, 2023
28b2b44
clippy warning
jches Jul 3, 2023
ab243cb
enhancement(kafka source): kafka source uses a dedicated task per par…
jches Aug 28, 2023
fe4ae3e
Merge branch 'master' into kafka-shutdown-rebalance-pending-acks
jches Aug 28, 2023
485524f
make the spelling checker happy, maybe?
jches Aug 28, 2023
dc723a7
emit a debug log instead of panicking if a shutdown happens during a …
jches Aug 29, 2023
1f49643
improved partition eof handling
jches Aug 29, 2023
a69b2c4
add OptionFuture to drain deadline and EOF handling, and use is_subse…
jches Sep 6, 2023
0b6d17e
replace OnceCell with OnceLock
jches Sep 7, 2023
34f2f85
Merge branch 'master' into kafka-shutdown-rebalance-pending-acks
jches Sep 7, 2023
6865470
cargo fmt
jches Sep 7, 2023
84dfe92
create clear distinction between consuming and draining states
jches Sep 7, 2023
d823820
add "complete" as a terminal state, and "keep_consuming", "keep_drain…
jches Sep 8, 2023
c6082f3
use state transition methods consistently for all state transitions
jches Sep 8, 2023
444b3e1
slightly clearer assertion messages about what is expected
jches Sep 8, 2023
88cd2e5
update obsolete comment, make coordinator loop condition explicit
jches Sep 8, 2023
db999e2
use keep_consuming from the drain_timeout while consuming handler
jches Sep 8, 2023
1edb270
rely solely on adding/removing entries in expect_drain to detect when…
jches Sep 11, 2023
d2cdeb1
fix comment :P
jches Sep 12, 2023
e4e57b9
clippy/fmt fixes
jches Sep 12, 2023
7999fa7
minor cleanup: during shutdown, use is_drain_complete to detect the a…
jches Sep 13, 2023
78c7a0e
integration test uses `FuturesUnordered` for better performance
jches Sep 20, 2023
c88e688
use FuturesUnordered
jches Sep 20, 2023
b3cef9b
use 6 partitions for integration test
jches Sep 20, 2023
61784a3
integration test using 125k messages
jches Sep 21, 2023
89dbaa9
add drain_timeout_ms option for kafka source
jches Sep 22, 2023
bb4b544
enforce drain_timeout_ms < session_timeout_ms when building kafka source
jches Sep 22, 2023
afb7fa1
generate component docs
jches Sep 22, 2023
e455ce0
use Option::{unzip, map_or} methods
jches Oct 3, 2023
4ad434b
remove OnceLock on callback channel sender, and other review cleanups
jches Oct 4, 2023
6353bbc
members of Keys struct are cloned once per consumed partition, instea…
jches Oct 4, 2023
ff086d7
cargo fmt and fix clippy warnings
jches Oct 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rely solely on adding/removing entries in expect_drain to detect when…
… draining is complete
jches committed Sep 11, 2023
commit 1edb2703f6a1804ba5d8b84f26b4279abc5b6b2c
36 changes: 19 additions & 17 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
@@ -483,14 +483,12 @@ struct Complete;
struct Draining {
signal: SyncSender<()>,

/// The Set of partitions expected to drain during a shutdown or rebalance that revokes partitions
/// The set of topic-partition tasks that have completed, populated at the
/// beginning of a rebalance or shutdown. Partitions that are not being
/// actively consumed (e.g. due to the consumer task exiting early) should not
/// be added. The draining phase is considered complete when this set is empty.
expect_drain: HashSet<TopicPartition>,

/// The set of partitions we have observed and stored the final acknowledgement for. Ack streams
/// can complete before we get a rebalance callback, so "observed complete" (based on seeing the end of the stream)
/// and "expect to complete" (based on seeing a rebalance callback with revoked partition info) are tracked separately.
observed_drain: HashSet<TopicPartition>,

/// Whether the client is shutting down after draining
shutdown: bool,
}
@@ -506,12 +504,11 @@ impl Draining {
signal,
shutdown,
expect_drain: HashSet::new(),
observed_drain: HashSet::new(),
}
}

fn is_complete(&self) -> bool {
self.expect_drain.is_subset(&self.observed_drain)
self.expect_drain.is_empty()
}
}

@@ -655,7 +652,11 @@ impl ConsumerStateInner<Consuming> {
impl ConsumerStateInner<Draining> {
/// Mark the given TopicPartition as being revoked, adding it to the set of
/// partitions expected to drain
pub fn revoke_partition(&mut self, tp: TopicPartition) {
pub fn revoke_partition(&mut self, tp: TopicPartition, end_signal: oneshot::Sender<()>) {
// Note that if this send() returns Err, it means the task has already
// ended, but the completion has not been processed yet (otherwise we wouldn't have access to the end_signal),
// so we should still add it to the "expect to drain" set
_ = end_signal.send(());
self.consumer_state.expect_drain.insert(tp);
}

@@ -664,11 +665,10 @@ impl ConsumerStateInner<Draining> {
/// sent on the signal channel, indicating to the client that offsets may be committed
pub fn partition_drained(&mut self, tp: TopicPartition) {
_ = self.consumer_state.signal.send(());
bruceg marked this conversation as resolved.
Show resolved Hide resolved
self.consumer_state.observed_drain.insert(tp);
self.consumer_state.expect_drain.remove(&tp);
}

/// Return true if the set of expected drained partitions is a subset of the
/// partitions that have been observed to be finished
/// Return true if all expected partitions have drained
pub fn is_drain_complete(&self) -> bool {
self.consumer_state.is_complete()
}
@@ -801,10 +801,11 @@ async fn coordinate_kafka_callbacks(

for tp in revoked_partitions.drain(0..) {
if let Some(end) = end_signals.remove(&tp) {
let _ = end.send(());
debug!("Revoking partition {}:{}", &tp.0, tp.1);
state.revoke_partition(tp, end);
} else {
debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
}
debug!("Revoking partition {}:{}", &tp.0, tp.1);
state.revoke_partition(tp);
}

state.keep_draining(deadline)
@@ -837,9 +838,10 @@ async fn coordinate_kafka_callbacks(

let tp: TopicPartition = (el.topic().into(), el.partition());
if let Some(end) = end_signals.remove(&tp) {
let _ = end.send(());
state.revoke_partition(tp, end);
} else {
debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
}
state.revoke_partition(tp);
});
}
state.keep_draining(deadline)