Commit offset on consumer close #1463

Merged
AsyncCloseable
@@ -22,6 +22,7 @@
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import java.util.Arrays;
+import java.util.Objects;
 import java.util.stream.Collectors;

 /**
@@ -61,7 +62,9 @@ static AutoCloseable toAutoCloseable(AsyncCloseable closeable) {
   static AsyncCloseable compose(AsyncCloseable... closeables) {
     return () -> CompositeFuture.all(
       Arrays.stream(closeables)
+        .filter(Objects::nonNull)
         .map(AsyncCloseable::close)
+        .filter(Objects::nonNull)
         .collect(Collectors.toList())
     ).mapEmpty();
   }
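With the two nonNull filters, compose(...) now tolerates both a null AsyncCloseable argument and a close() implementation that returns a null Future. A minimal usage sketch (the variables below are hypothetical, not taken from this PR):

  AsyncCloseable always = () -> Future.succeededFuture();   // io.vertx.core.Future
  AsyncCloseable optional = null;                            // e.g. an optional component that was never created
  AsyncCloseable composite = AsyncCloseable.compose(always, optional);
  composite.close().onSuccess(v -> System.out.println("everything closed"));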
RecordDispatcherListener
@@ -15,13 +15,13 @@
  */
 package dev.knative.eventing.kafka.broker.dispatcher;

-import io.vertx.core.Future;
+import dev.knative.eventing.kafka.broker.core.AsyncCloseable;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

 /**
  * This class contains hooks for listening events through the {@link dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher} lifecycle.
  */
-public interface RecordDispatcherListener {
+public interface RecordDispatcherListener extends AsyncCloseable {

   /**
    * The given record has been received.
RecordDispatcherImpl
@@ -81,7 +81,7 @@ public RecordDispatcherImpl(
     this.subscriberSender = composeSenderAndSinkHandler(subscriberSender, responseHandler, "subscriber");
     this.dlsSender = composeSenderAndSinkHandler(deadLetterSinkSender, responseHandler, "dead letter sink");
     this.recordDispatcherListener = recordDispatcherListener;
-    this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender);
+    this.closeable = AsyncCloseable.compose(responseHandler, deadLetterSinkSender, subscriberSender, recordDispatcherListener);
     this.consumerTracer = consumerTracer;
   }
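Because RecordDispatcherListener now extends AsyncCloseable (previous file) and is included in the composed closeable here, closing the dispatcher also closes its listener. A rough sketch of the intent, assuming the dispatcher delegates its own close() to this.closeable (that method is not shown in this diff):

  recordDispatcher.close()   // closes the response handler, both senders and the listener
    .onSuccess(v -> logger.info("dispatcher and its listener closed"));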
Consumer verticle (stop)
@@ -69,11 +69,15 @@ public void start(Promise<Void> startPromise) {
   public void stop(Promise<Void> stopPromise) {
     logger.info("Stopping consumer");

-    AsyncCloseable.compose(
-      this.consumer::close,
-      this.recordDispatcher,
-      this.closeable
-    ).close(stopPromise);
+    final Promise<Void> dependenciesClosedPromise = Promise.promise();
+
+    // Close consumer after other objects have been closed.
+    dependenciesClosedPromise.future()
+      .onComplete(r -> this.consumer.close().onComplete(stopPromise));
+
+    AsyncCloseable
+      .compose(this.recordDispatcher, this.closeable)
+      .close(dependenciesClosedPromise);
   }

   public void setConsumer(KafkaConsumer<Object, CloudEvent> consumer) {
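The stop sequence now closes the record dispatcher (and, through it, the offset-committing listener) before the Kafka consumer, so pending offsets can still be committed while the consumer is open. A minimal sketch of how a caller observes this ordering, assuming deploymentId holds the id returned when this verticle was deployed:

  vertx.undeploy(deploymentId)
    .onSuccess(v -> logger.info("consumer closed after pending offsets were committed"))
    .onFailure(cause -> logger.error("failed to stop the consumer verticle", cause));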
OffsetManager
@@ -16,6 +16,8 @@
 package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

 import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
 import io.vertx.core.Vertx;
 import io.vertx.kafka.client.common.TopicPartition;
 import io.vertx.kafka.client.consumer.KafkaConsumer;
@@ -30,6 +32,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;

 /**
  * This class implements the offset strategy that makes sure that, even unordered, the offset commit is ordered.
@@ -60,7 +63,7 @@ public OffsetManager(final Vertx vertx,
     this.offsetTrackers = new HashMap<>();
     this.onCommit = onCommit;

-    vertx.setPeriodic(commitIntervalMs, l -> this.offsetTrackers.forEach(this::commit));
+    vertx.setPeriodic(commitIntervalMs, l -> commitAll());
   }

   /**
@@ -115,7 +118,7 @@ private void commit(final KafkaConsumerRecord<?, ?> record) {
       .recordNewOffset(record.offset());
   }

-  private synchronized void commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
+  private synchronized Future<Void> commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
     long newOffset = tracker.offsetToCommit();
     if (newOffset > tracker.getCommitted()) {
       // Reset the state
@@ -124,7 +127,7 @@ private synchronized void commit(final TopicPartition topicPartition, final OffsetTracker tracker) {
       logger.debug("Committing offset for {} offset {}", topicPartition, newOffset);

       // Execute the actual commit
-      consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
+      return consumer.commit(Map.of(topicPartition, new OffsetAndMetadata(newOffset, "")))
         .onSuccess(ignored -> {
           if (onCommit != null) {
             onCommit.accept((int) newOffset);
@@ -133,6 +136,22 @@
         .onFailure(cause -> logger.error("failed to commit topic partition {} offset {}", topicPartition, newOffset, cause))
         .mapEmpty();
     }
+    return null;
   }

+  private Future<Void> commitAll() {
+    return CompositeFuture.all(
+      this.offsetTrackers.entrySet()
+        .stream()
+        .map(e -> commit(e.getKey(), e.getValue()))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList())
+    ).mapEmpty();
+  }
+
+  @Override
+  public Future<Void> close() {
+    return commitAll();
+  }
+
   /**
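OffsetManager.close() now commits whatever offsets are still pending, in addition to the periodic timer. A minimal sketch of the resulting contract, assuming an already constructed offsetManager and consumer (the constructor arguments are not shown in this hunk):

  offsetManager.close()                  // commitAll(): commit pending offsets for every tracked partition
    .onComplete(r -> consumer.close());  // only then close the underlying KafkaConsumer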
Consumer verticle (recordsHandler)
@@ -111,6 +111,9 @@ public void stop(Promise<Void> stopPromise) {
   }

   void recordsHandler(KafkaConsumerRecords<Object, CloudEvent> records) {
+    if (records == null) {
+      return;
+    }
     // Put records in queues
     // I assume the records are ordered per topic-partition
     for (int i = 0; i < records.size(); i++) {
OffsetManager test
@@ -100,13 +100,15 @@ protected MapAssert<TopicPartition, Long> assertThatOffsetCommittedWithFailures(
       .when(vertxConsumer)
       .commit(any(Map.class));

-    testExecutor.accept(createOffsetManager(Vertx.vertx(), vertxConsumer), failureFlag);
+    final var offsetManager = createOffsetManager(Vertx.vertx(), vertxConsumer);
+    testExecutor.accept(offsetManager, failureFlag);

     try {
       Thread.sleep(1000);
     } catch (final InterruptedException e) {
       throw new RuntimeException(e);
     }
+    assertThat(offsetManager.close().succeeded()).isTrue();

     final var committed = mockConsumer.committed(Set.copyOf(partitionsConsumed));
     return assertThat(