From 84badddfbe523efab342e383c16d6cb38845507b Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 16 Apr 2021 14:57:01 +0200 Subject: [PATCH] Fix #813 Signed-off-by: Francesco Guardiani --- .../consumer/impl/UnorderedOffsetManager.java | 2 +- .../consumer/impl/UnorderedOffsetManagerTest.java | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/consumer/impl/UnorderedOffsetManager.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/consumer/impl/UnorderedOffsetManager.java index 14022ab127..1d194ee7c2 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/consumer/impl/UnorderedOffsetManager.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/consumer/impl/UnorderedOffsetManager.java @@ -251,7 +251,7 @@ private int blockIndex(long val) { } private void checkAcksArraySize(int blockIndex) { - if (this.uncommitted.length < blockIndex) { + if (blockIndex > this.uncommitted.length - 1) { // Let's make sure we create enough room for more unordered records this.uncommitted = Arrays.copyOf(this.uncommitted, (blockIndex + 1) * 2); } diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/consumer/impl/UnorderedOffsetManagerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/consumer/impl/UnorderedOffsetManagerTest.java index 14c2ae584a..c5fb5ca28f 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/consumer/impl/UnorderedOffsetManagerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/consumer/impl/UnorderedOffsetManagerTest.java @@ -50,6 +50,16 @@ public void shouldCommitAfterSendingEventsOrderedOnTheSamePartition() { .containsEntry(new TopicPartition("aaa", 0), 10L); } + @Test + public void shouldNotCommitAndNotGoOutOfBounds() { + assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { + offsetStrategy.recordReceived(record("aaa", 0, 0)); + offsetStrategy.successfullySentToSubscriber(record("aaa", 0, 64)); + offsetStrategy.successfullySentToSubscriber(record("aaa", 0, 128)); + }) + .isEmpty(); + } + @Test public void shouldCommitAfterSendingEventsOrderedOnTheSamePartitionWithInducedFailure() { assertThatOffsetCommittedWithFailures(List.of(new TopicPartition("aaa", 0)), (offsetStrategy, failureFlag) -> {