diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/AsyncCloseable.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/AsyncCloseable.java
index 8a40032993..8361748021 100644
--- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/AsyncCloseable.java
+++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/AsyncCloseable.java
@@ -21,7 +21,9 @@
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
+
import java.util.Arrays;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -53,7 +55,10 @@ static AutoCloseable toAutoCloseable(AsyncCloseable closeable) {
}
/**
- * Compose several {@link AsyncCloseable} into a single {@link AsyncCloseable}. One close failure will cause the whole close to fail.
+ * Compose several {@link AsyncCloseable}s into a single {@link AsyncCloseable}.
+ * One close failure will cause the whole close to fail.
+ *
+ * Null {@link AsyncCloseable}s, as well as null futures returned by individual {@link AsyncCloseable}s on close, are filtered out.
*
* @param closeables the closeables to compose
* @return the composed closeables
@@ -61,7 +66,9 @@ static AutoCloseable toAutoCloseable(AsyncCloseable closeable) {
static AsyncCloseable compose(AsyncCloseable... closeables) {
return () -> CompositeFuture.all(
Arrays.stream(closeables)
+ .filter(Objects::nonNull)
.map(AsyncCloseable::close)
+ .filter(Objects::nonNull)
.collect(Collectors.toList())
).mapEmpty();
}
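For illustration, a minimal sketch of how the hardened `compose` behaves; `ComposeExample` and its two closeables are hypothetical, only `AsyncCloseable.compose` and the null filtering come from the hunk above:

```java
import dev.knative.eventing.kafka.broker.core.AsyncCloseable;
import io.vertx.core.Future;

public class ComposeExample {
  public static void main(String[] args) {
    // A closeable that closes normally.
    AsyncCloseable ok = () -> Future.succeededFuture();
    // A closeable that (incorrectly) returns null from close():
    // dropped by the second Objects::nonNull filter.
    AsyncCloseable lazy = () -> null;

    // The null element itself is dropped by the first filter.
    AsyncCloseable composed = AsyncCloseable.compose(ok, null, lazy);

    composed.close().onComplete(r -> System.out.println("closed: " + r.succeeded()));
  }
}
```

Before this change, a null element would throw a `NullPointerException` inside the `map(AsyncCloseable::close)` step, and a null future would fail the `CompositeFuture`; both now degrade gracefully.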
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java
index ec5d4745bf..7898c15ff5 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/RecordDispatcherListener.java
@@ -15,13 +15,13 @@
*/
package dev.knative.eventing.kafka.broker.dispatcher;
-import io.vertx.core.Future;
+import dev.knative.eventing.kafka.broker.core.AsyncCloseable;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
/**
* This class contains hooks for listening events through the {@link dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher} lifecycle.
*/
-public interface RecordDispatcherListener {
+public interface RecordDispatcherListener extends AsyncCloseable {
/**
* The given record has been received.
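Implementations with no state to flush can satisfy the new contract with an already-completed future. A self-contained sketch of the pattern (the simplified interfaces below are illustrative stand-ins, not the real ones):

```java
import io.vertx.core.Future;

// Simplified stand-ins for AsyncCloseable and RecordDispatcherListener.
interface SketchCloseable {
  Future<Void> close();
}

interface SketchListener extends SketchCloseable {
  void recordReceived(Object record); // illustrative hook
}

// A stateless listener: nothing to flush, so close() completes immediately.
class NoopSketchListener implements SketchListener {
  @Override
  public void recordReceived(Object record) {
    // no-op
  }

  @Override
  public Future<Void> close() {
    return Future.succeededFuture();
  }
}
```

The stateful case in this PR is `OffsetManager` below, which uses `close()` to flush pending offset commits before shutdown.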
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java
index acca22162c..10bc93770c 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java
@@ -81,7 +81,7 @@ public RecordDispatcherImpl(
this.subscriberSender = composeSenderAndSinkHandler(subscriberSender, responseHandler, "subscriber");
this.dlsSender = composeSenderAndSinkHandler(deadLetterSinkSender, responseHandler, "dead letter sink");
this.recordDispatcherListener = recordDispatcherListener;
- this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender);
+ this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender, recordDispatcherListener);
this.consumerTracer = consumerTracer;
}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java
index d8f7754ffb..9400427190 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/BaseConsumerVerticle.java
@@ -67,11 +67,15 @@ public void start(Promise<Void> startPromise) {
public void stop(Promise<Void> stopPromise) {
logger.info("Stopping consumer");
- AsyncCloseable.compose(
- this.consumer::close,
- this.recordDispatcher,
- this.closeable
- ).close(stopPromise);
+ final Promise<Void> dependenciesClosedPromise = Promise.promise();
+
+ // Close consumer after other objects have been closed.
+ dependenciesClosedPromise.future()
+ .onComplete(r -> this.consumer.close().onComplete(stopPromise));
+
+ AsyncCloseable
+ .compose(this.recordDispatcher, this.closeable)
+ .close(dependenciesClosedPromise);
}
public void setConsumer(KafkaConsumer<Object, CloudEvent> consumer) {
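The stop sequence above closes the dispatcher and the other closeables first, and only then the consumer, so nothing can be dispatched against an already-closed consumer. A standalone sketch of that ordering, assuming hypothetical `closeDependencies`/`closeConsumer` stand-ins:

```java
import io.vertx.core.Future;
import io.vertx.core.Promise;

public class TwoPhaseShutdownSketch {

  static Future<Void> closeDependencies() {
    return Future.succeededFuture(); // stand-in for AsyncCloseable.compose(...).close()
  }

  static Future<Void> closeConsumer() {
    return Future.succeededFuture(); // stand-in for consumer.close()
  }

  public static void stop(Promise<Void> stopPromise) {
    final Promise<Void> dependenciesClosedPromise = Promise.promise();

    // Phase 2: whatever the dependencies' outcome, close the consumer
    // and let its result complete the outer stop promise.
    dependenciesClosedPromise.future()
        .onComplete(r -> closeConsumer().onComplete(stopPromise));

    // Phase 1: close the dependencies, completing the intermediate promise.
    closeDependencies().onComplete(dependenciesClosedPromise);
  }

  public static void main(String[] args) {
    Promise<Void> stop = Promise.promise();
    stop.future().onComplete(r -> System.out.println("stopped: " + r.succeeded()));
    stop(stop);
  }
}
```

Note that the intermediate result `r` is ignored, so `stopPromise` reflects only the consumer's close outcome; a failure while closing the dependencies is not propagated.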
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java
index 38a019319d..ce6cd36cc5 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OffsetManager.java
@@ -16,6 +16,8 @@
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
@@ -30,6 +32,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
/**
* This class implements the offset strategy that makes sure that, even unordered, the offset commit is ordered.
@@ -60,7 +63,7 @@ public OffsetManager(final Vertx vertx,
this.offsetTrackers = new HashMap<>();
this.onCommit = onCommit;
- vertx.setPeriodic(commitIntervalMs, l -> this.offsetTrackers.forEach(this::commit));
+ vertx.setPeriodic(commitIntervalMs, l -> commitAll());
}
/**
@@ -115,7 +118,7 @@ private void commit(final KafkaConsumerRecord<?, ?> record) {
.recordNewOffset(record.offset());
}
- private synchronized void commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
+ private synchronized Future<Void> commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
long newOffset = tracker.offsetToCommit();
if (newOffset > tracker.getCommitted()) {
// Reset the state
@@ -124,7 +127,7 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs
logger.debug("Committing offset for {} offset {}", topicPartition, newOffset);
// Execute the actual commit
- consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
+ return consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
.onSuccess(ignored -> {
if (onCommit != null) {
onCommit.accept((int) newOffset);
@@ -133,6 +136,27 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs
.onFailure(cause -> logger.error("failed to commit topic partition {} offset {}", topicPartition, newOffset, cause))
.mapEmpty();
}
+ return null;
+ }
+
+ /**
+ * Commit all tracked offsets by calling commit on every offsetTracker entry.
+ *
+ * @return a future that succeeds once all the triggered commits succeed, or fails if any of them fails.
+ */
+ private Future<Void> commitAll() {
+ return CompositeFuture.all(
+ this.offsetTrackers.entrySet()
+ .stream()
+ .map(e -> commit(e.getKey(), e.getValue()))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList())
+ ).mapEmpty();
+ }
+
+ @Override
+ public Future<Void> close() {
+ return commitAll();
}
/**
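The commit-on-close behaviour builds on a commit-or-null convention: the per-partition commit returns null when there is nothing new to commit, and `commitAll()` filters the nulls before composing. A simplified, self-contained sketch of that convention (the string-keyed maps are stand-ins for the real per-partition tracker state):

```java
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class CommitAllSketch {

  // Stand-ins for the per-partition tracker state.
  private final Map<String, Long> committed = new HashMap<>();
  private final Map<String, Long> latest = new HashMap<>();

  // Returns null when there is nothing new to commit for this partition.
  private Future<Void> commit(final String partition, final long offset) {
    if (offset > committed.getOrDefault(partition, 0L)) {
      committed.put(partition, offset);
      return Future.succeededFuture(); // stand-in for consumer.commit(...)
    }
    return null;
  }

  // Composes all the non-null per-partition commits, as commitAll() does above.
  public Future<Void> commitAll() {
    return CompositeFuture.all(
        latest.entrySet()
            .stream()
            .map(e -> commit(e.getKey(), e.getValue()))
            .filter(Objects::nonNull)
            .collect(Collectors.toList())
    ).mapEmpty();
  }
}
```

Because `close()` delegates to `commitAll()`, offsets tracked between the last periodic tick and shutdown are still committed instead of being replayed on the next start.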
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java
index 6779e864ed..d204d0336f 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java
@@ -111,6 +111,9 @@ public void stop(Promise<Void> stopPromise) {
}
void recordsHandler(KafkaConsumerRecords<Object, CloudEvent> records) {
+ if (records == null) {
+ return;
+ }
// Put records in queues
// I assume the records are ordered per topic-partition
for (int i = 0; i < records.size(); i++) {
diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java
index 971ca11b05..e8136d32cd 100644
--- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java
+++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/AbstractOffsetManagerTest.java
@@ -100,13 +100,15 @@ protected MapAssert<TopicPartition, OffsetAndMetadata> assertThatOffsetCommittedWithFailures(
.when(vertxConsumer)
.commit(any(Map.class));
- testExecutor.accept(createOffsetManager(Vertx.vertx(), vertxConsumer), failureFlag);
+ final var offsetManager = createOffsetManager(Vertx.vertx(), vertxConsumer);
+ testExecutor.accept(offsetManager, failureFlag);
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
+ assertThat(offsetManager.close().succeeded()).isTrue();
final var committed = mockConsumer.committed(Set.copyOf(partitionsConsumed));
return assertThat(