11/*
2- * Copyright 2023 the original author or authors.
2+ * Copyright 2023-2024 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1717package org .springframework .kafka .listener ;
1818
1919import static org .assertj .core .api .Assertions .assertThat ;
20+ import static org .assertj .core .api .Assertions .assertThatExceptionOfType ;
2021import static org .assertj .core .api .Assertions .assertThatIllegalStateException ;
2122import static org .mockito .ArgumentMatchers .any ;
23+ import static org .mockito .ArgumentMatchers .anyMap ;
24+ import static org .mockito .BDDMockito .willReturn ;
2225import static org .mockito .BDDMockito .willThrow ;
2326import static org .mockito .Mockito .mock ;
2427import static org .mockito .Mockito .spy ;
3437import org .apache .kafka .clients .consumer .ConsumerRecord ;
3538import org .apache .kafka .clients .consumer .ConsumerRecords ;
3639import org .apache .kafka .common .TopicPartition ;
40+ import org .apache .kafka .common .errors .RebalanceInProgressException ;
3741import org .junit .jupiter .api .Test ;
3842import org .mockito .ArgumentCaptor ;
3943
4044import org .springframework .core .log .LogAccessor ;
4145import org .springframework .data .util .DirectFieldAccessFallbackBeanWrapper ;
46+ import org .springframework .kafka .KafkaException ;
4247import org .springframework .util .backoff .BackOff ;
4348import org .springframework .util .backoff .FixedBackOff ;
4449
@@ -52,15 +57,6 @@ public class FailedBatchProcessorTests {
5257 @ SuppressWarnings ({ "rawtypes" , "unchecked" })
5358 @ Test
5459 void indexOutOfBounds () {
55- class TestFBP extends FailedBatchProcessor {
56-
57- TestFBP (BiConsumer <ConsumerRecord <?, ?>, Exception > recoverer , BackOff backOff ,
58- CommonErrorHandler fallbackHandler ) {
59-
60- super (recoverer , backOff , fallbackHandler );
61- }
62-
63- }
6460 CommonErrorHandler mockEH = mock (CommonErrorHandler .class );
6561 willThrow (new IllegalStateException ("fallback" )).given (mockEH ).handleBatch (any (), any (), any (), any (), any ());
6662
@@ -83,15 +79,6 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab
8379 @ SuppressWarnings ({ "rawtypes" , "unchecked" })
8480 @ Test
8581 void recordNotPresent () {
86- class TestFBP extends FailedBatchProcessor {
87-
88- TestFBP (BiConsumer <ConsumerRecord <?, ?>, Exception > recoverer , BackOff backOff ,
89- CommonErrorHandler fallbackHandler ) {
90-
91- super (recoverer , backOff , fallbackHandler );
92- }
93-
94- }
9582 CommonErrorHandler mockEH = mock (CommonErrorHandler .class );
9683 willThrow (new IllegalStateException ("fallback" )).given (mockEH ).handleBatch (any (), any (), any (), any (), any ());
9784
@@ -114,4 +101,34 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab
114101 assertThat (output ).contains ("Record not found in batch: topic-42@123;" );
115102 }
116103
104+ @ Test
105+ void testExceptionDuringCommit () {
106+ CommonErrorHandler mockEH = mock (CommonErrorHandler .class );
107+ willThrow (new IllegalStateException ("ise" )).given (mockEH ).handleBatch (any (), any (), any (), any (), any ());
108+
109+ ConsumerRecord rec1 = new ConsumerRecord ("topic" , 0 , 0L , null , null );
110+ ConsumerRecord rec2 = new ConsumerRecord ("topic" , 0 , 1L , null , null );
111+ ConsumerRecord rec3 = new ConsumerRecord ("topic" , 0 , 2L , null , null );
112+
113+ ConsumerRecords records = new ConsumerRecords (Map .of (new TopicPartition ("topic" , 0 ), List .of (rec1 , rec2 , rec3 )));
114+ TestFBP testFBP = new TestFBP ((rec , ex ) -> { }, new FixedBackOff (2L , 2L ), mockEH );
115+ final Consumer consumer = mock (Consumer .class );
116+ willThrow (new RebalanceInProgressException ("rebalance in progress" )).given (consumer ).commitSync (anyMap (), any ());
117+ final MessageListenerContainer mockMLC = mock (MessageListenerContainer .class );
118+ willReturn (new ContainerProperties ("topic" )).given (mockMLC ).getContainerProperties ();
119+ assertThatExceptionOfType (KafkaException .class ).isThrownBy (() ->
120+ testFBP .handle (new BatchListenerFailedException ("topic" , rec2 ),
121+ records , consumer , mockMLC , mock (Runnable .class ))
122+ ).withMessage ("Seek to current after exception" );
123+ }
124+
125+ static class TestFBP extends FailedBatchProcessor {
126+
127+ TestFBP (BiConsumer <ConsumerRecord <?, ?>, Exception > recoverer , BackOff backOff ,
128+ CommonErrorHandler fallbackHandler ) {
129+
130+ super (recoverer , backOff , fallbackHandler );
131+ }
132+
133+ }
117134}
0 commit comments