diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerBenchmark.java index 926d470ba1..241c5bba45 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerBenchmark.java @@ -19,6 +19,7 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.kafka.client.common.PartitionInfo; import io.vertx.kafka.client.common.TopicPartition; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; @@ -27,11 +28,6 @@ import io.vertx.kafka.client.consumer.OffsetAndMetadata; import io.vertx.kafka.client.consumer.OffsetAndTimestamp; import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.openjdk.jmh.annotations.Benchmark; @@ -41,6 +37,12 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.infra.Blackhole; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + public class UnorderedOffsetManagerBenchmark { @State(Scope.Thread) @@ -71,96 +73,70 @@ public void doSetup() { @Benchmark public void benchmarkReverseOrder(RecordsState recordsState, Blackhole blackhole) { - UnorderedOffsetManager offsetManager = new UnorderedOffsetManager(new MockKafkaConsumer(), null); + OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L); int partitions = 100; for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.recordReceived(recordsState.records[partition][0]) - ); + offsetManager.recordReceived(recordsState.records[partition][0]); } for (int offset = 9_999; offset > 0; offset--) { for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.recordReceived(recordsState.records[partition][offset]) - ); - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]) - ); + offsetManager.recordReceived(recordsState.records[partition][offset]); + offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]); } } for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[partition][0]) - ); + offsetManager.successfullySentToSubscriber(recordsState.records[partition][0]); } } @Benchmark public void benchmarkOrdered(RecordsState recordsState, Blackhole blackhole) { - UnorderedOffsetManager offsetManager = new UnorderedOffsetManager(new MockKafkaConsumer(), null); + OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L); int partitions = 100; for (int offset = 0; offset < 10_000; offset++) { for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.recordReceived(recordsState.records[partition][offset]) - ); - 
blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]) - ); + offsetManager.recordReceived(recordsState.records[partition][offset]); + offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]); } } } @Benchmark public void benchmarkRealisticCase(RecordsState recordsState, Blackhole blackhole) { - UnorderedOffsetManager offsetManager = new UnorderedOffsetManager(new MockKafkaConsumer(), null); + OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L); int partitions = 10; for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.recordReceived(recordsState.records[partition][0]) - ); + offsetManager.recordReceived(recordsState.records[partition][0]); } for (int partition = 0; partition < partitions; partition++) { - for (int offset : new int[] {5, 2, 0, 7, 1, 3, 4, 6}) { - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]) - ); + for (int offset : new int[]{5, 2, 0, 7, 1, 3, 4, 6}) { + offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]); } } } @Benchmark public void benchmarkMixedABit(RecordsState recordsState, Blackhole blackhole) { - UnorderedOffsetManager offsetManager = new UnorderedOffsetManager(new MockKafkaConsumer(), null); + OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L); int partitions = 4; for (int partition = 0; partition < partitions; partition++) { - blackhole.consume( - offsetManager.recordReceived(recordsState.records[partition][0]) - ); + offsetManager.recordReceived(recordsState.records[partition][0]); } for (int i = 0; i < 120; i++) { // This will commit in the following order: // 1 0 3 2 5 4 ... - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[2][i % 2 == 0 ? i + 1 : i - 1]) - ); - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[1][i % 2 == 0 ? i + 1 : i - 1]) - ); - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[0][i % 2 == 0 ? i + 1 : i - 1]) - ); - blackhole.consume( - offsetManager.successfullySentToSubscriber(recordsState.records[3][i % 2 == 0 ? i + 1 : i - 1]) - ); + offsetManager.successfullySentToSubscriber(recordsState.records[2][i % 2 == 0 ? i + 1 : i - 1]); + offsetManager.successfullySentToSubscriber(recordsState.records[1][i % 2 == 0 ? i + 1 : i - 1]); + offsetManager.successfullySentToSubscriber(recordsState.records[0][i % 2 == 0 ? i + 1 : i - 1]); + offsetManager.successfullySentToSubscriber(recordsState.records[3][i % 2 == 0 ? i + 1 : i - 1]); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java index f047e6ec74..ec5d4745bf 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java @@ -28,7 +28,7 @@ public interface RecordDispatcherListener { * * @param record record received. */ - Future recordReceived(KafkaConsumerRecord record); + void recordReceived(KafkaConsumerRecord record); /** * The given record cannot be delivered to dead letter sink. 
@@ -36,26 +36,26 @@ public interface RecordDispatcherListener { * @param record record undeliverable to dead letter sink. * @param ex exception occurred. */ - Future failedToSendToDeadLetterSink(KafkaConsumerRecord record, Throwable ex); + void failedToSendToDeadLetterSink(KafkaConsumerRecord record, Throwable ex); /** * The given event doesn't pass the filter. * * @param record record discarded. */ - Future recordDiscarded(KafkaConsumerRecord record); + void recordDiscarded(KafkaConsumerRecord record); /** * The given record has been successfully sent to subscriber. * * @param record record sent to subscriber. */ - Future successfullySentToSubscriber(KafkaConsumerRecord record); + void successfullySentToSubscriber(KafkaConsumerRecord record); /** * The given record has been successfully sent to dead letter sink. * * @param record record sent to dead letter sink. */ - Future successfullySentToDeadLetterSink(KafkaConsumerRecord record); + void successfullySentToDeadLetterSink(KafkaConsumerRecord record); } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index 0896d038fc..7d059f4618 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -28,8 +28,10 @@ import io.vertx.core.Vertx; import io.vertx.kafka.client.common.tracing.ConsumerTracer; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; + import java.util.Objects; import java.util.function.Function; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,16 +141,13 @@ private void onRecordReceived(final KafkaConsumerRecord reco } } - recordDispatcherListener.recordReceived(record) - .onSuccess(v -> { - // Execute filtering - if (filter.test(record.value())) { - onFilterMatching(record, finalProm); - } else { - onFilterNotMatching(record, finalProm); - } - }) - .onFailure(finalProm::fail); // This should really never happen + recordDispatcherListener.recordReceived(record); + // Execute filtering + if (filter.test(record.value())) { + onFilterMatching(record, finalProm); + } else { + onFilterNotMatching(record, finalProm); + } } private void onFilterMatching(final KafkaConsumerRecord record, final Promise finalProm) { @@ -161,15 +160,15 @@ private void onFilterMatching(final KafkaConsumerRecord reco private void onFilterNotMatching(final KafkaConsumerRecord record, final Promise finalProm) { logDebug("Record doesn't match filtering", record); - recordDispatcherListener.recordDiscarded(record) - .onComplete(finalProm); + recordDispatcherListener.recordDiscarded(record); + finalProm.complete(); } private void onSubscriberSuccess(final KafkaConsumerRecord record, final Promise finalProm) { logDebug("Successfully sent event to subscriber", record); - recordDispatcherListener.successfullySentToSubscriber(record) - .onComplete(finalProm); + recordDispatcherListener.successfullySentToSubscriber(record); + finalProm.complete(); } private void onSubscriberFailure(final KafkaConsumerRecord record, @@ -182,15 +181,15 @@ private void onSubscriberFailure(final KafkaConsumerRecord r private void onDeadLetterSinkSuccess(final KafkaConsumerRecord record, final Promise finalProm) { logDebug("Successfully sent event to the dead letter sink", record); - 
recordDispatcherListener.successfullySentToDeadLetterSink(record) - .onComplete(finalProm); + recordDispatcherListener.successfullySentToDeadLetterSink(record); + finalProm.complete(); } private void onDeadLetterSinkFailure(final KafkaConsumerRecord record, final Throwable exception, final Promise finalProm) { - recordDispatcherListener.failedToSendToDeadLetterSink(record, exception) - .onComplete(finalProm); + recordDispatcherListener.failedToSendToDeadLetterSink(record, exception); + finalProm.complete(); } private static Function, Future> composeSenderAndSinkHandler( diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java new file mode 100644 index 0000000000..cd99b873c0 --- /dev/null +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java @@ -0,0 +1,169 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; + +import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.common.TopicPartition; +import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.consumer.OffsetAndMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.BitSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +/** + * This class implements the offset strategy that makes sure that, even unordered, the offset commit is ordered. + */ +public final class OffsetManager implements RecordDispatcherListener { + + private static final Logger logger = LoggerFactory.getLogger(OffsetManager.class); + + private final KafkaConsumer consumer; + + private final Map offsetTrackers; + + private final Consumer onCommit; + + /** + * All args constructor. + * + * @param consumer Kafka consumer. + * @param onCommit Callback invoked when an offset is actually committed + */ + public OffsetManager(final Vertx vertx, + final KafkaConsumer consumer, + final Consumer onCommit, + final long commitIntervalMs) { + Objects.requireNonNull(consumer, "provide consumer"); + + this.consumer = consumer; + this.offsetTrackers = new HashMap<>(); + this.onCommit = onCommit; + + vertx.setPeriodic(commitIntervalMs, l -> this.offsetTrackers.forEach(this::commit)); + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public void recordReceived(final KafkaConsumerRecord record) { + final var tp = new TopicPartition(record.topic(), record.partition()); + if (!offsetTrackers.containsKey(tp)) { + // Initialize offset tracker for the given record's topic/partition. 
+ offsetTrackers.put(tp, new OffsetTracker(record.offset() - 1)); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void successfullySentToSubscriber(final KafkaConsumerRecord record) { + commit(record); + } + + /** + * {@inheritDoc} + */ + @Override + public void successfullySentToDeadLetterSink(final KafkaConsumerRecord record) { + commit(record); + } + + /** + * {@inheritDoc} + */ + @Override + public void failedToSendToDeadLetterSink(final KafkaConsumerRecord record, final Throwable ex) { + commit(record); + } + + /** + * {@inheritDoc} + */ + @Override + public void recordDiscarded(final KafkaConsumerRecord record) { + commit(record); + } + + private void commit(final KafkaConsumerRecord record) { + this.offsetTrackers + .get(new TopicPartition(record.topic(), record.partition())) + .recordNewOffset(record.offset()); + } + + private synchronized void commit(final TopicPartition topicPartition, final OffsetTracker tracker) { + long newOffset = tracker.offsetToCommit(); + if (newOffset > tracker.getCommitted()) { + // Reset the state + tracker.setCommitted(newOffset); + + logger.debug("Committing offset for {} offset {}", topicPartition, newOffset); + + // Execute the actual commit + consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, ""))) + .onSuccess(ignored -> { + if (onCommit != null) { + onCommit.accept((int) newOffset); + } + }) + .onFailure(cause -> logger.error("failed to commit topic partition {} offset {}", topicPartition, newOffset, cause)) + .mapEmpty(); + } + } + + /** + * This offset tracker keeps track of the committed records. + */ + private static class OffsetTracker { + + private final BitSet committedOffsets; + + private final long initialOffset; + private long committed; + + public OffsetTracker(long initialOffset) { + committedOffsets = new BitSet(); + committed = Math.max(initialOffset + 1, 0); + this.initialOffset = committed; + } + + public void recordNewOffset(long offset) { + committedOffsets.set((int) (offset - initialOffset)); + } + + public long offsetToCommit() { + return initialOffset + committedOffsets.nextClearBit((int) (committed - initialOffset)); + } + + public void setCommitted(final long committed) { + this.committed = committed; + } + + public long getCommitted() { + return committed; + } + } +} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManager.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManager.java deleted file mode 100644 index 70cffe626d..0000000000 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManager.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; - -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; -import io.vertx.core.Future; -import io.vertx.kafka.client.common.TopicPartition; -import io.vertx.kafka.client.consumer.KafkaConsumer; -import io.vertx.kafka.client.consumer.KafkaConsumerRecord; -import io.vertx.kafka.client.consumer.OffsetAndMetadata; -import java.util.Map; -import java.util.Objects; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; - -/** - * This class implements the offset strategy for the ordered consumer. - */ -public final class OrderedOffsetManager implements RecordDispatcherListener { - - private static final Logger logger = LoggerFactory - .getLogger(OrderedOffsetManager.class); - - private final KafkaConsumer consumer; - - private final Consumer onCommit; - - /** - * All args constructor. - * - * @param consumer Kafka consumer. - * @param onCommit Callback invoked when an offset is actually committed - */ - public OrderedOffsetManager(final KafkaConsumer consumer, final Consumer onCommit) { - Objects.requireNonNull(consumer, "provide consumer"); - - this.consumer = consumer; - this.onCommit = onCommit; - } - - /** - * {@inheritDoc} - * - * @return - */ - @Override - public Future recordReceived(final KafkaConsumerRecord record) { - return Future.succeededFuture(); - } - - /** - * {@inheritDoc} - */ - @Override - public Future successfullySentToSubscriber(final KafkaConsumerRecord record) { - return commit(record); - } - - /** - * {@inheritDoc} - */ - @Override - public Future successfullySentToDeadLetterSink(final KafkaConsumerRecord record) { - return commit(record); - } - - /** - * {@inheritDoc} - */ - @Override - public Future failedToSendToDeadLetterSink(final KafkaConsumerRecord record, final Throwable ex) { - return Future.succeededFuture(); - } - - /** - * {@inheritDoc} - */ - @Override - public Future recordDiscarded(final KafkaConsumerRecord record) { - return Future.succeededFuture(); - } - - private Future commit(final KafkaConsumerRecord record) { - // Execute the actual commit - return consumer.commit(Map.of( - new TopicPartition(record.topic(), record.partition()), - new OffsetAndMetadata(record.offset() + 1, "")) - ) - .onSuccess(ignored -> { - if (onCommit != null) { - onCommit.accept(1); - } - logger.debug( - "committed {} {} {}", - keyValue("topic", record.topic()), - keyValue("partition", record.partition()), - keyValue("offset", record.offset() + 1) - ); - }) - .onFailure(cause -> - logger.error( - "failed to commit {} {} {}", - keyValue("topic", record.topic()), - keyValue("partition", record.partition()), - keyValue("offset", record.offset() + 1), - cause - ) - ).mapEmpty(); - } - -} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManager.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManager.java deleted file mode 100644 index 7757815e80..0000000000 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManager.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; - -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; -import io.vertx.core.Future; -import io.vertx.kafka.client.common.TopicPartition; -import io.vertx.kafka.client.consumer.KafkaConsumer; -import io.vertx.kafka.client.consumer.KafkaConsumerRecord; -import io.vertx.kafka.client.consumer.OffsetAndMetadata; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class implements the offset strategy that makes sure that, even unordered, the offset commit is ordered. - */ -public final class UnorderedOffsetManager implements RecordDispatcherListener { - - private static final Logger logger = LoggerFactory - .getLogger(UnorderedOffsetManager.class); - - private final KafkaConsumer consumer; - - private final Map offsetTrackers; - - private final Consumer onCommit; - - /** - * All args constructor. - * - * @param consumer Kafka consumer. - * @param onCommit Callback invoked when an offset is actually committed - */ - public UnorderedOffsetManager(final KafkaConsumer consumer, final Consumer onCommit) { - Objects.requireNonNull(consumer, "provide consumer"); - - this.consumer = consumer; - this.offsetTrackers = new HashMap<>(); - this.onCommit = onCommit; - } - - /** - * {@inheritDoc} - * - * @return - */ - @Override - public Future recordReceived(final KafkaConsumerRecord record) { - // un-ordered processing doesn't require pause/resume lifecycle. - - // Because recordReceived is guaranteed to be called in order, - // we use it to set the last seen acked offset. 
- // TODO If this assumption doesn't work, use this.consumer.committed(new TopicPartition(record.topic(), record.partition())) - this.offsetTrackers.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), - v -> new OffsetTracker(record.offset() - 1)); - return Future.succeededFuture(); - } - - /** - * {@inheritDoc} - */ - @Override - public Future successfullySentToSubscriber(final KafkaConsumerRecord record) { - return commit(record); - } - - /** - * {@inheritDoc} - */ - @Override - public Future successfullySentToDeadLetterSink(final KafkaConsumerRecord record) { - return commit(record); - } - - /** - * {@inheritDoc} - */ - @Override - public Future failedToSendToDeadLetterSink(final KafkaConsumerRecord record, final Throwable ex) { - this.offsetTrackers.get(new TopicPartition(record.topic(), record.partition())) - .recordNewOffset(record.offset()); - return Future.succeededFuture(); - } - - /** - * {@inheritDoc} - */ - @Override - public Future recordDiscarded(final KafkaConsumerRecord record) { - this.offsetTrackers.get(new TopicPartition(record.topic(), record.partition())) - .recordNewOffset(record.offset()); - return Future.succeededFuture(); - } - - private Future commit(final KafkaConsumerRecord record) { - TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); - OffsetTracker tracker = this.offsetTrackers.get(topicPartition); - tracker.recordNewOffset(record.offset()); - - if (tracker.shouldCommit()) { - // Reset the state - long newOffset = tracker.offsetToCommit(); - long uncommittedSize = tracker.uncommittedSize(); - tracker.reset(newOffset); - - // Execute the actual commit - return consumer.commit(Map.of( - topicPartition, - new OffsetAndMetadata(newOffset, "")) - ) - .onSuccess(ignored -> { - if (onCommit != null) { - onCommit.accept((int) uncommittedSize); - } - logger.debug( - "committed for topic partition {} {} offset {}", - record.topic(), - record.partition(), - newOffset - ); - }) - .onFailure(cause -> - logger.error( - "failed to commit for topic partition {} {} offset {}", - record.topic(), - record.partition(), - newOffset, - cause - ) - ).mapEmpty(); - } - return Future.succeededFuture(); - } - - /** - * This offset tracker keeps track of the committed records. - * - *

<h2>Implementation details</h2> - * - * <p> - * For each record, the tracker flip a bit at an index representing the difference between the record's offset and the last acked record offset. - * To figure out if we need to commit or not, we just need to check if every bit of the long, up to the greatest uncommitted offset, is 1. - * In numerical representation this is equal to 2<sup>(greatestIndex - lastAcked)</sup> - 1. To avoid computing these numbers, we statically initialize - * all masks. - * - * <p>
- * Because a long is only 64 bit, we use an array of longs (blocks) to represent the eventual uncommitted records. - * If blocks are more than one, in order to commit, every block except the last one must have all bits flipped to 1 - * (in numerical representation -1 because of 2-complement representation of long) and the last block should follow the rule above explained. - * Note: in the best and more realistic case the store needs just 1 long, which means that only 64 records are sent unordered. - */ - private static class OffsetTracker { - - private final static int ACKS_GARBAGE_SIZE_THRESHOLD = 16; // Meaning 1024 messages are on hold - private final static long[] MASKS = new long[64]; - - static { - // Initialize MASKS - for (int i = 0; i < 64; i++) { - long mask = 0; - for (int j = 0; j <= i; j++) { - mask |= 1L << j; - } - MASKS[i] = mask; - } - } - - private long lastAcked; - private long[] uncommitted; - - private int greaterBlockIndex; - private int greaterBitIndexInGreaterBlock; - - public OffsetTracker(long initialOffset) { - this.lastAcked = initialOffset; - this.uncommitted = new long[1]; - this.greaterBlockIndex = -1; - this.greaterBitIndexInGreaterBlock = -1; - } - - public void recordNewOffset(long offset) { - long diffWithLastCommittedOffset = offset - this.lastAcked - 1; - int blockIndex = blockIndex(diffWithLastCommittedOffset); - int bitIndex = (int) (diffWithLastCommittedOffset % 64); // That's obviously smaller than a long - - checkAcksArraySize(blockIndex); - - // Let's record this bit and update the greater indexes - this.uncommitted[blockIndex] |= 1L << bitIndex; - if (this.greaterBlockIndex < blockIndex) { - this.greaterBlockIndex = blockIndex; - this.greaterBitIndexInGreaterBlock = bitIndex; - } else if (this.greaterBlockIndex == blockIndex && this.greaterBitIndexInGreaterBlock < bitIndex) { - this.greaterBitIndexInGreaterBlock = bitIndex; - } - } - - public boolean shouldCommit() { - // Let's check if we have all the bits to 1, except the last one - for (int b = 0; b < this.greaterBlockIndex; b++) { - if (this.uncommitted[b] != MASKS[63]) { - return false; - } - } - - return this.uncommitted[this.greaterBlockIndex] == MASKS[this.greaterBitIndexInGreaterBlock]; - } - - public long uncommittedSize() { - return this.greaterBitIndexInGreaterBlock + 1 + (greaterBlockIndex * 64L); - } - - public long offsetToCommit() { - return this.lastAcked + uncommittedSize() + 1; - } - - public void reset(long committed) { - this.lastAcked = committed - 1; - this.greaterBlockIndex = -1; - this.greaterBitIndexInGreaterBlock = -1; - - // Cleanup the acks array or overwrite it, depending on its size - if (this.uncommitted.length > ACKS_GARBAGE_SIZE_THRESHOLD) { - // No need to keep that big array - this.uncommitted = new long[1]; - } else { - Arrays.fill(this.uncommitted, 0L); - } - } - - private int blockIndex(long val) { - return (int) (val >> 6); - } - - private void checkAcksArraySize(int blockIndex) { - if (blockIndex > this.uncommitted.length - 1) { - // Let's make sure we create enough room for more unordered records - this.uncommitted = Arrays.copyOf(this.uncommitted, (blockIndex + 1) * 2); - } - } - - } - -} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java index ee17fceddb..af623c6f48 100644 --- 
a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleFactoryImpl.java @@ -20,14 +20,11 @@ import dev.knative.eventing.kafka.broker.core.AsyncCloseable; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.security.AuthProvider; -import dev.knative.eventing.kafka.broker.core.security.Credentials; import dev.knative.eventing.kafka.broker.core.security.KafkaClientsAuth; -import dev.knative.eventing.kafka.broker.core.security.PlaintextCredentials; import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender; import dev.knative.eventing.kafka.broker.dispatcher.ConsumerVerticleFactory; import dev.knative.eventing.kafka.broker.dispatcher.DeliveryOrder; import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler; import dev.knative.eventing.kafka.broker.dispatcher.impl.KafkaResponseHandler; import dev.knative.eventing.kafka.broker.dispatcher.impl.NoopResponseHandler; @@ -37,10 +34,9 @@ import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventOverridesMutator; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer; +import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OffsetManager; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OrderedConsumerVerticle; -import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OrderedOffsetManager; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.UnorderedConsumerVerticle; -import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.UnorderedOffsetManager; import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter; import dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender; import io.cloudevents.CloudEvent; @@ -49,7 +45,6 @@ import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.AbstractVerticle; -import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.impl.VertxInternal; import io.vertx.core.tracing.TracingPolicy; @@ -72,7 +67,6 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -178,6 +172,7 @@ public AbstractVerticle get(final DataPlaneContract.Resource resource, final Dat Filter.noop(); final var responseHandler = getNoopResponseHandlerOrDefault(egress, () -> getKafkaResponseHandler(vertx, producerConfigs, resource)); + final var commitIntervalMs = Integer.parseInt(String.valueOf(consumerConfigs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG))); final var recordDispatcher = new RecordDispatcherMutatorChain( new RecordDispatcherImpl( @@ -185,7 +180,7 @@ public AbstractVerticle get(final DataPlaneContract.Resource resource, final Dat egressSubscriberSender, egressDeadLetterSender, responseHandler, - getOffsetManager(deliveryOrder, consumer, eventsSentCounter::increment), + new OffsetManager(vertx, consumer, eventsSentCounter::increment, commitIntervalMs), 
ConsumerTracer.create( ((VertxInternal) vertx).tracer(), new KafkaClientOptions() @@ -305,14 +300,6 @@ private static boolean hasDeadLetterSink(final EgressConfig egressConfig) { return !(egressConfig == null || egressConfig.getDeadLetter().isEmpty()); } - private static RecordDispatcherListener getOffsetManager(final DeliveryOrder type, final KafkaConsumer consumer, - Consumer commitHandler) { - return switch (type) { - case ORDERED -> new OrderedOffsetManager(consumer, commitHandler); - case UNORDERED -> new UnorderedOffsetManager(consumer, commitHandler); - }; - } - private static AbstractVerticle getConsumerVerticle(final DeliveryOrder type, final BaseConsumerVerticle.Initializer initializer, final Set topics) { diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java index 213ca00386..315f3aad53 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java @@ -29,11 +29,12 @@ import io.vertx.junit5.VertxTestContext; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; @@ -236,14 +237,6 @@ private static KafkaConsumerRecord record() { } public static RecordDispatcherListener offsetManagerMock() { - final RecordDispatcherListener recordDispatcherListener = mock(RecordDispatcherListener.class); - - when(recordDispatcherListener.recordReceived(any())).thenReturn(Future.succeededFuture()); - when(recordDispatcherListener.recordDiscarded(any())).thenReturn(Future.succeededFuture()); - when(recordDispatcherListener.successfullySentToDeadLetterSink(any())).thenReturn(Future.succeededFuture()); - when(recordDispatcherListener.successfullySentToSubscriber(any())).thenReturn(Future.succeededFuture()); - when(recordDispatcherListener.failedToSendToDeadLetterSink(any(), any())).thenReturn(Future.succeededFuture()); - - return recordDispatcherListener; + return mock(RecordDispatcherListener.class); } } diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java index 1d9321cd85..971ca11b05 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java @@ -19,10 +19,12 @@ import io.cloudevents.CloudEvent; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.kafka.client.consumer.KafkaConsumer; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.consumer.OffsetAndMetadata; import 
io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl; + import java.util.Collection; import java.util.Map; import java.util.Set; @@ -30,6 +32,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -45,7 +48,7 @@ public abstract class AbstractOffsetManagerTest { - abstract RecordDispatcherListener createOffsetManager(final KafkaConsumer consumer); + abstract RecordDispatcherListener createOffsetManager(final Vertx vertx, final KafkaConsumer consumer); protected static KafkaConsumerRecord record(String topic, int partition, long offset) { return new KafkaConsumerRecordImpl<>( @@ -60,7 +63,8 @@ protected static KafkaConsumerRecord record(String topic, in } protected MapAssert assertThatOffsetCommitted( - Collection partitionsConsumed, Consumer testExecutor) { + final Collection partitionsConsumed, + final Consumer testExecutor) { return assertThatOffsetCommittedWithFailures(partitionsConsumed, (offsetStrategy, flag) -> testExecutor.accept(offsetStrategy)); } @@ -96,10 +100,17 @@ protected MapAssert assertThatOffsetCommittedWithFailures( .when(vertxConsumer) .commit(any(Map.class)); - testExecutor.accept(createOffsetManager(vertxConsumer), failureFlag); + testExecutor.accept(createOffsetManager(Vertx.vertx(), vertxConsumer), failureFlag); + + try { + Thread.sleep(1000); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + final var committed = mockConsumer.committed(Set.copyOf(partitionsConsumed)); return assertThat( - mockConsumer.committed(Set.copyOf(partitionsConsumed)) + committed .entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().offset())) diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManagerTest.java similarity index 87% rename from data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerTest.java rename to data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManagerTest.java index ce1cfa815e..25b5f8abda 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedOffsetManagerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManagerTest.java @@ -18,10 +18,15 @@ import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; import io.cloudevents.CloudEvent; import io.micrometer.core.instrument.Counter; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; import io.vertx.kafka.client.consumer.KafkaConsumer; + import java.util.List; + import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; @@ -30,12 +35,12 @@ import static org.mockito.Mockito.verify; @Execution(value = ExecutionMode.CONCURRENT) -public class UnorderedOffsetManagerTest extends AbstractOffsetManagerTest { +@ExtendWith(VertxExtension.class) +public class OffsetManagerTest extends 
AbstractOffsetManagerTest { @Override - RecordDispatcherListener createOffsetManager( - KafkaConsumer consumer) { - return new UnorderedOffsetManager(consumer, null); + RecordDispatcherListener createOffsetManager(final Vertx vertx, final KafkaConsumer consumer) { + return new OffsetManager(vertx, consumer, null, 100L); } @Test @@ -50,6 +55,18 @@ public void shouldCommitAfterSendingEventsOrderedOnTheSamePartition() { .containsEntry(new TopicPartition("aaa", 0), 10L); } + @Test + public void shouldCommitAfterSendingEventsOrderedOnTheSamePartitionReset() { + assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { + for (long i = Integer.MAX_VALUE - 50; i < ((long) Integer.MAX_VALUE) + 50; i++) { + var rec = record("aaa", 0, i); + offsetStrategy.recordReceived(rec); + offsetStrategy.successfullySentToSubscriber(rec); + } + }) + .containsEntry(new TopicPartition("aaa", 0), Integer.MAX_VALUE+50L); + } + @Test public void shouldNotCommitAndNotGoOutOfBounds() { assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { @@ -148,7 +165,7 @@ public void shouldNotCommitAfterSendingEventsABitMoreMixedWithAMissingOne() { List.of(5L, 2L, 0L, 7L, 1L, 3L, 4L) .forEach(offset -> offsetStrategy.successfullySentToSubscriber(record("aaa", 0, offset))); }) - .isEmpty(); + .containsEntry(new TopicPartition("aaa", 0), 6L); } @Test @@ -209,10 +226,10 @@ public void shouldContinueToWorkAfterSendingALotOfRecords() { @Test @SuppressWarnings("unchecked") - public void recordReceived() { + public void recordReceived(final Vertx vertx) { final KafkaConsumer consumer = mock(KafkaConsumer.class); final Counter eventsSentCounter = mock(Counter.class); - new UnorderedOffsetManager(consumer, eventsSentCounter::increment).recordReceived(record("aaa", 0, 0)); + new OffsetManager(vertx, consumer, eventsSentCounter::increment, 100L).recordReceived(record("aaa", 0, 0)); shouldNeverCommit(consumer); shouldNeverPause(consumer); @@ -221,12 +238,11 @@ public void recordReceived() { @Test @SuppressWarnings("unchecked") - public void failedToSendToDeadLetterSink() { + public void failedToSendToDeadLetterSink(final Vertx vertx) { final KafkaConsumer consumer = mock(KafkaConsumer.class); final Counter eventsSentCounter = mock(Counter.class); - UnorderedOffsetManager strategy = - new UnorderedOffsetManager(consumer, eventsSentCounter::increment); + OffsetManager strategy = new OffsetManager(vertx, consumer, eventsSentCounter::increment, 100L); strategy.recordReceived(record("aaa", 0, 0)); strategy.failedToSendToDeadLetterSink(record("aaa", 0, 0), null); diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManagerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManagerTest.java deleted file mode 100644 index 543b28b662..0000000000 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedOffsetManagerTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; - -import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; -import io.vertx.kafka.client.consumer.KafkaConsumer; -import java.util.List; -import org.apache.kafka.common.TopicPartition; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.parallel.Execution; -import org.junit.jupiter.api.parallel.ExecutionMode; - -@Execution(value = ExecutionMode.CONCURRENT) -public class OrderedOffsetManagerTest extends AbstractOffsetManagerTest { - - @Override - RecordDispatcherListener createOffsetManager( - KafkaConsumer consumer) { - return new OrderedOffsetManager(consumer, null); - } - - @Test - public void shouldNotCommitAfterRecordReceived() { - assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { - offsetStrategy.recordReceived(record("aaa", 0, 0)); - }).isEmpty(); - } - - @Test - public void shouldNotCommitAfterFailedToSendToDeadLetterSink() { - assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { - offsetStrategy.failedToSendToDeadLetterSink(record("aaa", 0, 0), new IllegalStateException()); - }).isEmpty(); - } - - @Test - public void shouldNotCommitAfterRecordDiscarded() { - assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { - offsetStrategy.recordDiscarded(record("aaa", 0, 0)); - }).isEmpty(); - } - - - @Test - public void shouldCommitAfterSuccessfullySentToSubscriber() { - assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { - offsetStrategy.successfullySentToSubscriber(record("aaa", 0, 0)); - }).containsEntry(new TopicPartition("aaa", 0), 1L); - } - - @Test - public void shouldCommitAfterSuccessfullySentToDeadLetterSink() { - assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> { - offsetStrategy.successfullySentToDeadLetterSink(record("aaa", 0, 0)); - }).containsEntry(new TopicPartition("aaa", 0), 1L); - } - -} diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java index c76bd90fcc..b1efd3aca6 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java @@ -40,6 +40,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.ProducerRecord; @@ -72,6 +74,7 @@ public void testUnorderedConsumer(final Vertx vertx) throws Exception { final var producerConfigs = new Properties(); final var consumerConfigs = new Properties(); + 
consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100L); final var consumerVerticleFactoryMock = new ConsumerVerticleFactoryImplMock( consumerConfigs, diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java index b33f00d3a8..a1e9babe15 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java @@ -47,6 +47,12 @@ import io.vertx.kafka.client.producer.KafkaProducer; import io.vertx.micrometer.MicrometerMetricsOptions; import io.vertx.micrometer.backends.BackendRegistries; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import java.io.File; import java.io.IOException; @@ -57,12 +63,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - import static java.lang.String.format; import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; @@ -143,7 +143,7 @@ public static void setUp(final Vertx vertx, final VertxTestContext context) thro */ @Test @Timeout(timeUnit = TimeUnit.MINUTES, value = 1) - public void execute(final Vertx vertx, final VertxTestContext context) throws Exception { + public void execute(final Vertx vertx, final VertxTestContext context) { final var checkpoints = context.checkpoint(3); @@ -309,6 +309,7 @@ private static ConsumerDeployerVerticle setUpDispatcher(final Vertx vertx, final consumerConfigs.put(BOOTSTRAP_SERVERS_CONFIG, format("localhost:%d", KAFKA_PORT)); consumerConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, KeyDeserializer.class.getName()); consumerConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName()); + consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); final var producerConfigs = producerConfigs();
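
Note on the approach (illustration, not part of the patch): the new OffsetManager records out-of-order acknowledgements per partition in a java.util.BitSet and, on a periodic Vert.x timer, commits up to the first gap in the acked offsets. The self-contained sketch below shows that core idea with hypothetical names (OffsetTrackerSketch, ack); it is a simplified approximation of the patch's OffsetTracker, not the class itself, and always scans from bit 0 instead of from the last committed position.

import java.util.BitSet;

// Illustrative sketch (hypothetical names, not from the patch):
// track acks that may arrive out of order and compute the highest
// offset that is safe to commit, i.e. the first gap in the acked set.
class OffsetTrackerSketch {

  private final BitSet acked = new BitSet();
  private final long initialOffset;

  OffsetTrackerSketch(final long initialOffset) {
    this.initialOffset = initialOffset;
  }

  // Record a processed offset, in whatever order completions happen.
  void ack(final long offset) {
    acked.set((int) (offset - initialOffset));
  }

  // Offset to pass to commit(): every offset below it has been acked.
  long offsetToCommit() {
    return initialOffset + acked.nextClearBit(0);
  }

  public static void main(final String[] args) {
    final var tracker = new OffsetTrackerSketch(0);
    tracker.ack(1);
    tracker.ack(2);
    System.out.println(tracker.offsetToCommit()); // 0: offset 0 not acked yet
    tracker.ack(0);
    System.out.println(tracker.offsetToCommit()); // 3: offsets 0..2 are contiguous
  }
}

Compared with the deleted UnorderedOffsetManager's hand-rolled long[] bitmask, a BitSet grows automatically and nextClearBit yields the commit point directly, at the cost of casting the offset delta to int; the actual commit is then decoupled from the ack path by the vertx.setPeriodic(commitIntervalMs, ...) callback seen in the patch.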