Commit 43fd291

garyrussell authored and artembilan committed
GH-916: Fix RemainingRecordsErrorHandler

Fixes #916

The container failed to invoke the proper method for an implementation of
`RemainingRecordsErrorHandler`. Regression caused by the addition of the
`ContainerStoppingErrorHandler`.

**cherry-pick to 2.1.x**

There will be conflicts in the container, but the changes are simply to replace
`ContainerAwareErrorHandler` with `RemainingRecordsErrorHandler`.

* Fix race in test.
1 parent cf6a785 commit 43fd291
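
For context, the interface affected by this fix receives the listener exception together with every record remaining from the current poll (starting with the failed record). Below is a minimal, hypothetical sketch of such a handler; the class name and the logging are illustrative only and not part of this commit, but the `handle` signature matches the anonymous handler used in the new test. Before this fix, the container only checked for `ContainerAwareErrorHandler`, so an implementation like this fell through to the single-record error-handling path.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.RemainingRecordsErrorHandler;

// Hypothetical example handler (not part of this commit): it only logs, but it
// relies on the container invoking the remaining-records variant of handle().
public class LoggingRemainingRecordsErrorHandler implements RemainingRecordsErrorHandler {

	@Override
	public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
			Consumer<?, ?> consumer) {

		// 'records' begins with the failed record and includes the rest of the poll
		System.err.println("Listener failed: " + thrownException.getMessage());
		records.forEach(record -> System.err.println("Unprocessed: " + record));
	}

}
```

Such a handler is registered on the listener container factory with `factory.setErrorHandler(...)`, as the new test below does with an equivalent anonymous implementation.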

2 files changed: 211 additions and 3 deletions


spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 3 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2018 the original author or authors.
+ * Copyright 2016-2019 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -1228,7 +1228,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
 				@SuppressWarnings(RAWTYPES) @Nullable Producer producer,
 				Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {
 
-			if (this.errorHandler instanceof ContainerAwareErrorHandler) {
+			if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
 				if (producer == null) {
 					processCommits();
 				}
@@ -1237,7 +1237,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
 				while (iterator.hasNext()) {
 					records.add(iterator.next());
 				}
-				((ContainerAwareErrorHandler) this.errorHandler).handle(e, records, this.consumer,
+				((RemainingRecordsErrorHandler) this.errorHandler).handle(e, records, this.consumer,
 						KafkaMessageListenerContainer.this.container);
 			}
 			else {
spring-kafka/src/test/java/org/springframework/kafka/listener/RemainingRecordsErrorHandlerTests.java

Lines changed: 208 additions & 0 deletions
@@ -0,0 +1,208 @@
/*
 * Copyright 2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Gary Russell
 * @since 2.1.12
 *
 */
@RunWith(SpringRunner.class)
@DirtiesContext
public class RemainingRecordsErrorHandlerTests {

	private static final String CONTAINER_ID = "container";

	@SuppressWarnings("rawtypes")
	@Autowired
	private Consumer consumer;

	@Autowired
	private Config config;

	/*
	 * Deliver 6 records from three partitions, fail on the second record second
	 * partition.
	 */
	@SuppressWarnings("unchecked")
	@Test
	public void remaingRecordsReceived() throws Exception {
		assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(this.config.errorLatch.await(10, TimeUnit.SECONDS)).isTrue();
		InOrder inOrder = inOrder(this.consumer);
		inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
		inOrder.verify(this.consumer).commitSync(
				Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)));
		inOrder.verify(this.consumer).commitSync(
				Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(2L)));
		inOrder.verify(this.consumer).commitSync(
				Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)));
		assertThat(this.config.count).isEqualTo(4);
		assertThat(this.config.contents).containsExactly("foo", "bar", "baz", "qux");
		assertThat(this.config.remaining).containsExactly("qux", "fiz", "buz");
	}

	@Configuration
	@EnableKafka
	public static class Config {

		private final List<String> contents = new ArrayList<>();

		private final CountDownLatch pollLatch = new CountDownLatch(1);

		private final CountDownLatch deliveryLatch = new CountDownLatch(3);

		private final CountDownLatch errorLatch = new CountDownLatch(1);

		private final CountDownLatch commitLatch = new CountDownLatch(3);

		private final List<String> remaining = new ArrayList<>();

		private int count;

		@KafkaListener(id = CONTAINER_ID, topics = "foo")
		public void foo(String in) {
			this.contents.add(in);
			this.deliveryLatch.countDown();
			if (++this.count == 4) { // part 1, offset 1, first time
				throw new RuntimeException("foo");
			}
		}

		@SuppressWarnings({ "rawtypes" })
		@Bean
		public ConsumerFactory consumerFactory() {
			ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
			final Consumer consumer = consumer();
			given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0")).willReturn(consumer);
			return consumerFactory;
		}

		@SuppressWarnings({ "rawtypes", "unchecked" })
		@Bean
		public Consumer consumer() {
			final Consumer consumer = mock(Consumer.class);
			final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
			final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
			final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
			willAnswer(i -> {
				((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned(
						Collections.singletonList(topicPartition1));
				return null;
			}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
			Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
			records1.put(topicPartition0, Arrays.asList(
					new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "foo"),
					new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "bar")));
			records1.put(topicPartition1, Arrays.asList(
					new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "baz"),
					new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "qux")));
			records1.put(topicPartition2, Arrays.asList(
					new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "fiz"),
					new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "buz")));
			final AtomicInteger which = new AtomicInteger();
			willAnswer(i -> {
				this.pollLatch.countDown();
				switch (which.getAndIncrement()) {
					case 0:
						return new ConsumerRecords(records1);
					default:
						try {
							Thread.sleep(1000);
						}
						catch (InterruptedException e) {
							Thread.currentThread().interrupt();
						}
						return new ConsumerRecords(Collections.emptyMap());
				}
			}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
			willAnswer(i -> {
				this.commitLatch.countDown();
				return null;
			}).given(consumer).commitSync(any(Map.class));
			return consumer;
		}

		@SuppressWarnings({ "rawtypes", "unchecked" })
		@Bean
		public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
			ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
			factory.setConsumerFactory(consumerFactory());
			factory.getContainerProperties().setAckOnError(false);
			factory.setErrorHandler(new RemainingRecordsErrorHandler() {

				@Override
				public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
						Consumer<?, ?> consumer) {

					remaining.addAll(records.stream()
							.map(r -> (String) r.value())
							.collect(Collectors.toList()));
					errorLatch.countDown();
				}

			});
			factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
			return factory;
		}

	}

}
