Skip to content

Commit

Permalink
Commit offset on consumer close
Browse files Browse the repository at this point in the history
When a consumer is closed, we want to commit the offsets for all
owned partitions.

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
  • Loading branch information
pierDipi committed Nov 11, 2021
1 parent d24bcb7 commit 18ccda2
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -61,7 +62,9 @@ static AutoCloseable toAutoCloseable(AsyncCloseable closeable) {
/**
 * Compose the given closeables into a single {@link AsyncCloseable}.
 * <p>
 * {@code null} closeables are ignored, as are {@code null} futures
 * returned by an individual {@link AsyncCloseable#close()} call.
 * The returned closeable completes once every composed close completes.
 */
static AsyncCloseable compose(AsyncCloseable... closeables) {
  return () -> {
    final var closeFutures = Arrays.stream(closeables)
      .filter(Objects::nonNull)
      .map(AsyncCloseable::close)
      .filter(Objects::nonNull)
      .collect(Collectors.toList());
    return CompositeFuture.all(closeFutures).mapEmpty();
  };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
*/
package dev.knative.eventing.kafka.broker.dispatcher;

import io.vertx.core.Future;
import dev.knative.eventing.kafka.broker.core.AsyncCloseable;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

/**
* This class contains hooks for listening events through the {@link dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher} lifecycle.
*/
public interface RecordDispatcherListener {
public interface RecordDispatcherListener extends AsyncCloseable {

/**
* The given record has been received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public RecordDispatcherImpl(
this.subscriberSender = composeSenderAndSinkHandler(subscriberSender, responseHandler, "subscriber");
this.dlsSender = composeSenderAndSinkHandler(deadLetterSinkSender, responseHandler, "dead letter sink");
this.recordDispatcherListener = recordDispatcherListener;
this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender);
this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender, recordDispatcherListener);
this.consumerTracer = consumerTracer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ public void start(Promise<Void> startPromise) {
/**
 * Stop this verticle.
 * <p>
 * Closes the record dispatcher and the other owned resources first, and only
 * then closes the Kafka consumer: the dispatcher's close path commits the
 * offsets of owned partitions, which requires a still-open consumer.
 *
 * @param stopPromise completed once the consumer itself has been closed
 */
public void stop(Promise<Void> stopPromise) {
  logger.info("Stopping consumer");

  final Promise<Void> dependenciesClosedPromise = Promise.promise();

  // Close consumer after other objects have been closed.
  dependenciesClosedPromise.future()
    .onComplete(r -> this.consumer.close(stopPromise));

  AsyncCloseable.compose(this.recordDispatcher, this.closeable).close(dependenciesClosedPromise);
}

public void setConsumer(KafkaConsumer<Object, CloudEvent> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
Expand All @@ -30,6 +32,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* This class implements the offset strategy that makes sure that, even unordered, the offset commit is ordered.
Expand Down Expand Up @@ -60,7 +63,7 @@ public OffsetManager(final Vertx vertx,
this.offsetTrackers = new HashMap<>();
this.onCommit = onCommit;

vertx.setPeriodic(commitIntervalMs, l -> this.offsetTrackers.forEach(this::commit));
vertx.setPeriodic(commitIntervalMs, l -> commitAll());
}

/**
Expand Down Expand Up @@ -115,7 +118,7 @@ private void commit(final KafkaConsumerRecord<?, ?> record) {
.recordNewOffset(record.offset());
}

private synchronized void commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
private synchronized Future<Void> commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
long newOffset = tracker.offsetToCommit();
if (newOffset > tracker.getCommitted()) {
// Reset the state
Expand All @@ -124,7 +127,7 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs
logger.debug("Committing offset for {} offset {}", topicPartition, newOffset);

// Execute the actual commit
consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
return consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
.onSuccess(ignored -> {
if (onCommit != null) {
onCommit.accept((int) newOffset);
Expand All @@ -133,6 +136,22 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs
.onFailure(cause -> logger.error("failed to commit topic partition {} offset {}", topicPartition, newOffset, cause))
.mapEmpty();
}
return null;
}

/**
 * Trigger a commit for every tracked partition and return a future
 * completing once all of them have been committed.
 * <p>
 * Partitions with nothing new to commit are skipped ({@code commit}
 * returns {@code null} for those).
 */
private Future<Void> commitAll() {
  final var pendingCommits = this.offsetTrackers.entrySet()
    .stream()
    .map(entry -> commit(entry.getKey(), entry.getValue()))
    .filter(Objects::nonNull)
    .collect(Collectors.toList());
  return CompositeFuture.all(pendingCommits).mapEmpty();
}

/**
 * Commit the offsets of all owned partitions before this offset manager
 * is disposed, so no processed-but-uncommitted progress is lost when the
 * consumer is closed.
 */
@Override
public Future<Void> close() {
return commitAll();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@ protected MapAssert<TopicPartition, Long> assertThatOffsetCommittedWithFailures(
.when(vertxConsumer)
.commit(any(Map.class));

testExecutor.accept(createOffsetManager(Vertx.vertx(), vertxConsumer), failureFlag);
final var offsetManager = createOffsetManager(Vertx.vertx(), vertxConsumer);
testExecutor.accept(offsetManager, failureFlag);

try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(offsetManager.close().succeeded()).isTrue();

final var committed = mockConsumer.committed(Set.copyOf(partitionsConsumed));
return assertThat(
Expand Down

0 comments on commit 18ccda2

Please sign in to comment.