spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
@@ -194,9 +194,7 @@ public enum AckMode {

 	/**
 	 * Whether or not to call consumer.commitSync() or commitAsync() when the
-	 * container is responsible for commits. Default true. See
-	 * https://github.com/spring-projects/spring-kafka/issues/62 At the time of
-	 * writing, async commits are not entirely reliable.
+	 * container is responsible for commits. Default true.
 	 */
 	private boolean syncCommits = true;

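For context, the flag above chooses between the consumer's two commit styles. A minimal sketch of that choice (illustrative only, not the container's internal code; Consumer.commitSync and Consumer.commitAsync are the real kafka-clients API):

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

final class CommitStyleSketch {

	// Sketch only: shows the two commit styles the syncCommits flag selects between.
	static void commit(Consumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> offsets,
			boolean syncCommits, OffsetCommitCallback callback) {

		if (syncCommits) {
			consumer.commitSync(offsets); // blocks until the broker acknowledges, or throws
		}
		else {
			consumer.commitAsync(offsets, callback); // returns immediately; the result goes to the callback
		}
	}

}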
spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -1741,26 +1741,6 @@ else if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {

 	}
 
-	private static final class LoggingCommitCallback implements OffsetCommitCallback {
-
-		private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class); // NOSONAR
-
-		LoggingCommitCallback() {
-			super();
-		}
-
-		@Override
-		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-			if (exception != null) {
-				logger.error("Commit failed for " + offsets, exception);
-			}
-			else if (logger.isDebugEnabled()) {
-				logger.debug("Commits for " + offsets + " completed");
-			}
-		}
-
-	}
-
 	private static final class OffsetMetadata {
 
 		private final Long offset;
spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingCommitCallback.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Logs commit results at DEBUG level for success and ERROR for failures.
+ *
+ * @author Gary Russell
+ * @since 2.2.4
+ */
+public final class LoggingCommitCallback implements OffsetCommitCallback {
+
+	private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class); // NOSONAR
+
+	@Override
+	public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+		if (exception != null) {
+			logger.error("Commit failed for " + offsets, exception);
+		}
+		else if (logger.isDebugEnabled()) {
+			logger.debug("Commits for " + offsets + " completed");
+		}
+	}
+
+}
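Since the callback is now public, it can be plugged in wherever an OffsetCommitCallback is accepted, for example as the container's commit callback. A sketch using the ContainerProperties setters exercised in the tests further down ("someTopic" is a placeholder topic name):

import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.LoggingCommitCallback;

final class CommitCallbackSketch {

	static ContainerProperties containerProperties() {
		ContainerProperties containerProps = new ContainerProperties("someTopic");
		containerProps.setSyncCommits(false);                          // commit with commitAsync(...)
		containerProps.setCommitCallback(new LoggingCommitCallback()); // failures at ERROR, successes at DEBUG
		return containerProps;
	}

}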
spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java
@@ -16,15 +16,21 @@

 package org.springframework.kafka.listener;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.function.BiConsumer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
 
 import org.springframework.kafka.KafkaException;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
 import org.springframework.kafka.support.SeekUtils;
 import org.springframework.lang.Nullable;

@@ -39,10 +45,14 @@
  */
 public class SeekToCurrentErrorHandler implements ContainerAwareErrorHandler {
 
-	private static final Log logger = LogFactory.getLog(SeekToCurrentErrorHandler.class); // NOSONAR
+	protected static final Log LOGGER = LogFactory.getLog(SeekToCurrentErrorHandler.class); // NOSONAR visibility
 
+	private static final LoggingCommitCallback LOGGING_COMMIT_CALLBACK = new LoggingCommitCallback();
+
 	private final FailedRecordTracker failureTracker;
 
+	private boolean commitRecovered;
+
 	/**
 	 * Construct an instance with the default recoverer which simply logs the record after
 	 * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
@@ -82,16 +92,59 @@ public SeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures) {
 	 * @since 2.2
 	 */
 	public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures) {
-		this.failureTracker = new FailedRecordTracker(recoverer, maxFailures, logger);
+		this.failureTracker = new FailedRecordTracker(recoverer, maxFailures, LOGGER);
 	}
 
+	/**
+	 * Whether the offset for a recovered record should be committed.
+	 * @return true to commit recovered record offsets.
+	 * @since 2.2.4
+	 */
+	protected boolean isCommitRecovered() {
+		return this.commitRecovered;
+	}
+
+	/**
+	 * Set to true to commit the offset for a recovered record. The container
+	 * must be configured with {@link AckMode#MANUAL_IMMEDIATE}. Whether or not
+	 * the commit is sync or async depends on the container's syncCommits
+	 * property.
+	 * @param commitRecovered true to commit.
+	 * @since 2.2.4
+	 * @see #setOffsetCommitCallback(OffsetCommitCallback)
+	 */
+	public void setCommitRecovered(boolean commitRecovered) {
+		this.commitRecovered = commitRecovered;
+	}
+
 	@Override
 	public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
 			Consumer<?, ?> consumer, MessageListenerContainer container) {
 
-		if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, logger)) {
+		if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, LOGGER)) {
 			throw new KafkaException("Seek to current after exception", thrownException);
 		}
+		else if (this.commitRecovered) {
+			if (container.getContainerProperties().getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {
+				ConsumerRecord<?, ?> record = records.get(0);
+				Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(
+						new TopicPartition(record.topic(), record.partition()),
+						new OffsetAndMetadata(record.offset() + 1));
+				if (container.getContainerProperties().isSyncCommits()) {
+					consumer.commitSync(offsetToCommit);
+				}
+				else {
+					OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
+					if (commitCallback == null) {
+						commitCallback = LOGGING_COMMIT_CALLBACK;
+					}
+					consumer.commitAsync(offsetToCommit, commitCallback);
+				}
+			}
+			else {
+				LOGGER.warn("'commitRecovered' ignored, container AckMode must be MANUAL_IMMEDIATE");
+			}
+		}
+	}
 
 	@Override
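Putting the pieces together, a configuration sketch for the new option (the constructor and setCommitRecovered are from this file, setAckMode appears in the tests below; the topic name and recovery action are placeholders):

import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

final class CommitRecoveredSketch {

	static SeekToCurrentErrorHandler errorHandler() {
		// Recover a record after 3 failed delivery attempts, then commit its offset.
		SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((record, exception) -> {
			// recovery action, e.g. publish the record to a dead-letter topic
		}, 3);
		handler.setCommitRecovered(true);
		return handler;
	}

	static ContainerProperties containerProperties() {
		ContainerProperties props = new ContainerProperties("someTopic");
		props.setAckMode(AckMode.MANUAL_IMMEDIATE); // required; otherwise commitRecovered is ignored with a WARN
		return props;
	}

}

Note that the handler commits record.offset() + 1, matching Kafka's convention that the committed offset is the position of the next record to consume; the recovered record is therefore not redelivered after a rebalance or restart.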
@@ -20,6 +20,7 @@
 import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -28,6 +29,7 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -38,6 +40,8 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.ClassRule;
@@ -48,6 +52,7 @@
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.event.ConsumerStoppedEvent;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -160,6 +165,58 @@ public void seekToCurrentErrorHandlerRecovers() {
 		verify(recoverer).accept(eq(records.get(0)), any());
 	}
 
+	@Test
+	public void seekToCurrentErrorHandlerRecoversManualAcksAsync() {
+		seekToCurrentErrorHandlerRecoversManualAcks(false);
+	}
+
+	@Test
+	public void seekToCurrentErrorHandlerRecoversManualAcksSync() {
+		seekToCurrentErrorHandlerRecoversManualAcks(true);
+	}
+
+	private void seekToCurrentErrorHandlerRecoversManualAcks(boolean syncCommits) {
+		@SuppressWarnings("unchecked")
+		BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = mock(BiConsumer.class);
+		SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, 2);
+		eh.setCommitRecovered(true);
+		List<ConsumerRecord<?, ?>> records = new ArrayList<>();
+		records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
+		records.add(new ConsumerRecord<>("foo", 1, 0, null, "bar"));
+		Consumer<?, ?> consumer = mock(Consumer.class);
+		MessageListenerContainer container = mock(MessageListenerContainer.class);
+		ContainerProperties properties = new ContainerProperties("foo");
+		properties.setAckMode(AckMode.MANUAL_IMMEDIATE);
+		properties.setSyncCommits(syncCommits);
+		OffsetCommitCallback commitCallback = (offsets, ex) -> { };
+		properties.setCommitCallback(commitCallback);
+		given(container.getContainerProperties()).willReturn(properties);
+		try {
+			eh.handle(new RuntimeException(), records, consumer, container);
+			fail("Expected exception");
+		}
+		catch (KafkaException e) {
+			// NOSONAR
+		}
+		verify(consumer).seek(new TopicPartition("foo", 0), 0L);
+		verify(consumer).seek(new TopicPartition("foo", 1), 0L);
+		verifyNoMoreInteractions(consumer);
+		eh.handle(new RuntimeException(), records, consumer, container);
+		verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 0L);
+		if (syncCommits) {
+			verify(consumer)
+					.commitSync(Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)));
+		}
+		else {
+			verify(consumer)
+					.commitAsync(
+							Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)),
+							commitCallback);
+		}
+		verifyNoMoreInteractions(consumer);
+		verify(recoverer).accept(eq(records.get(0)), any());
+	}
+
 	@Test
 	public void testNeverRecover() {
 		@SuppressWarnings("unchecked")
2 changes: 2 additions & 0 deletions src/reference/asciidoc/kafka.adoc
@@ -2770,6 +2770,8 @@ SeekToCurrentErrorHandler errorHandler =
 ----
 ====
 
+Starting with version 2.2.4, when the container is configured with `AckMode.MANUAL_IMMEDIATE`, the error handler can be configured to commit the offset of recovered records; set the `commitRecovered` property to `true`.
+
 See also <<dead-letters>>.
 
 When using transactions, similar functionality is provided by the `DefaultAfterRollbackProcessor`.
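In @KafkaListener scenarios, the documented option would typically be applied through the container factory. A sketch, assuming the standard 2.2 factory API (setConsumerFactory, getContainerProperties and setErrorHandler):

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

final class FactorySketch {

	static ConcurrentKafkaListenerContainerFactory<String, String> factory(
			ConsumerFactory<String, String> consumerFactory) {

		ConcurrentKafkaListenerContainerFactory<String, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory);
		factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

		SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler();
		errorHandler.setCommitRecovered(true); // commit the recovered record's offset
		factory.setErrorHandler(errorHandler);
		return factory;
	}

}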
3 changes: 3 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
@@ -45,6 +45,9 @@ See <<after-rollback>>, <<seek-to-current>>, and <<dead-letters>> for more information.
 The `ConsumerStoppingEvent` has been added.
 See <<events>> for more information.
 
+The `SeekToCurrentErrorHandler` can now be configured to commit the offset of a recovered record when the container is configured with `AckMode.MANUAL_IMMEDIATE` (since 2.2.4).
+See <<seek-to-current>> for more information.
+
 ==== @KafkaListener Changes
 
 You can now override the `concurrency` and `autoStartup` properties of the listener container factory by setting properties on the annotation.