4848import org .springframework .kafka .core .KafkaTemplate ;
4949import org .springframework .kafka .core .ProducerFactory ;
5050import org .springframework .kafka .listener .ContainerProperties ;
51+ import org .springframework .kafka .listener .DefaultErrorHandler ;
5152import org .springframework .kafka .support .KafkaHeaders ;
5253import org .springframework .kafka .test .EmbeddedKafkaBroker ;
5354import org .springframework .kafka .test .context .EmbeddedKafka ;
5657import org .springframework .stereotype .Component ;
5758import org .springframework .test .annotation .DirtiesContext ;
5859import org .springframework .test .context .junit .jupiter .SpringJUnitConfig ;
60+ import org .springframework .util .backoff .FixedBackOff ;
5961
6062/**
6163 * @author Tomaz Fernandes
64+ * @author Cenk Akin
6265 * @since 2.8.3
6366 */
6467@ SpringJUnitConfig
6568@ DirtiesContext
6669@ EmbeddedKafka (topics = { RetryTopicSameContainerFactoryIntegrationTests .FIRST_TOPIC ,
67- RetryTopicSameContainerFactoryIntegrationTests .SECOND_TOPIC }, partitions = 1 )
70+ RetryTopicSameContainerFactoryIntegrationTests .SECOND_TOPIC , RetryTopicSameContainerFactoryIntegrationTests . THIRD_TOPIC }, partitions = 1 )
6871public class RetryTopicSameContainerFactoryIntegrationTests {
6972
7073 private static final Logger logger = LoggerFactory .getLogger (RetryTopicSameContainerFactoryIntegrationTests .class );
@@ -73,6 +76,8 @@ public class RetryTopicSameContainerFactoryIntegrationTests {
7376
7477 public final static String SECOND_TOPIC = "myRetryTopic2" ;
7578
79+ public final static String THIRD_TOPIC = "myRetryTopic3" ;
80+
7681 @ Autowired
7782 private KafkaTemplate <String , String > sendKafkaTemplate ;
7883
@@ -85,10 +90,13 @@ void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory compo
8590 sendKafkaTemplate .send (FIRST_TOPIC , "Testing topic 1" );
8691 logger .debug ("Sending message to topic " + SECOND_TOPIC );
8792 sendKafkaTemplate .send (SECOND_TOPIC , "Testing topic 2" );
88- assertThat (awaitLatch (latchContainer .countDownLatch1 )).isTrue ();
89- assertThat (awaitLatch (latchContainer .countDownLatch2 )).isTrue ();
90- assertThat (awaitLatch (latchContainer .countDownLatchDlt1 )).isTrue ();
91- assertThat (awaitLatch (latchContainer .countDownLatchDlt2 )).isTrue ();
93+ logger .debug ("Sending message to topic " + THIRD_TOPIC );
94+ sendKafkaTemplate .send (THIRD_TOPIC , "Testing topic 3" );
95+ assertThat (awaitLatch (latchContainer .countDownLatchFirstRetryable )).isTrue ();
96+ assertThat (awaitLatch (latchContainer .countDownLatchDltOne )).isTrue ();
97+ assertThat (awaitLatch (latchContainer .countDownLatchSecondRetryable )).isTrue ();
98+ assertThat (awaitLatch (latchContainer .countDownLatchDltSecond )).isTrue ();
99+ assertThat (awaitLatch (latchContainer .countDownLatchBasic )).isTrue ();
92100 assertThat (awaitLatch (latchContainer .customizerLatch )).isTrue ();
93101 verify (componentFactory ).destinationTopicResolver ();
94102 }
@@ -116,14 +124,14 @@ static class FirstRetryableKafkaListener {
116124 topicSuffixingStrategy = TopicSuffixingStrategy .SUFFIX_WITH_INDEX_VALUE )
117125 @ KafkaListener (topics = RetryTopicSameContainerFactoryIntegrationTests .FIRST_TOPIC )
118126 public void listen (String in , @ Header (KafkaHeaders .RECEIVED_TOPIC ) String topic ) {
119- countDownLatchContainer .countDownLatch1 .countDown ();
127+ countDownLatchContainer .countDownLatchFirstRetryable .countDown ();
120128 logger .warn (in + " from " + topic );
121- throw new RuntimeException ("from RetryableKafkaListener " );
129+ throw new RuntimeException ("from FirstRetryableKafkaListener " );
122130 }
123131
124132 @ DltHandler
125133 public void dlt (String in , @ Header (KafkaHeaders .RECEIVED_TOPIC ) String topic ) {
126- countDownLatchContainer .countDownLatchDlt1 .countDown ();
134+ countDownLatchContainer .countDownLatchDltOne .countDown ();
127135 logger .warn (in + " from " + topic );
128136 }
129137 }
@@ -137,28 +145,39 @@ static class SecondRetryableKafkaListener {
137145 @ RetryableTopic
138146 @ KafkaListener (topics = RetryTopicSameContainerFactoryIntegrationTests .SECOND_TOPIC )
139147 public void listen (String in , @ Header (KafkaHeaders .RECEIVED_TOPIC ) String topic ) {
140- countDownLatchContainer .countDownLatch2 .countDown ();
141- logger .warn (in + " from " + topic );
148+ countDownLatchContainer .countDownLatchSecondRetryable .countDown ();
149+ logger .info (in + " from " + topic );
142150 throw new RuntimeException ("from SecondRetryableKafkaListener" );
143151 }
144152
145-
146153 @ DltHandler
147154 public void dlt (String in , @ Header (KafkaHeaders .RECEIVED_TOPIC ) String topic ) {
148- countDownLatchContainer .countDownLatchDlt2 .countDown ();
155+ countDownLatchContainer .countDownLatchDltSecond .countDown ();
149156 logger .warn (in + " from " + topic );
150157 }
151158 }
152159
160+
161+ @ Component
162+ static class BasicKafkaListener {
163+
164+ @ KafkaListener (topics = RetryTopicSameContainerFactoryIntegrationTests .THIRD_TOPIC )
165+ public void listen (String in , @ Header (KafkaHeaders .RECEIVED_TOPIC ) String topic ) {
166+ logger .info (in + " from " + topic );
167+ throw new RuntimeException ("from BasicKafkaListener" );
168+ }
169+ }
170+
153171 @ Component
154172 static class CountDownLatchContainer {
155173
156- CountDownLatch countDownLatch1 = new CountDownLatch (4 );
157- CountDownLatch countDownLatch2 = new CountDownLatch (3 );
174+ CountDownLatch countDownLatchFirstRetryable = new CountDownLatch (4 );
175+ CountDownLatch countDownLatchSecondRetryable = new CountDownLatch (3 );
176+ CountDownLatch countDownLatchDltOne = new CountDownLatch (1 );
177+ CountDownLatch countDownLatchDltSecond = new CountDownLatch (1 );
158178
159- CountDownLatch countDownLatchDlt1 = new CountDownLatch (1 );
160- CountDownLatch countDownLatchDlt2 = new CountDownLatch (1 );
161- CountDownLatch customizerLatch = new CountDownLatch (9 );
179+ CountDownLatch countDownLatchBasic = new CountDownLatch (1 );
180+ CountDownLatch customizerLatch = new CountDownLatch (10 );
162181 }
163182
164183 @ EnableKafka
@@ -183,6 +202,11 @@ SecondRetryableKafkaListener secondRetryableKafkaListener() {
183202 return new SecondRetryableKafkaListener ();
184203 }
185204
205+ @ Bean
206+ BasicKafkaListener basicKafkaListener () {
207+ return new BasicKafkaListener ();
208+ }
209+
186210 @ Bean
187211 public ConcurrentKafkaListenerContainerFactory <String , String > kafkaListenerContainerFactory (
188212 ConsumerFactory <String , String > consumerFactory , CountDownLatchContainer latchContainer ) {
@@ -194,6 +218,10 @@ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerCont
194218 props .setPollTimeout (50L );
195219 props .setIdlePartitionEventInterval (100L );
196220 factory .setConsumerFactory (consumerFactory );
221+ DefaultErrorHandler errorHandler = new DefaultErrorHandler (
222+ (cr , ex ) -> latchContainer .countDownLatchBasic .countDown (),
223+ new FixedBackOff (0 , 2 ));
224+ factory .setCommonErrorHandler (errorHandler );
197225 factory .setConcurrency (1 );
198226 factory .setContainerCustomizer (
199227 container -> latchContainer .customizerLatch .countDown ());
0 commit comments