2727import java .util .Map ;
2828import java .util .concurrent .CountDownLatch ;
2929import java .util .concurrent .TimeUnit ;
30+ import java .util .concurrent .atomic .AtomicBoolean ;
3031
3132import org .apache .kafka .clients .consumer .Consumer ;
3233import org .apache .kafka .clients .consumer .ConsumerConfig ;
3839import org .junit .runner .RunWith ;
3940import org .mockito .Mockito ;
4041
41- import org .springframework .beans .DirectFieldAccessor ;
4242import org .springframework .beans .factory .annotation .Autowired ;
4343import org .springframework .context .annotation .Bean ;
4444import org .springframework .context .annotation .Configuration ;
5757import org .springframework .kafka .listener .ConcurrentMessageListenerContainer ;
5858import org .springframework .kafka .listener .ConsumerSeekAware ;
5959import org .springframework .kafka .listener .KafkaListenerErrorHandler ;
60- import org .springframework .kafka .listener .KafkaMessageListenerContainer ;
6160import org .springframework .kafka .listener .MessageListenerContainer ;
6261import org .springframework .kafka .listener .adapter .FilteringAcknowledgingMessageListenerAdapter ;
6362import org .springframework .kafka .listener .adapter .MessagingMessageListenerAdapter ;
@@ -297,28 +296,8 @@ public void testEmpty() throws Exception {
297296 }
298297
299298 @ Test
300- @ SuppressWarnings ("unchecked" )
301299 public void testBatch () throws Exception {
302300 this .recordFilter .called = false ;
303- ConcurrentMessageListenerContainer <?, ?> container =
304- (ConcurrentMessageListenerContainer <?, ?>) registry .getListenerContainer ("list1" );
305- Consumer <?, ?> consumer =
306- spyOnConsumer ((KafkaMessageListenerContainer <Integer , String >) container .getContainers ().get (0 ));
307-
308- final CountDownLatch commitLatch = new CountDownLatch (2 );
309-
310- willAnswer (invocation -> {
311-
312- try {
313- return invocation .callRealMethod ();
314- }
315- finally {
316- commitLatch .countDown ();
317- }
318-
319- }).given (consumer )
320- .commitSync (any ());
321-
322301 template .send ("annotated14" , null , "foo" );
323302 template .send ("annotated14" , null , "bar" );
324303 assertThat (this .listener .latch10 .await (60 , TimeUnit .SECONDS )).isTrue ();
@@ -328,7 +307,7 @@ public void testBatch() throws Exception {
328307 assertThat (list .get (0 )).isInstanceOf (String .class );
329308 assertThat (this .recordFilter .called ).isTrue ();
330309
331- assertThat (commitLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
310+ assertThat (this . config . spyLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
332311 }
333312
334313 @ Test
@@ -404,19 +383,13 @@ public void testBatchMessagesAck() throws Exception {
404383 assertThat (this .listener .ack ).isNotNull ();
405384 }
406385
407- private Consumer <?, ?> spyOnConsumer (KafkaMessageListenerContainer <Integer , String > container ) {
408- Consumer <?, ?> consumer = spy (
409- KafkaTestUtils .getPropertyValue (container , "listenerConsumer.consumer" , Consumer .class ));
410- new DirectFieldAccessor (KafkaTestUtils .getPropertyValue (container , "listenerConsumer" ))
411- .setPropertyValue ("consumer" , consumer );
412- return consumer ;
413- }
414-
415386 @ Configuration
416387 @ EnableKafka
417388 @ EnableTransactionManagement (proxyTargetClass = true )
418389 public static class Config {
419390
391+ private final CountDownLatch spyLatch = new CountDownLatch (2 );
392+
420393 @ Bean
421394 public static PropertySourcesPlaceholderConfigurer ppc () {
422395 return new PropertySourcesPlaceholderConfigurer ();
@@ -466,6 +439,34 @@ public KafkaListenerContainerFactory<?> batchFactory() {
466439 return factory ;
467440 }
468441
442+ @ SuppressWarnings ({ "unchecked" , "rawtypes" })
443+ @ Bean
444+ public KafkaListenerContainerFactory <?> batchSpyFactory () {
445+ ConcurrentKafkaListenerContainerFactory <Integer , String > factory =
446+ new ConcurrentKafkaListenerContainerFactory <>();
447+ ConsumerFactory spiedCf = mock (ConsumerFactory .class );
448+ willAnswer (i -> {
449+ Consumer <Integer , String > spy =
450+ spy (consumerFactory ()
451+ .createConsumer (i .getArgumentAt (0 , String .class ), i .getArgumentAt (1 , String .class )));
452+ willAnswer (invocation -> {
453+
454+ try {
455+ return invocation .callRealMethod ();
456+ }
457+ finally {
458+ spyLatch .countDown ();
459+ }
460+
461+ }).given (spy ).commitSync (any (Map .class ));
462+ return spy ;
463+ }).given (spiedCf ).createConsumer (Mockito .anyString (), Mockito .anyString ());
464+ factory .setConsumerFactory (spiedCf );
465+ factory .setBatchListener (true );
466+ factory .setRecordFilterStrategy (recordFilter ());
467+ return factory ;
468+ }
469+
469470 @ Bean
470471 public KafkaListenerContainerFactory <?> batchManualFactory () {
471472 ConcurrentKafkaListenerContainerFactory <Integer , String > factory =
@@ -751,7 +752,9 @@ public void listen9(Object payload) {
751752 this .latch9 .countDown ();
752753 }
753754
754- @ KafkaListener (id = "list1" , topics = "annotated14" , containerFactory = "batchFactory" )
755+ private final AtomicBoolean reposition10 = new AtomicBoolean ();
756+
757+ @ KafkaListener (id = "list1" , topics = "annotated14" , containerFactory = "batchSpyFactory" )
755758 public void listen10 (List <String > list ) {
756759 this .payload = list ;
757760 this .latch10 .countDown ();
0 commit comments