@@ -551,6 +551,77 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
551551 container .stop ();
552552 }
553553
554+ @ Test
555+ public void testRecordAckMockForeignThread () throws Exception {
556+ testRecordAckMockForeignThreadGuts (AckMode .MANUAL );
557+ }
558+
559+ @ Test
560+ public void testRecordAckMockForeignThreadImmediate () throws Exception {
561+ testRecordAckMockForeignThreadGuts (AckMode .MANUAL_IMMEDIATE );
562+ }
563+
564+ @ SuppressWarnings ("unchecked" )
565+ private void testRecordAckMockForeignThreadGuts (AckMode ackMode ) throws Exception {
566+ ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
567+ Consumer <Integer , String > consumer = mock (Consumer .class );
568+ given (cf .createConsumer (isNull (), eq ("clientId" ), isNull ())).willReturn (consumer );
569+ final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
570+ records .put (new TopicPartition ("foo" , 0 ), Arrays .asList (
571+ new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" ),
572+ new ConsumerRecord <>("foo" , 0 , 1L , 1 , "bar" )));
573+ ConsumerRecords <Integer , String > consumerRecords = new ConsumerRecords <>(records );
574+ given (consumer .poll (anyLong ())).willAnswer (i -> {
575+ Thread .sleep (50 );
576+ return consumerRecords ;
577+ });
578+ TopicPartitionInitialOffset [] topicPartition = new TopicPartitionInitialOffset [] {
579+ new TopicPartitionInitialOffset ("foo" , 0 ) };
580+ ContainerProperties containerProps = new ContainerProperties (topicPartition );
581+ containerProps .setAckMode (ackMode );
582+ final CountDownLatch latch = new CountDownLatch (2 );
583+ final List <Acknowledgment > acks = new ArrayList <>();
584+ final AtomicReference <Thread > consumerThread = new AtomicReference <>();
585+ AcknowledgingMessageListener <Integer , String > messageListener = spy (
586+ new AcknowledgingMessageListener <Integer , String >() {
587+
588+ @ Override
589+ public void onMessage (ConsumerRecord <Integer , String > data , Acknowledgment acknowledgment ) {
590+ acks .add (acknowledgment );
591+ consumerThread .set (Thread .currentThread ());
592+ latch .countDown ();
593+ if (latch .getCount () == 0 ) {
594+ records .clear ();
595+ }
596+ }
597+
598+ });
599+
600+ final CountDownLatch commitLatch = new CountDownLatch (1 );
601+ final AtomicReference <Thread > commitThread = new AtomicReference <>();
602+ willAnswer (i -> {
603+ commitThread .set (Thread .currentThread ());
604+ commitLatch .countDown ();
605+ return null ;
606+ }
607+ ).given (consumer ).commitSync (any (Map .class ));
608+
609+ containerProps .setMessageListener (messageListener );
610+ containerProps .setClientId ("clientId" );
611+ KafkaMessageListenerContainer <Integer , String > container =
612+ new KafkaMessageListenerContainer <>(cf , containerProps );
613+ container .start ();
614+ assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
615+ acks .get (1 ).acknowledge ();
616+ assertThat (commitLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
617+ InOrder inOrder = inOrder (messageListener , consumer );
618+ inOrder .verify (consumer ).poll (1000 );
619+ inOrder .verify (messageListener , times (2 )).onMessage (any (ConsumerRecord .class ), any (Acknowledgment .class ));
620+ inOrder .verify (consumer ).commitSync (any (Map .class ));
621+ container .stop ();
622+ assertThat (commitThread .get ()).isSameAs (consumerThread .get ());
623+ }
624+
554625 @ SuppressWarnings ("unchecked" )
555626 @ Test
556627 public void testNonResponsiveConsumerEvent () throws Exception {
@@ -1576,7 +1647,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
15761647 final CountDownLatch commitLatch = new CountDownLatch (2 );
15771648 willAnswer (invocation -> {
15781649
1579- @ SuppressWarnings ({ "unchecked" })
15801650 Map <TopicPartition , OffsetAndMetadata > map = invocation .getArgument (0 );
15811651 try {
15821652 return invocation .callRealMethod ();
0 commit comments