Skip to content

Commit 8f7f55b

Browse files
rhryngaryrussell
authored andcommitted
Log Uncommitted After Rebalance
When commits a retryable, some partitions may have been assigned to another instance, in which case, those offsets can't be committed. Log the offsets that could not be committed at WARN level. Changed log level to improve troubleshooting Changed log level to improve troubleshooting Changed log level to improve troubleshooting Fixed formatting
1 parent daf7233 commit 8f7f55b

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1565,7 +1565,15 @@ private void checkRebalanceCommits() {
15651565
Map<TopicPartition, OffsetAndMetadata> commits = this.commitsDuringRebalance.entrySet()
15661566
.stream()
15671567
.filter(entry -> this.assignedPartitions.contains(entry.getKey()))
1568-
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
1568+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
1569+
1570+
Map<TopicPartition, OffsetAndMetadata> uncommitted = this.commitsDuringRebalance.entrySet()
1571+
.stream()
1572+
.filter(entry -> !this.assignedPartitions.contains(entry.getKey()))
1573+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
1574+
this.logger.warn(() -> "These offsets could not be committed; partition(s) lost during rebalance: "
1575+
+ uncommitted);
1576+
15691577
this.commitsDuringRebalance.clear();
15701578
this.logger.debug(() -> "Commit list: " + commits);
15711579
commitSync(commits);

0 commit comments

Comments
 (0)