From 3458ef41418bcd56160037c264bfa0591b7fd635 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 10 Nov 2021 10:33:46 +0100 Subject: [PATCH] [release-0.26] Commit offsets at specified intervals (#1405) (#1450) * Commit offsets at specified intervals (#1405) * Commit offset at a specified interval Signed-off-by: Pierangelo Di Pilato * Add reset logic to handle long large offsets Signed-off-by: Pierangelo Di Pilato * Use final Co-authored-by: Matthias Wessendorf Co-authored-by: Matthias Wessendorf * Update codegen Signed-off-by: Pierangelo Di Pilato * ko upgrade fixes Signed-off-by: Pierangelo Di Pilato Co-authored-by: Matthias Wessendorf --- .../v1alpha1/zz_generated.deepcopy.go | 1 + .../UnorderedOffsetManagerBenchmark.java | 74 ++--- .../dispatcher/RecordDispatcherListener.java | 10 +- .../dispatcher/impl/RecordDispatcherImpl.java | 35 ++- .../impl/consumer/OffsetManager.java | 211 ++++++++++++++ .../impl/consumer/OrderedOffsetManager.java | 127 --------- .../impl/consumer/UnorderedOffsetManager.java | 262 ------------------ .../main/ConsumerVerticleFactoryImpl.java | 28 +- .../dispatcher/impl/RecordDispatcherTest.java | 13 +- .../consumer/AbstractOffsetManagerTest.java | 19 +- ...anagerTest.java => OffsetManagerTest.java} | 64 ++++- .../consumer/OrderedOffsetManagerTest.java | 71 ----- .../integration/UnorderedConsumerTest.java | 3 + .../kafka/broker/tests/DataPlaneTest.java | 18 +- test/config/chaos/chaosduck.yaml | 2 +- test/e2e-common.sh | 17 +- .../k8s.io/code-generator/generate-groups.sh | 0 .../knative.dev/pkg/hack/generate-knative.sh | 0 18 files changed, 356 insertions(+), 599 deletions(-) create mode 100644 data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java delete mode 100644 data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManager.java delete mode 100644 data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManager.java rename data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/{UnorderedOffsetManagerTest.java => OffsetManagerTest.java} (79%) delete mode 100644 data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManagerTest.java mode change 100644 => 100755 vendor/k8s.io/code-generator/generate-groups.sh mode change 100644 => 100755 vendor/knative.dev/pkg/hack/generate-knative.sh diff --git a/control-plane/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/control-plane/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index cc63f10b1f..ecb59de63e 100644 --- a/control-plane/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/control-plane/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,4 @@ +//go:build !ignore_autogenerated // +build !ignore_autogenerated /* diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerBenchmark.java index 926d470ba1..e2f8b56a3c 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerBenchmark.java @@ -19,6 +19,7 @@ import 
io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.kafka.client.common.PartitionInfo; import io.vertx.kafka.client.common.TopicPartition; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; @@ -27,11 +28,6 @@ import io.vertx.kafka.client.consumer.OffsetAndMetadata; import io.vertx.kafka.client.consumer.OffsetAndTimestamp; import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.openjdk.jmh.annotations.Benchmark; @@ -41,6 +37,12 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.infra.Blackhole; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + public class UnorderedOffsetManagerBenchmark { @State(Scope.Thread) @@ -71,96 +73,70 @@ public void doSetup() { @Benchmark public void benchmarkReverseOrder(RecordsState recordsState, Blackhole blackhole) { - UnorderedOffsetManager offsetManager = new UnorderedOffsetManager(new MockKafkaConsumer(), null); + final OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L); int partitions = 100; for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.recordReceived(recordsState.records[partition][0]) - ); + offsetManager.recordReceived(recordsState.records[partition][0]); } for (int offset = 9_999; offset > 0; offset--) { for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.recordReceived(recordsState.records[partition][offset]) - ); - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]) - ); + offsetManager.recordReceived(recordsState.records[partition][offset]); + offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]); } } for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[partition][0]) - ); + offsetManager.successfullySentToSubscriber(recordsState.records[partition][0]); } } @Benchmark public void benchmarkOrdered(RecordsState recordsState, Blackhole blackhole) { - UnorderedOffsetManager offsetManager = new UnorderedOffsetManager(new MockKafkaConsumer(), null); + OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L); int partitions = 100; for (int offset = 0; offset < 10_000; offset++) { for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.recordReceived(recordsState.records[partition][offset]) - ); - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]) - ); + offsetManager.recordReceived(recordsState.records[partition][offset]); + offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]); } } } @Benchmark public void benchmarkRealisticCase(RecordsState recordsState, Blackhole blackhole) { - UnorderedOffsetManager offsetManager = new UnorderedOffsetManager(new MockKafkaConsumer(), null); + OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L); int partitions = 10; for (int partition = 
0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.recordReceived(recordsState.records[partition][0]) - ); + offsetManager.recordReceived(recordsState.records[partition][0]); } for (int partition = 0; partition < partitions; partition++) { - for (int offset : new int[] {5, 2, 0, 7, 1, 3, 4, 6}) { - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]) - ); + for (int offset : new int[]{5, 2, 0, 7, 1, 3, 4, 6}) { + offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]); } } } @Benchmark public void benchmarkMixedABit(RecordsState recordsState, Blackhole blackhole) { - UnorderedOffsetManager offsetManager = new UnorderedOffsetManager(new MockKafkaConsumer(), null); + OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L); int partitions = 4; for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.recordReceived(recordsState.records[partition][0]) - ); + offsetManager.recordReceived(recordsState.records[partition][0]); } for (int i = 0; i < 120; i++) { // This will commit in the following order: // 1 0 3 2 5 4 ... - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[2][i % 2 == 0 ? i + 1 : i - 1]) - ); - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[1][i % 2 == 0 ? i + 1 : i - 1]) - ); - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[0][i % 2 == 0 ? i + 1 : i - 1]) - ); - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[3][i % 2 == 0 ? i + 1 : i - 1]) - ); + offsetManager.successfullySentToSubscriber(recordsState.records[2][i % 2 == 0 ? i + 1 : i - 1]); + offsetManager.successfullySentToSubscriber(recordsState.records[1][i % 2 == 0 ? i + 1 : i - 1]); + offsetManager.successfullySentToSubscriber(recordsState.records[0][i % 2 == 0 ? i + 1 : i - 1]); + offsetManager.successfullySentToSubscriber(recordsState.records[3][i % 2 == 0 ? i + 1 : i - 1]); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java index f047e6ec74..ec5d4745bf 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java @@ -28,7 +28,7 @@ public interface RecordDispatcherListener { * * @param record record received. */ - Future recordReceived(KafkaConsumerRecord record); + void recordReceived(KafkaConsumerRecord record); /** * The given record cannot be delivered to dead letter sink. @@ -36,26 +36,26 @@ public interface RecordDispatcherListener { * @param record record undeliverable to dead letter sink. * @param ex exception occurred. */ - Future failedToSendToDeadLetterSink(KafkaConsumerRecord record, Throwable ex); + void failedToSendToDeadLetterSink(KafkaConsumerRecord record, Throwable ex); /** * The given event doesn't pass the filter. * * @param record record discarded. */ - Future recordDiscarded(KafkaConsumerRecord record); + void recordDiscarded(KafkaConsumerRecord record); /** * The given record has been successfully sent to subscriber. * * @param record record sent to subscriber. 
*/ - Future successfullySentToSubscriber(KafkaConsumerRecord record); + void successfullySentToSubscriber(KafkaConsumerRecord record); /** * The given record has been successfully sent to dead letter sink. * * @param record record sent to dead letter sink. */ - Future successfullySentToDeadLetterSink(KafkaConsumerRecord record); + void successfullySentToDeadLetterSink(KafkaConsumerRecord record); } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index 981121db35..acca22162c 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -28,8 +28,10 @@ import io.vertx.core.Vertx; import io.vertx.kafka.client.common.tracing.ConsumerTracer; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; + import java.util.Objects; import java.util.function.Function; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,16 +141,13 @@ private void onRecordReceived(final KafkaConsumerRecord reco } } - recordDispatcherListener.recordReceived(record) - .onSuccess(v -> { - // Execute filtering - if (filter.test(record.value())) { - onFilterMatching(record, finalProm); - } else { - onFilterNotMatching(record, finalProm); - } - }) - .onFailure(finalProm::fail); // This should really never happen + recordDispatcherListener.recordReceived(record); + // Execute filtering + if (filter.test(record.value())) { + onFilterMatching(record, finalProm); + } else { + onFilterNotMatching(record, finalProm); + } } private void onFilterMatching(final KafkaConsumerRecord record, final Promise finalProm) { @@ -161,15 +160,15 @@ private void onFilterMatching(final KafkaConsumerRecord reco private void onFilterNotMatching(final KafkaConsumerRecord record, final Promise finalProm) { logDebug("Record doesn't match filtering", record); - recordDispatcherListener.recordDiscarded(record) - .onComplete(finalProm); + recordDispatcherListener.recordDiscarded(record); + finalProm.complete(); } private void onSubscriberSuccess(final KafkaConsumerRecord record, final Promise finalProm) { logDebug("Successfully sent event to subscriber", record); - recordDispatcherListener.successfullySentToSubscriber(record) - .onComplete(finalProm); + recordDispatcherListener.successfullySentToSubscriber(record); + finalProm.complete(); } private void onSubscriberFailure(final KafkaConsumerRecord record, @@ -182,15 +181,15 @@ private void onSubscriberFailure(final KafkaConsumerRecord r private void onDeadLetterSinkSuccess(final KafkaConsumerRecord record, final Promise finalProm) { logDebug("Successfully sent event to the dead letter sink", record); - recordDispatcherListener.successfullySentToDeadLetterSink(record) - .onComplete(finalProm); + recordDispatcherListener.successfullySentToDeadLetterSink(record); + finalProm.complete(); } private void onDeadLetterSinkFailure(final KafkaConsumerRecord record, final Throwable exception, final Promise finalProm) { - recordDispatcherListener.failedToSendToDeadLetterSink(record, exception) - .onComplete(finalProm); + recordDispatcherListener.failedToSendToDeadLetterSink(record, exception); + finalProm.complete(); } private static Function, Future> composeSenderAndSinkHandler( diff --git 
a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java new file mode 100644 index 0000000000..38a019319d --- /dev/null +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java @@ -0,0 +1,211 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; + +import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.common.TopicPartition; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.consumer.OffsetAndMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +/** + * This class implements the offset strategy that makes sure that, even unordered, the offset commit is ordered. + */ +public final class OffsetManager implements RecordDispatcherListener { + + private static final Logger logger = LoggerFactory.getLogger(OffsetManager.class); + + private final KafkaConsumer consumer; + + private final Map offsetTrackers; + + private final Consumer onCommit; + + /** + * All args constructor. + * + * @param consumer Kafka consumer. + * @param onCommit Callback invoked when an offset is actually committed + */ + public OffsetManager(final Vertx vertx, + final KafkaConsumer consumer, + final Consumer onCommit, + final long commitIntervalMs) { + Objects.requireNonNull(consumer, "provide consumer"); + + this.consumer = consumer; + this.offsetTrackers = new HashMap<>(); + this.onCommit = onCommit; + + vertx.setPeriodic(commitIntervalMs, l -> this.offsetTrackers.forEach(this::commit)); + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public void recordReceived(final KafkaConsumerRecord record) { + final var tp = new TopicPartition(record.topic(), record.partition()); + if (!offsetTrackers.containsKey(tp)) { + // Initialize offset tracker for the given record's topic/partition. 
+ offsetTrackers.put(tp, new OffsetTracker(record.offset())); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void successfullySentToSubscriber(final KafkaConsumerRecord record) { + commit(record); + } + + /** + * {@inheritDoc} + */ + @Override + public void successfullySentToDeadLetterSink(final KafkaConsumerRecord record) { + commit(record); + } + + /** + * {@inheritDoc} + */ + @Override + public void failedToSendToDeadLetterSink(final KafkaConsumerRecord record, final Throwable ex) { + commit(record); + } + + /** + * {@inheritDoc} + */ + @Override + public void recordDiscarded(final KafkaConsumerRecord record) { + commit(record); + } + + private void commit(final KafkaConsumerRecord record) { + this.offsetTrackers + .get(new TopicPartition(record.topic(), record.partition())) + .recordNewOffset(record.offset()); + } + + private synchronized void commit(final TopicPartition topicPartition, final OffsetTracker tracker) { + long newOffset = tracker.offsetToCommit(); + if (newOffset > tracker.getCommitted()) { + // Reset the state + tracker.setCommitted(newOffset); + + logger.debug("Committing offset for {} offset {}", topicPartition, newOffset); + + // Execute the actual commit + consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, ""))) + .onSuccess(ignored -> { + if (onCommit != null) { + onCommit.accept((int) newOffset); + } + }) + .onFailure(cause -> logger.error("failed to commit topic partition {} offset {}", topicPartition, newOffset, cause)) + .mapEmpty(); + } + } + + /** + * This offset tracker keeps track of the committed records. + */ + private static class OffsetTracker { + + // In order to not use a huge amount of memory we cap the BitSet to a _dynamic_ max size governed by this + // threshold. + private static final int RESET_TRACKER_THRESHOLD = 1_000_000; + + // We store `long` offsets in a `BitSet` that is capable of handling `int` elements. + // The BitSet sets a committed bit for an offset `offset` in this way: + // bitSetOffset = offset - initialOffset + private BitSet committedOffsets; + + // InitialOffset represents the offset committed from where committedOffsets BitSet starts, which means that + // the state of committedOffsets[0] is equal to the state of partition[initialOffset]. + private long initialOffset; + + // CommittedOffsets is the actual offset committed to stable storage. + private long committed; + + OffsetTracker(final long initialOffset) { + committedOffsets = new BitSet(); + committed = Math.max(initialOffset, 0); + this.initialOffset = committed; + } + + synchronized void recordNewOffset(final long offset) { + final var bitSetOffset = (int) (offset - initialOffset); + committedOffsets.set(bitSetOffset); + maybeReset(bitSetOffset); + } + + synchronized long offsetToCommit() { + return initialOffset + committedOffsets.nextClearBit((int) (committed - initialOffset)); + } + + synchronized void setCommitted(final long committed) { + this.committed = committed; + } + + synchronized long getCommitted() { + return committed; + } + + private void maybeReset(final int offset) { + if (offset > RESET_TRACKER_THRESHOLD) { + reset(); + } + } + + private void reset() { + // To not grow the BitSet indefinitely we create a new BitSet that starts from the committed offset. + // Since the delivery might be unordered we should copy the state of the current committedOffset BitSet that goes + // from the committed offset to the end of the BitSet. 
+ + final var prevCommittedOffsetsArr = committedOffsets.toLongArray(); + // Calculate the word index in the long array. Long size is 64. + final var relativeOffset = committed - initialOffset; + final var wordOfCommitted = (int) (relativeOffset / 64); + + // Copy from wordOfCommitted to the end: [..., wordOfCommitted, ...] + final var newCommittedOffsetsArr = Arrays.copyOfRange( + prevCommittedOffsetsArr, + wordOfCommitted, + prevCommittedOffsetsArr.length + ); + + // Re-create committedOffset BitSet and reset initialOffset. + this.committedOffsets = BitSet.valueOf(newCommittedOffsetsArr); + this.initialOffset = committed; + } + } +} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManager.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManager.java deleted file mode 100644 index 70cffe626d..0000000000 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManager.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; - -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; -import io.vertx.core.Future; -import io.vertx.kafka.client.common.TopicPartition; -import io.vertx.kafka.client.consumer.KafkaConsumer; -import io.vertx.kafka.client.consumer.KafkaConsumerRecord; -import io.vertx.kafka.client.consumer.OffsetAndMetadata; -import java.util.Map; -import java.util.Objects; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; - -/** - * This class implements the offset strategy for the ordered consumer. - */ -public final class OrderedOffsetManager implements RecordDispatcherListener { - - private static final Logger logger = LoggerFactory - .getLogger(OrderedOffsetManager.class); - - private final KafkaConsumer consumer; - - private final Consumer onCommit; - - /** - * All args constructor. - * - * @param consumer Kafka consumer. 
- * @param onCommit Callback invoked when an offset is actually committed - */ - public OrderedOffsetManager(final KafkaConsumer consumer, final Consumer onCommit) { - Objects.requireNonNull(consumer, "provide consumer"); - - this.consumer = consumer; - this.onCommit = onCommit; - } - - /** - * {@inheritDoc} - * - * @return - */ - @Override - public Future recordReceived(final KafkaConsumerRecord record) { - return Future.succeededFuture(); - } - - /** - * {@inheritDoc} - */ - @Override - public Future successfullySentToSubscriber(final KafkaConsumerRecord record) { - return commit(record); - } - - /** - * {@inheritDoc} - */ - @Override - public Future successfullySentToDeadLetterSink(final KafkaConsumerRecord record) { - return commit(record); - } - - /** - * {@inheritDoc} - */ - @Override - public Future failedToSendToDeadLetterSink(final KafkaConsumerRecord record, final Throwable ex) { - return Future.succeededFuture(); - } - - /** - * {@inheritDoc} - */ - @Override - public Future recordDiscarded(final KafkaConsumerRecord record) { - return Future.succeededFuture(); - } - - private Future commit(final KafkaConsumerRecord record) { - // Execute the actual commit - return consumer.commit(Map.of( - new TopicPartition(record.topic(), record.partition()), - new OffsetAndMetadata(record.offset() + 1, "")) - ) - .onSuccess(ignored -> { - if (onCommit != null) { - onCommit.accept(1); - } - logger.debug( - "committed {} {} {}", - keyValue("topic", record.topic()), - keyValue("partition", record.partition()), - keyValue("offset", record.offset() + 1) - ); - }) - .onFailure(cause -> - logger.error( - "failed to commit {} {} {}", - keyValue("topic", record.topic()), - keyValue("partition", record.partition()), - keyValue("offset", record.offset() + 1), - cause - ) - ).mapEmpty(); - } - -} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManager.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManager.java deleted file mode 100644 index 7757815e80..0000000000 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManager.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; - -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; -import io.vertx.core.Future; -import io.vertx.kafka.client.common.TopicPartition; -import io.vertx.kafka.client.consumer.KafkaConsumer; -import io.vertx.kafka.client.consumer.KafkaConsumerRecord; -import io.vertx.kafka.client.consumer.OffsetAndMetadata; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class implements the offset strategy that makes sure that, even unordered, the offset commit is ordered. - */ -public final class UnorderedOffsetManager implements RecordDispatcherListener { - - private static final Logger logger = LoggerFactory - .getLogger(UnorderedOffsetManager.class); - - private final KafkaConsumer consumer; - - private final Map offsetTrackers; - - private final Consumer onCommit; - - /** - * All args constructor. - * - * @param consumer Kafka consumer. - * @param onCommit Callback invoked when an offset is actually committed - */ - public UnorderedOffsetManager(final KafkaConsumer consumer, final Consumer onCommit) { - Objects.requireNonNull(consumer, "provide consumer"); - - this.consumer = consumer; - this.offsetTrackers = new HashMap<>(); - this.onCommit = onCommit; - } - - /** - * {@inheritDoc} - * - * @return - */ - @Override - public Future recordReceived(final KafkaConsumerRecord record) { - // un-ordered processing doesn't require pause/resume lifecycle. - - // Because recordReceived is guaranteed to be called in order, - // we use it to set the last seen acked offset. - // TODO If this assumption doesn't work, use this.consumer.committed(new TopicPartition(record.topic(), record.partition())) - this.offsetTrackers.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), - v -> new OffsetTracker(record.offset() - 1)); - return Future.succeededFuture(); - } - - /** - * {@inheritDoc} - */ - @Override - public Future successfullySentToSubscriber(final KafkaConsumerRecord record) { - return commit(record); - } - - /** - * {@inheritDoc} - */ - @Override - public Future successfullySentToDeadLetterSink(final KafkaConsumerRecord record) { - return commit(record); - } - - /** - * {@inheritDoc} - */ - @Override - public Future failedToSendToDeadLetterSink(final KafkaConsumerRecord record, final Throwable ex) { - this.offsetTrackers.get(new TopicPartition(record.topic(), record.partition())) - .recordNewOffset(record.offset()); - return Future.succeededFuture(); - } - - /** - * {@inheritDoc} - */ - @Override - public Future recordDiscarded(final KafkaConsumerRecord record) { - this.offsetTrackers.get(new TopicPartition(record.topic(), record.partition())) - .recordNewOffset(record.offset()); - return Future.succeededFuture(); - } - - private Future commit(final KafkaConsumerRecord record) { - TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); - OffsetTracker tracker = this.offsetTrackers.get(topicPartition); - tracker.recordNewOffset(record.offset()); - - if (tracker.shouldCommit()) { - // Reset the state - long newOffset = tracker.offsetToCommit(); - long uncommittedSize = tracker.uncommittedSize(); - tracker.reset(newOffset); - - // Execute the actual commit - return consumer.commit(Map.of( - topicPartition, - new OffsetAndMetadata(newOffset, "")) - ) - .onSuccess(ignored -> { - if (onCommit != null) { 
- onCommit.accept((int) uncommittedSize); - } - logger.debug( - "committed for topic partition {} {} offset {}", - record.topic(), - record.partition(), - newOffset - ); - }) - .onFailure(cause -> - logger.error( - "failed to commit for topic partition {} {} offset {}", - record.topic(), - record.partition(), - newOffset, - cause - ) - ).mapEmpty(); - } - return Future.succeededFuture(); - } - - /** - * This offset tracker keeps track of the committed records. - * - *

Implementation details: - *
- * For each record, the tracker flip a bit at an index representing the difference between the record's offset and the last acked record offset. - * To figure out if we need to commit or not, we just need to check if every bit of the long, up to the greatest uncommitted offset, is 1. - * In numerical representation this is equal to 2^(greatestIndex - lastAcked) - 1. To avoid computing these numbers, we statically initialize - * all masks. - * - *

- * Because a long is only 64 bit, we use an array of longs (blocks) to represent the eventual uncommitted records. - * If blocks are more than one, in order to commit, every block except the last one must have all bits flipped to 1 - * (in numerical representation -1 because of 2-complement representation of long) and the last block should follow the rule above explained. - * Note: in the best and more realistic case the store needs just 1 long, which means that only 64 records are sent unordered. - */ - private static class OffsetTracker { - - private final static int ACKS_GARBAGE_SIZE_THRESHOLD = 16; // Meaning 1024 messages are on hold - private final static long[] MASKS = new long[64]; - - static { - // Initialize MASKS - for (int i = 0; i < 64; i++) { - long mask = 0; - for (int j = 0; j <= i; j++) { - mask |= 1L << j; - } - MASKS[i] = mask; - } - } - - private long lastAcked; - private long[] uncommitted; - - private int greaterBlockIndex; - private int greaterBitIndexInGreaterBlock; - - public OffsetTracker(long initialOffset) { - this.lastAcked = initialOffset; - this.uncommitted = new long[1]; - this.greaterBlockIndex = -1; - this.greaterBitIndexInGreaterBlock = -1; - } - - public void recordNewOffset(long offset) { - long diffWithLastCommittedOffset = offset - this.lastAcked - 1; - int blockIndex = blockIndex(diffWithLastCommittedOffset); - int bitIndex = (int) (diffWithLastCommittedOffset % 64); // That's obviously smaller than a long - - checkAcksArraySize(blockIndex); - - // Let's record this bit and update the greater indexes - this.uncommitted[blockIndex] |= 1L << bitIndex; - if (this.greaterBlockIndex < blockIndex) { - this.greaterBlockIndex = blockIndex; - this.greaterBitIndexInGreaterBlock = bitIndex; - } else if (this.greaterBlockIndex == blockIndex && this.greaterBitIndexInGreaterBlock < bitIndex) { - this.greaterBitIndexInGreaterBlock = bitIndex; - } - } - - public boolean shouldCommit() { - // Let's check if we have all the bits to 1, except the last one - for (int b = 0; b < this.greaterBlockIndex; b++) { - if (this.uncommitted[b] != MASKS[63]) { - return false; - } - } - - return this.uncommitted[this.greaterBlockIndex] == MASKS[this.greaterBitIndexInGreaterBlock]; - } - - public long uncommittedSize() { - return this.greaterBitIndexInGreaterBlock + 1 + (greaterBlockIndex * 64L); - } - - public long offsetToCommit() { - return this.lastAcked + uncommittedSize() + 1; - } - - public void reset(long committed) { - this.lastAcked = committed - 1; - this.greaterBlockIndex = -1; - this.greaterBitIndexInGreaterBlock = -1; - - // Cleanup the acks array or overwrite it, depending on its size - if (this.uncommitted.length > ACKS_GARBAGE_SIZE_THRESHOLD) { - // No need to keep that big array - this.uncommitted = new long[1]; - } else { - Arrays.fill(this.uncommitted, 0L); - } - } - - private int blockIndex(long val) { - return (int) (val >> 6); - } - - private void checkAcksArraySize(int blockIndex) { - if (blockIndex > this.uncommitted.length - 1) { - // Let's make sure we create enough room for more unordered records - this.uncommitted = Arrays.copyOf(this.uncommitted, (blockIndex + 1) * 2); - } - } - - } - -} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java index 6ecc3687bb..ee4b3558ee 100644 --- 
a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java @@ -26,15 +26,13 @@ import dev.knative.eventing.kafka.broker.dispatcher.ConsumerVerticleFactory; import dev.knative.eventing.kafka.broker.dispatcher.DeliveryOrder; import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; import dev.knative.eventing.kafka.broker.dispatcher.impl.KafkaResponseHandler; import dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl; import dev.knative.eventing.kafka.broker.dispatcher.impl.WebClientCloudEventSender; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.BaseConsumerVerticle; +import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OffsetManager; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OrderedConsumerVerticle; -import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OrderedOffsetManager; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.UnorderedConsumerVerticle; -import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.UnorderedOffsetManager; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter; import io.cloudevents.CloudEvent; import io.micrometer.core.instrument.Counter; @@ -52,6 +50,11 @@ import io.vertx.kafka.client.common.tracing.ConsumerTracer; import io.vertx.kafka.client.consumer.KafkaConsumer; import io.vertx.kafka.client.producer.KafkaProducer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.AbstractMap.SimpleImmutableEntry; import java.util.HashMap; import java.util.HashSet; @@ -60,13 +63,8 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; -import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; @@ -166,12 +164,14 @@ public AbstractVerticle get(final DataPlaneContract.Resource resource, final Dat new AttributesFilter(egress.getFilter().getAttributesMap()) : Filter.noop(); - final RecordDispatcherImpl recordDispatcher = new RecordDispatcherImpl( + final var commitIntervalMs = Integer.parseInt(String.valueOf(consumerConfigs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG))); + + final var recordDispatcher = new RecordDispatcherImpl( filter, egressSubscriberSender, egressDeadLetterSender, new KafkaResponseHandler(producer, resource.getTopics(0)), - getOffsetManager(deliveryOrder, consumer, eventsSentCounter::increment), + new OffsetManager(vertx, consumer, eventsSentCounter::increment, commitIntervalMs), ConsumerTracer.create( ((VertxInternal) vertx).tracer(), new KafkaClientOptions() @@ -274,14 +274,6 @@ private static boolean hasDeadLetterSink(final EgressConfig egressConfig) { return !(egressConfig == null || egressConfig.getDeadLetter().isEmpty()); } - private static RecordDispatcherListener getOffsetManager(final DeliveryOrder type, final KafkaConsumer consumer, - Consumer commitHandler) { - return switch (type) { 
- case ORDERED -> new OrderedOffsetManager(consumer, commitHandler); - case UNORDERED -> new UnorderedOffsetManager(consumer, commitHandler); - }; - } - private static AbstractVerticle getConsumerVerticle(final DeliveryOrder type, final BaseConsumerVerticle.Initializer initializer, final Set topics) { diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java index 870d8d55bc..3a58707537 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java @@ -29,11 +29,12 @@ import io.vertx.junit5.VertxTestContext; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; @@ -236,14 +237,6 @@ private static KafkaConsumerRecord record() { } public static RecordDispatcherListener offsetManagerMock() { - final RecordDispatcherListener recordDispatcherListener = mock(RecordDispatcherListener.class); - - when(recordDispatcherListener.recordReceived(any())).thenReturn(Future.succeededFuture()); - when(recordDispatcherListener.recordDiscarded(any())).thenReturn(Future.succeededFuture()); - when(recordDispatcherListener.successfullySentToDeadLetterSink(any())).thenReturn(Future.succeededFuture()); - when(recordDispatcherListener.successfullySentToSubscriber(any())).thenReturn(Future.succeededFuture()); - when(recordDispatcherListener.failedToSendToDeadLetterSink(any(), any())).thenReturn(Future.succeededFuture()); - - return recordDispatcherListener; + return mock(RecordDispatcherListener.class); } } diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java index 1d9321cd85..971ca11b05 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java @@ -19,10 +19,12 @@ import io.cloudevents.CloudEvent; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.kafka.client.consumer.KafkaConsumer; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.OffsetAndMetadata; import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl; + import java.util.Collection; import java.util.Map; import java.util.Set; @@ -30,6 +32,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import 
org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -45,7 +48,7 @@ public abstract class AbstractOffsetManagerTest { - abstract RecordDispatcherListener createOffsetManager(final KafkaConsumer consumer); + abstract RecordDispatcherListener createOffsetManager(final Vertx vertx, final KafkaConsumer consumer); protected static KafkaConsumerRecord record(String topic, int partition, long offset) { return new KafkaConsumerRecordImpl<>( @@ -60,7 +63,8 @@ protected static KafkaConsumerRecord record(String topic, in } protected MapAssert assertThatOffsetCommitted( - Collection partitionsConsumed, Consumer testExecutor) { + final Collection partitionsConsumed, + final Consumer testExecutor) { return assertThatOffsetCommittedWithFailures(partitionsConsumed, (offsetStrategy, flag) -> testExecutor.accept(offsetStrategy)); } @@ -96,10 +100,17 @@ protected MapAssert assertThatOffsetCommittedWithFailures( .when(vertxConsumer) .commit(any(Map.class)); - testExecutor.accept(createOffsetManager(vertxConsumer), failureFlag); + testExecutor.accept(createOffsetManager(Vertx.vertx(), vertxConsumer), failureFlag); + + try { + Thread.sleep(1000); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + final var committed = mockConsumer.committed(Set.copyOf(partitionsConsumed)); return assertThat( - mockConsumer.committed(Set.copyOf(partitionsConsumed)) + committed .entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().offset())) diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManagerTest.java similarity index 79% rename from data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerTest.java rename to data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManagerTest.java index ce1cfa815e..470f7096eb 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManagerTest.java @@ -18,10 +18,15 @@ import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; import io.cloudevents.CloudEvent; import io.micrometer.core.instrument.Counter; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; import io.vertx.kafka.client.consumer.KafkaConsumer; + import java.util.List; + import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; @@ -30,12 +35,12 @@ import static org.mockito.Mockito.verify; @Execution(value = ExecutionMode.CONCURRENT) -public class UnorderedOffsetManagerTest extends AbstractOffsetManagerTest { +@ExtendWith(VertxExtension.class) +public class OffsetManagerTest extends AbstractOffsetManagerTest { @Override - RecordDispatcherListener createOffsetManager( - KafkaConsumer consumer) { - return new UnorderedOffsetManager(consumer, null); + RecordDispatcherListener createOffsetManager(final Vertx vertx, final KafkaConsumer consumer) { + return new OffsetManager(vertx, consumer, null, 100L); } @Test @@ -50,6 +55,46 @@ public void 
shouldCommitAfterSendingEventsOrderedOnTheSamePartition() { .containsEntry(new TopicPartition("aaa", 0), 10L); } + @Test + public void shouldCommitAfterSendingEventsOrderedOnTheSamePartitionLongValues() { + assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { + for (long i = Integer.MAX_VALUE - 50; i < ((long) Integer.MAX_VALUE) + 50; i++) { + var rec = record("aaa", 0, i); + offsetStrategy.recordReceived(rec); + offsetStrategy.successfullySentToSubscriber(rec); + } + }) + .containsEntry(new TopicPartition("aaa", 0), Integer.MAX_VALUE + 50L); + } + + @Test + public void shouldCommitAfterSendingEventsOrderedOnTheSamePartitionLongPeriod() { + assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { + for (long i = 0; i < 2_000_051; i++) { + + var rec = record("aaa", 0, i); + offsetStrategy.recordReceived(rec); + offsetStrategy.successfullySentToSubscriber(rec); + } + }) + .containsEntry(new TopicPartition("aaa", 0), 2_000_051L); + } + + @Test + public void shouldNotCommitAfterSendingEventsOrderedOnTheSamePartitionBrokenSequence() { + assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { + // start number is odd number + for (long i = Integer.MAX_VALUE - 50; i < ((long) Integer.MAX_VALUE) + 50; i++) { + var rec = record("aaa", 0, i); + offsetStrategy.recordReceived(rec); + if (i % 2 == 0) { + offsetStrategy.successfullySentToSubscriber(rec); + } + } + }) + .isEmpty(); + } + @Test public void shouldNotCommitAndNotGoOutOfBounds() { assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { @@ -148,7 +193,7 @@ public void shouldNotCommitAfterSendingEventsABitMoreMixedWithAMissingOne() { List.of(5L, 2L, 0L, 7L, 1L, 3L, 4L) .forEach(offset -> offsetStrategy.successfullySentToSubscriber(record("aaa", 0, offset))); }) - .isEmpty(); + .containsEntry(new TopicPartition("aaa", 0), 6L); } @Test @@ -209,10 +254,10 @@ public void shouldContinueToWorkAfterSendingALotOfRecords() { @Test @SuppressWarnings("unchecked") - public void recordReceived() { + public void recordReceived(final Vertx vertx) { final KafkaConsumer consumer = mock(KafkaConsumer.class); final Counter eventsSentCounter = mock(Counter.class); - new UnorderedOffsetManager(consumer, eventsSentCounter::increment).recordReceived(record("aaa", 0, 0)); + new OffsetManager(vertx, consumer, eventsSentCounter::increment, 100L).recordReceived(record("aaa", 0, 0)); shouldNeverCommit(consumer); shouldNeverPause(consumer); @@ -221,12 +266,11 @@ public void recordReceived() { @Test @SuppressWarnings("unchecked") - public void failedToSendToDeadLetterSink() { + public void failedToSendToDeadLetterSink(final Vertx vertx) { final KafkaConsumer consumer = mock(KafkaConsumer.class); final Counter eventsSentCounter = mock(Counter.class); - UnorderedOffsetManager strategy = - new UnorderedOffsetManager(consumer, eventsSentCounter::increment); + OffsetManager strategy = new OffsetManager(vertx, consumer, eventsSentCounter::increment, 100L); strategy.recordReceived(record("aaa", 0, 0)); strategy.failedToSendToDeadLetterSink(record("aaa", 0, 0), null); diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManagerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManagerTest.java deleted file mode 100644 index 543b28b662..0000000000 --- 
a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManagerTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; - -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; -import io.vertx.kafka.client.consumer.KafkaConsumer; -import java.util.List; -import org.apache.kafka.common.TopicPartition; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.parallel.Execution; -import org.junit.jupiter.api.parallel.ExecutionMode; - -@Execution(value = ExecutionMode.CONCURRENT) -public class OrderedOffsetManagerTest extends AbstractOffsetManagerTest { - - @Override - RecordDispatcherListener createOffsetManager( - KafkaConsumer consumer) { - return new OrderedOffsetManager(consumer, null); - } - - @Test - public void shouldNotCommitAfterRecordReceived() { - assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { - offsetStrategy.recordReceived(record("aaa", 0, 0)); - }).isEmpty(); - } - - @Test - public void shouldNotCommitAfterFailedToSendToDeadLetterSink() { - assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { - offsetStrategy.failedToSendToDeadLetterSink(record("aaa", 0, 0), new IllegalStateException()); - }).isEmpty(); - } - - @Test - public void shouldNotCommitAfterRecordDiscarded() { - assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { - offsetStrategy.recordDiscarded(record("aaa", 0, 0)); - }).isEmpty(); - } - - - @Test - public void shouldCommitAfterSuccessfullySentToSubscriber() { - assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { - offsetStrategy.successfullySentToSubscriber(record("aaa", 0, 0)); - }).containsEntry(new TopicPartition("aaa", 0), 1L); - } - - @Test - public void shouldCommitAfterSuccessfullySentToDeadLetterSink() { - assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { - offsetStrategy.successfullySentToDeadLetterSink(record("aaa", 0, 0)); - }).containsEntry(new TopicPartition("aaa", 0), 1L); - } - -} diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java index c76bd90fcc..b1efd3aca6 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java @@ -40,6 +40,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + +import org.apache.kafka.clients.consumer.ConsumerConfig; import 
org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.ProducerRecord; @@ -72,6 +74,7 @@ public void testUnorderedConsumer(final Vertx vertx) throws Exception { final var producerConfigs = new Properties(); final var consumerConfigs = new Properties(); + consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100L); final var consumerVerticleFactoryMock = new ConsumerVerticleFactoryImplMock( consumerConfigs, diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java index 6213c91bb2..9e9ccc5351 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java @@ -46,21 +46,22 @@ import io.vertx.kafka.client.producer.KafkaProducer; import io.vertx.micrometer.MicrometerMetricsOptions; import io.vertx.micrometer.backends.BackendRegistries; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + import java.io.File; import java.io.IOException; import java.net.URI; -import java.util.Collections; import java.util.Properties; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import static java.lang.String.format; import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; @@ -142,7 +143,7 @@ public static void setUp(final Vertx vertx, final VertxTestContext context) thro */ @Test @Timeout(timeUnit = TimeUnit.MINUTES, value = 1) - public void execute(final Vertx vertx, final VertxTestContext context) throws Exception { + public void execute(final Vertx vertx, final VertxTestContext context) { final var checkpoints = context.checkpoint(3); @@ -308,6 +309,7 @@ private static ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final consumerConfigs.put(BOOTSTRAP_SERVERS_CONFIG, format("localhost:%d", KAFKA_PORT)); consumerConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName()); + consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); final var producerConfigs = producerConfigs(); diff --git a/test/config/chaos/chaosduck.yaml b/test/config/chaos/chaosduck.yaml index aeba8d8da2..2f4bfbffda 100644 --- a/test/config/chaos/chaosduck.yaml +++ b/test/config/chaos/chaosduck.yaml @@ -75,7 +75,7 @@ spec: - name: chaosduck # This is the Go import path for the binary that is containerized # and substituted here. 
- image: ko://knative.dev/eventing-kafka-broker/vendor/knative.dev/pkg/leaderelection/chaosduck + image: ko://knative.dev/pkg/leaderelection/chaosduck args: [ ] diff --git a/test/e2e-common.sh b/test/e2e-common.sh index bc8b850018..295df8c801 100644 --- a/test/e2e-common.sh +++ b/test/e2e-common.sh @@ -23,10 +23,6 @@ readonly EVENTING_CONFIG=${EVENTING_CONFIG:-"./third_party/eventing-latest/"} # Vendored eventing test images. readonly VENDOR_EVENTING_TEST_IMAGES="vendor/knative.dev/eventing/test/test_images/" -readonly CHAOS_CONFIG="test/config/chaos/chaosduck.yaml" -# Vendored pkg test images. -readonly VENDOR_PKG_TEST_IMAGES="vendor/knative.dev/pkg/leaderelection/chaosduck" - export EVENTING_KAFKA_CONTROL_PLANE_ARTIFACT="eventing-kafka-controller.yaml" export EVENTING_KAFKA_BROKER_ARTIFACT="eventing-kafka-broker.yaml" export EVENTING_KAFKA_SINK_ARTIFACT="eventing-kafka-sink.yaml" @@ -69,20 +65,9 @@ function knative_eventing() { # Publish test images. echo ">> Publishing test images from eventing" - # We vendor test image code from eventing, in order to use ko to resolve them into Docker images, the - # path has to be a GOPATH. - sed -i 's@knative.dev/eventing/test/test_images@knative.dev/eventing-kafka-broker/vendor/knative.dev/eventing/test/test_images@g' "${VENDOR_EVENTING_TEST_IMAGES}"*/*.yaml ./test/upload-test-images.sh ${VENDOR_EVENTING_TEST_IMAGES} e2e || fail_test "Error uploading test images" - sed -i 's@knative.dev/eventing-kafka-broker/vendor/knative.dev/eventing/test/test_images@knative.dev/eventing/test/test_images@g' "${VENDOR_EVENTING_TEST_IMAGES}"*/*.yaml - - # Publish test images from pkg. - echo ">> Publishing test images from pkg" - # We vendor test image code from pkg, in order to use ko to resolve them into Docker images, the - # path has to be a GOPATH. - sed -i 's@knative.dev/pkg/leaderelection/chaosduck@knative.dev/eventing-kafka-broker/vendor/knative.dev/pkg/leaderelection/chaosduck@g' "${CHAOS_CONFIG}" - ./test/upload-test-images.sh ${VENDOR_PKG_TEST_IMAGES} e2e || fail_test "Error uploading test images" - sed -i 's@knative.dev/eventing-kafka-broker/vendor/knative.dev/pkg/leaderelection/chaosduck@knative.dev/pkg/leaderelection/chaosduck@g' "${CHAOS_CONFIG}" + echo ">> Publishing test images" ./test/upload-test-images.sh "test/test_images" e2e || fail_test "Error uploading test images" ./test/kafka/kafka_setup.sh || fail_test "Failed to set up Kafka cluster" diff --git a/vendor/k8s.io/code-generator/generate-groups.sh b/vendor/k8s.io/code-generator/generate-groups.sh old mode 100644 new mode 100755 diff --git a/vendor/knative.dev/pkg/hack/generate-knative.sh b/vendor/knative.dev/pkg/hack/generate-knative.sh old mode 100644 new mode 100755
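
The gist of the new commit strategy: OffsetManager keeps a per-partition tracker that records delivered offsets in a BitSet relative to a base offset; a periodic Vert.x timer (vertx.setPeriodic(commitIntervalMs, ...)) then commits up to the first gap, and the BitSet is re-based once it grows past a threshold so long-running partitions stay bounded in memory. The plain-Java sketch below mirrors that tracking logic outside of Vert.x and the Kafka client; the class and method names (OffsetTrackerSketch, markCommitted) are illustrative only, and it re-bases at a 64-bit word boundary, a minor simplification of the patched reset().

import java.util.Arrays;
import java.util.BitSet;

// Standalone sketch (illustrative, not the patched class): per-partition offset tracking
// where deliveries may complete out of order but commits stay contiguous.
final class OffsetTrackerSketch {

  private static final int RESET_TRACKER_THRESHOLD = 1_000_000;

  private BitSet acked;        // bit i set <=> offset (initialOffset + i) was delivered
  private long initialOffset;  // offset represented by bit 0
  private long committed;      // last offset committed to the broker

  OffsetTrackerSketch(final long initialOffset) {
    this.acked = new BitSet();
    this.committed = Math.max(initialOffset, 0);
    this.initialOffset = this.committed;
  }

  synchronized void recordNewOffset(final long offset) {
    final int bit = (int) (offset - initialOffset);
    acked.set(bit);
    if (bit > RESET_TRACKER_THRESHOLD) {
      reset();
    }
  }

  // Offset to pass to commit(): one past the highest contiguous delivered offset.
  synchronized long offsetToCommit() {
    return initialOffset + acked.nextClearBit((int) (committed - initialOffset));
  }

  // Called once the broker acknowledged the commit.
  synchronized void markCommitted(final long newCommitted) {
    this.committed = newCommitted;
  }

  // Re-base the BitSet near the committed offset so it does not grow without bound.
  private void reset() {
    final long[] words = acked.toLongArray();
    final int wordOfCommitted = (int) ((committed - initialOffset) / 64);
    this.acked = BitSet.valueOf(Arrays.copyOfRange(words, wordOfCommitted, words.length));
    this.initialOffset += wordOfCommitted * 64L; // word-aligned so bit indices stay consistent
  }

  public static void main(final String[] args) {
    final OffsetTrackerSketch tracker = new OffsetTrackerSketch(0);
    for (final long offset : new long[] {5, 2, 0, 7, 1, 3, 4}) { // offset 6 still in flight
      tracker.recordNewOffset(offset);
    }
    // A periodic timer would now commit offsets 0..5, i.e. commit position 6.
    System.out.println(tracker.offsetToCommit()); // prints 6
  }
}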