Skip to content

Commit

Permalink
Commit offset on consumer close (knative-extensions#1463) (knative-ex…
Browse files Browse the repository at this point in the history
…tensions#1478)

* Commit offset on consumer close

When a consumer is closed, we want to commit the offsets for all
owned partitions.

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>

* Comments

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
  • Loading branch information
pierDipi authored and matzew committed Nov 12, 2021
1 parent 96167f3 commit db4f0c8
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -53,15 +55,20 @@ static AutoCloseable toAutoCloseable(AsyncCloseable closeable) {
}

/**
* Compose several {@link AsyncCloseable} into a single {@link AsyncCloseable}. One close failure will cause the whole close to fail.
* Compose several {@link AsyncCloseable}s into a single {@link AsyncCloseable}.
* One close failure will cause the whole close to fail.
* <p>
* It filters null futures returned by individual {@link AsyncCloseable} on close.
*
* @param closeables the closeables to compose
* @return the composed closeables
*/
static AsyncCloseable compose(AsyncCloseable... closeables) {
return () -> CompositeFuture.all(
Arrays.stream(closeables)
.filter(Objects::nonNull)
.map(AsyncCloseable::close)
.filter(Objects::nonNull)
.collect(Collectors.toList())
).mapEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
*/
package dev.knative.eventing.kafka.broker.dispatcher;

import io.vertx.core.Future;
import dev.knative.eventing.kafka.broker.core.AsyncCloseable;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

/**
* This class contains hooks for listening events through the {@link dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher} lifecycle.
*/
public interface RecordDispatcherListener {
public interface RecordDispatcherListener extends AsyncCloseable {

/**
* The given record has been received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public RecordDispatcherImpl(
this.subscriberSender = composeSenderAndSinkHandler(subscriberSender, responseHandler, "subscriber");
this.dlsSender = composeSenderAndSinkHandler(deadLetterSinkSender, responseHandler, "dead letter sink");
this.recordDispatcherListener = recordDispatcherListener;
this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender);
this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender, recordDispatcherListener);
this.consumerTracer = consumerTracer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,15 @@ public void start(Promise<Void> startPromise) {
public void stop(Promise<Void> stopPromise) {
logger.info("Stopping consumer");

AsyncCloseable.compose(
this.consumer::close,
this.recordDispatcher,
this.closeable
).close(stopPromise);
final Promise<Void> dependenciesClosedPromise = Promise.promise();

// Close consumer after other objects have been closed.
dependenciesClosedPromise.future()
.onComplete(r -> this.consumer.close().onComplete(stopPromise));

AsyncCloseable
.compose(this.recordDispatcher, this.closeable)
.close(dependenciesClosedPromise);
}

public void setConsumer(KafkaConsumer<String, CloudEvent> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
Expand All @@ -30,6 +32,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* This class implements the offset strategy that makes sure that, even unordered, the offset commit is ordered.
Expand Down Expand Up @@ -60,7 +63,7 @@ public OffsetManager(final Vertx vertx,
this.offsetTrackers = new HashMap<>();
this.onCommit = onCommit;

vertx.setPeriodic(commitIntervalMs, l -> this.offsetTrackers.forEach(this::commit));
vertx.setPeriodic(commitIntervalMs, l -> commitAll());
}

/**
Expand Down Expand Up @@ -115,7 +118,7 @@ private void commit(final KafkaConsumerRecord<?, ?> record) {
.recordNewOffset(record.offset());
}

private synchronized void commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
private synchronized Future<Void> commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
long newOffset = tracker.offsetToCommit();
if (newOffset > tracker.getCommitted()) {
// Reset the state
Expand All @@ -124,7 +127,7 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs
logger.debug("Committing offset for {} offset {}", topicPartition, newOffset);

// Execute the actual commit
consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
return consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
.onSuccess(ignored -> {
if (onCommit != null) {
onCommit.accept((int) newOffset);
Expand All @@ -133,6 +136,27 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs
.onFailure(cause -> logger.error("failed to commit topic partition {} offset {}", topicPartition, newOffset, cause))
.mapEmpty();
}
return null;
}

/**
* Commit all tracked offsets by colling commit on every offsetTracker entry.
*
* @return succeeded or failed future.
*/
private Future<Void> commitAll() {
return CompositeFuture.all(
this.offsetTrackers.entrySet()
.stream()
.map(e -> commit(e.getKey(), e.getValue()))
.filter(Objects::nonNull)
.collect(Collectors.toList())
).mapEmpty();
}

@Override
public Future<Void> close() {
return commitAll();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public void stop(Promise<Void> stopPromise) {
}

void recordsHandler(KafkaConsumerRecords<String, CloudEvent> records) {
if (records == null) {
return;
}
// Put records in queues
// I assume the records are ordered per topic-partition
for (int i = 0; i < records.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@ protected MapAssert<TopicPartition, Long> assertThatOffsetCommittedWithFailures(
.when(vertxConsumer)
.commit(any(Map.class));

testExecutor.accept(createOffsetManager(Vertx.vertx(), vertxConsumer), failureFlag);
final var offsetManager = createOffsetManager(Vertx.vertx(), vertxConsumer);
testExecutor.accept(offsetManager, failureFlag);

try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(offsetManager.close().succeeded()).isTrue();

final var committed = mockConsumer.committed(Set.copyOf(partitionsConsumed));
return assertThat(
Expand Down

0 comments on commit db4f0c8

Please sign in to comment.