@@ -1968,6 +1968,54 @@ public void testAckModeCount() throws Exception {
19681968 container .stop ();
19691969 }
19701970
1971+ @ SuppressWarnings ({ "unchecked" , "rawtypes" })
1972+ @ Test
1973+ public void testCommitErrorHandlerCalled () throws Exception {
1974+ ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
1975+ Consumer <Integer , String > consumer = mock (Consumer .class );
1976+ given (cf .createConsumer (eq ("grp" ), eq ("clientId" ), isNull ())).willReturn (consumer );
1977+ final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
1978+ records .put (new TopicPartition ("foo" , 0 ), Arrays .asList (
1979+ new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" ),
1980+ new ConsumerRecord <>("foo" , 0 , 1L , 1 , "bar" )));
1981+ ConsumerRecords <Integer , String > consumerRecords = new ConsumerRecords <>(records );
1982+ ConsumerRecords <Integer , String > emptyRecords = new ConsumerRecords <>(Collections .emptyMap ());
1983+ AtomicBoolean first = new AtomicBoolean (true );
1984+ given (consumer .poll (anyLong ())).willAnswer (i -> {
1985+ Thread .sleep (50 );
1986+ return first .getAndSet (false ) ? consumerRecords : emptyRecords ;
1987+ });
1988+ willAnswer (i -> {
1989+ throw new RuntimeException ("Commit failed" );
1990+ }).given (consumer ).commitSync (any (Map .class ));
1991+ TopicPartitionInitialOffset [] topicPartition = new TopicPartitionInitialOffset [] {
1992+ new TopicPartitionInitialOffset ("foo" , 0 ) };
1993+ ContainerProperties containerProps = new ContainerProperties (topicPartition );
1994+ containerProps .setGroupId ("grp" );
1995+ containerProps .setClientId ("clientId" );
1996+ containerProps .setIdleEventInterval (100L );
1997+ containerProps .setMessageListener ((MessageListener ) r -> { });
1998+ final CountDownLatch ehl = new CountDownLatch (1 );
1999+ containerProps .setErrorHandler ((r , t ) -> {
2000+ ehl .countDown ();
2001+ });
2002+ KafkaMessageListenerContainer <Integer , String > container =
2003+ new KafkaMessageListenerContainer <>(cf , containerProps );
2004+ container .start ();
2005+ assertThat (ehl .await (10 , TimeUnit .SECONDS )).isTrue ();
2006+ container .stop ();
2007+ containerProps .setMessageListener ((BatchMessageListener ) r -> { });
2008+ final CountDownLatch behl = new CountDownLatch (1 );
2009+ containerProps .setBatchErrorHandler ((r , t ) -> {
2010+ behl .countDown ();
2011+ });
2012+ container = new KafkaMessageListenerContainer <>(cf , containerProps );
2013+ first .set (true );
2014+ container .start ();
2015+ assertThat (behl .await (10 , TimeUnit .SECONDS )).isTrue ();
2016+ container .stop ();
2017+ }
2018+
19712019 private Consumer <?, ?> spyOnConsumer (KafkaMessageListenerContainer <Integer , String > container ) {
19722020 Consumer <?, ?> consumer = spy (
19732021 KafkaTestUtils .getPropertyValue (container , "listenerConsumer.consumer" , Consumer .class ));
0 commit comments