Skip to content

Commit

Permalink
spring-projectsGH-2731: fix count and time ack mode
Browse files Browse the repository at this point in the history
count ack mode commit offset when next poll return no records
  • Loading branch information
Zhiyang.Wang1 committed Oct 26, 2023
1 parent 1817703 commit e93e539
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,10 @@ protected void pollAndInvoke() {
this.firstPoll = true;
this.consumerSeekAwareListener.onFirstPoll();
}
if (records != null && records.count() == 0 && this.isCountAck && this.count > 0) {
commitIfNecessary();
this.count = 0;
}
debugRecords(records);

invokeIfHaveRecords(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3060,12 +3060,12 @@ public void testAckModeCount() throws Exception {
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records2 = new HashMap<>();
records2.put(topicPartition, Arrays.asList(
new ConsumerRecord<>("foo", 0, 2L, 1, "baz"),
new ConsumerRecord<>("foo", 0, 3L, 1, "qux"))); // commit (4 >= 3)
new ConsumerRecord<>("foo", 0, 3L, 1, "qux"))); // commit (3 = 3)
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records3 = new HashMap<>();
records3.put(topicPartition, Arrays.asList(
new ConsumerRecord<>("foo", 0, 4L, 1, "fiz"),
new ConsumerRecord<>("foo", 0, 5L, 1, "buz"),
new ConsumerRecord<>("foo", 0, 6L, 1, "bif"))); // commit (3 >= 3)
new ConsumerRecord<>("foo", 0, 5L, 1, "buz"), // commit (3 = 3)
new ConsumerRecord<>("foo", 0, 6L, 1, "bif"))); // commit (1 when next poll returns no records)
ConsumerRecords<Integer, String> consumerRecords1 = new ConsumerRecords<>(records1);
ConsumerRecords<Integer, String> consumerRecords2 = new ConsumerRecords<>(records2);
ConsumerRecords<Integer, String> consumerRecords3 = new ConsumerRecords<>(records3);
Expand All @@ -3085,7 +3085,7 @@ public void testAckModeCount() throws Exception {
return emptyRecords;
}
});
final CountDownLatch commitLatch = new CountDownLatch(2);
final CountDownLatch commitLatch = new CountDownLatch(3);
willAnswer(i -> {
commitLatch.countDown();
return null;
Expand Down Expand Up @@ -3116,6 +3116,8 @@ public void testAckModeCount() throws Exception {
Duration.ofSeconds(42));
verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(6L)),
Duration.ofSeconds(42));
verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(7L)),
Duration.ofSeconds(42));
container.stop();
}

Expand Down

0 comments on commit e93e539

Please sign in to comment.