@@ -132,11 +132,13 @@ public class KafkaMessageListenerContainerTests {
132132
133133 private static String topic18 = "testTopic18" ;
134134
135+ private static String topic19 = "testTopic19" ;
136+
135137
136138 @ ClassRule
137139 public static KafkaEmbedded embeddedKafka = new KafkaEmbedded (1 , true , topic1 , topic2 , topic3 , topic4 , topic5 ,
138140 topic6 , topic7 , topic8 , topic9 , topic10 , topic11 , topic12 , topic13 , topic14 , topic15 , topic16 , topic17 ,
139- topic18 );
141+ topic18 , topic19 );
140142
141143 @ Rule
142144 public TestName testName = new TestName ();
@@ -1675,6 +1677,68 @@ public void testPauseResume() throws Exception {
16751677 container .stop ();
16761678 }
16771679
1680+ @ Test
1681+ public void testExceptionWhenCommitAfterRebalance () throws Exception {
1682+ final CountDownLatch rebalanceLatch = new CountDownLatch (2 );
1683+ final CountDownLatch consumeLatch = new CountDownLatch (7 );
1684+
1685+ Map <String , Object > props = KafkaTestUtils .consumerProps ("test19" , "false" , embeddedKafka );
1686+ props .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "latest" );
1687+ props .put (ConsumerConfig .MAX_POLL_INTERVAL_MS_CONFIG , 15000 );
1688+ DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(props );
1689+ ContainerProperties containerProps = new ContainerProperties (topic19 );
1690+ containerProps .setMessageListener ((MessageListener <Integer , String >) messages -> {
1691+ logger .info ("listener: " + messages );
1692+ consumeLatch .countDown ();
1693+ try {
1694+ Thread .sleep (3000 );
1695+ } catch (InterruptedException e ) {
1696+ e .printStackTrace ();
1697+ }
1698+ });
1699+ containerProps .setSyncCommits (true );
1700+ containerProps .setAckMode (AckMode .BATCH );
1701+ containerProps .setPollTimeout (100 );
1702+ containerProps .setAckOnError (false );
1703+ containerProps .setErrorHandler (new SeekToCurrentErrorHandler ());
1704+
1705+ Map <String , Object > senderProps = KafkaTestUtils .producerProps (embeddedKafka );
1706+ ProducerFactory <Integer , String > pf = new DefaultKafkaProducerFactory <>(senderProps );
1707+ KafkaTemplate <Integer , String > template = new KafkaTemplate <>(pf );
1708+ template .setDefaultTopic (topic19 );
1709+
1710+ containerProps .setConsumerRebalanceListener (new ConsumerRebalanceListener () {
1711+
1712+ @ Override
1713+ public void onPartitionsRevoked (Collection <TopicPartition > partitions ) {
1714+ }
1715+
1716+ @ Override
1717+ public void onPartitionsAssigned (Collection <TopicPartition > partitions ) {
1718+ logger .info ("rebalance occurred." );
1719+ rebalanceLatch .countDown ();
1720+ }
1721+ });
1722+
1723+ KafkaMessageListenerContainer <Integer , String > container =
1724+ new KafkaMessageListenerContainer <>(cf , containerProps );
1725+ container .setBeanName ("testContainerException" );
1726+ container .start ();
1727+ ContainerTestUtils .waitForAssignment (container , embeddedKafka .getPartitionsPerTopic ());
1728+ container .pause ();
1729+
1730+ for (int i = 0 ; i < 6 ; i ++) {
1731+ template .sendDefault (0 , 0 , "a" );
1732+ }
1733+ template .flush ();
1734+
1735+ container .resume ();
1736+ // should be rebalanced and consume again
1737+ assertThat (rebalanceLatch .await (60 , TimeUnit .SECONDS )).isTrue ();
1738+ assertThat (consumeLatch .await (60 , TimeUnit .SECONDS )).isTrue ();
1739+ container .stop ();
1740+ }
1741+
16781742 private Consumer <?, ?> spyOnConsumer (KafkaMessageListenerContainer <Integer , String > container ) {
16791743 Consumer <?, ?> consumer = spy (
16801744 KafkaTestUtils .getPropertyValue (container , "listenerConsumer.consumer" , Consumer .class ));
0 commit comments