@@ -151,8 +151,7 @@ void testInboundRecord(EmbeddedKafkaBroker embeddedKafka) {
151151 adapter .setRecordMessageConverter (new MessagingMessageConverter () {
152152
153153 @ Override
154- public Message <?> toMessage (ConsumerRecord <?, ?> record , Acknowledgment acknowledgment ,
155- Consumer <?, ?> consumer , Type type ) {
154+ public Message <?> toMessage (ConsumerRecord <?, ?> record , Object acknowledgment , Object consumer , Type type ) {
156155 Message <?> message = super .toMessage (record , acknowledgment , consumer , type );
157156 return MessageBuilder .fromMessage (message ).setHeader ("testHeader" , "testValue" ).build ();
158157 }
@@ -189,27 +188,26 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
189188 assertThat (received .getPayload ()).isInstanceOf (KafkaNull .class );
190189
191190 headers = received .getHeaders ();
192- assertThat (headers .get (KafkaHeaders .RECEIVED_KEY )).isEqualTo (1 );
193- assertThat (headers .get (KafkaHeaders .RECEIVED_TOPIC )).isEqualTo (topic1 );
194- assertThat (headers .get (KafkaHeaders .RECEIVED_PARTITION )).isEqualTo (0 );
195- assertThat (headers .get (KafkaHeaders .OFFSET )).isEqualTo (1L );
196- assertThat ((Long ) headers .get (KafkaHeaders .RECEIVED_TIMESTAMP )).isGreaterThan (0L );
197- assertThat (headers .get (KafkaHeaders .TIMESTAMP_TYPE )).isEqualTo ("CREATE_TIME" );
198-
199- assertThat (headers .get ("testHeader" )).isEqualTo ("testValue" );
191+ assertThat (headers )
192+ .containsEntry (KafkaHeaders .RECEIVED_KEY , 1 )
193+ .containsEntry (KafkaHeaders .RECEIVED_TOPIC , topic1 )
194+ .containsEntry (KafkaHeaders .RECEIVED_PARTITION , 0 )
195+ .containsEntry (KafkaHeaders .OFFSET , 1L )
196+ .containsEntry (KafkaHeaders .TIMESTAMP_TYPE , "CREATE_TIME" )
197+ .containsEntry ("testHeader" , "testValue" );
200198
201199 adapter .setMessageConverter (new RecordMessageConverter () {
202200
203201 @ Override
204- public Message <?> toMessage (ConsumerRecord <?, ?> record , Acknowledgment acknowledgment ,
205- Consumer <?, ?> consumer , Type type ) {
202+ public Message <?> toMessage (ConsumerRecord <?, ?> record , Object acknowledgment , Object con , Type type ) {
206203 throw new RuntimeException ("testError" );
207204 }
208205
209206 @ Override
210207 public ProducerRecord <?, ?> fromMessage (Message <?> message , String defaultTopic ) {
211208 return null ;
212209 }
210+
213211 });
214212 PollableChannel errors = new QueueChannel ();
215213 adapter .setErrorChannel (errors );
@@ -272,10 +270,12 @@ protected boolean doSend(Message<?> message, long timeout) {
272270 assertThat (originalMessage ).isNotNull ();
273271 assertThat (originalMessage .getHeaders ().get (IntegrationMessageHeaderAccessor .SOURCE_DATA )).isNull ();
274272 headers = originalMessage .getHeaders ();
275- assertThat (headers .get (KafkaHeaders .RECEIVED_KEY )).isEqualTo (1 );
276- assertThat (headers .get (KafkaHeaders .RECEIVED_TOPIC )).isEqualTo (topic4 );
277- assertThat (headers .get (KafkaHeaders .RECEIVED_PARTITION )).isEqualTo (0 );
278- assertThat (headers .get (KafkaHeaders .OFFSET )).isEqualTo (0L );
273+ assertThat (headers )
274+ .containsEntry (KafkaHeaders .RECEIVED_KEY , 1 )
275+ .containsEntry (KafkaHeaders .RECEIVED_TOPIC , topic4 )
276+ .containsEntry (KafkaHeaders .RECEIVED_PARTITION , 0 )
277+ .containsEntry (KafkaHeaders .OFFSET , 0L );
278+
279279 assertThat (StaticMessageHeaderAccessor .getDeliveryAttempt (originalMessage ).get ()).isEqualTo (3 );
280280
281281 assertThat (receivedMessageHistory .get ()).isNotNull ();
@@ -383,10 +383,11 @@ protected boolean doSend(Message<?> message, long timeout) {
383383 assertThat (originalMessage .getHeaders ().get (IntegrationMessageHeaderAccessor .SOURCE_DATA ))
384384 .isSameAs (headers .get (KafkaHeaders .RAW_DATA ));
385385 headers = originalMessage .getHeaders ();
386- assertThat (headers .get (KafkaHeaders .RECEIVED_KEY )).isEqualTo (1 );
387- assertThat (headers .get (KafkaHeaders .RECEIVED_TOPIC )).isEqualTo (topic5 );
388- assertThat (headers .get (KafkaHeaders .RECEIVED_PARTITION )).isEqualTo (0 );
389- assertThat (headers .get (KafkaHeaders .OFFSET )).isEqualTo (0L );
386+ assertThat (headers )
387+ .containsEntry (KafkaHeaders .RECEIVED_KEY , 1 )
388+ .containsEntry (KafkaHeaders .RECEIVED_TOPIC , topic5 )
389+ .containsEntry (KafkaHeaders .RECEIVED_PARTITION , 0 )
390+ .containsEntry (KafkaHeaders .OFFSET , 0L );
390391 assertThat (StaticMessageHeaderAccessor .getDeliveryAttempt (originalMessage ).get ()).isEqualTo (1 );
391392
392393 adapter .stop ();
@@ -397,7 +398,8 @@ protected boolean doSend(Message<?> message, long timeout) {
397398 void testInboundBatch (EmbeddedKafkaBroker embeddedKafka ) throws Exception {
398399 Map <String , Object > props = KafkaTestUtils .consumerProps (embeddedKafka , "test2" , true );
399400 props .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
400- props .put (ConsumerConfig .FETCH_MIN_BYTES_CONFIG , 12 );
401+ props .put (ConsumerConfig .FETCH_MIN_BYTES_CONFIG , 24 );
402+ props .put (ConsumerConfig .FETCH_MAX_WAIT_MS_CONFIG , 2000 );
401403
402404 DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(props );
403405 ContainerProperties containerProps = new ContainerProperties (topic2 );
@@ -513,14 +515,15 @@ void testInboundJson(EmbeddedKafkaBroker embeddedKafka) {
513515 assertThat (received ).isNotNull ();
514516
515517 MessageHeaders headers = received .getHeaders ();
516- assertThat (headers .get (KafkaHeaders .RECEIVED_KEY )).isEqualTo (1 );
517- assertThat (headers .get (KafkaHeaders .RECEIVED_TOPIC )).isEqualTo (topic3 );
518- assertThat (headers .get (KafkaHeaders .RECEIVED_PARTITION )).isEqualTo (0 );
519- assertThat (headers .get (KafkaHeaders .OFFSET )).isEqualTo (0L );
518+ assertThat (headers )
519+ .containsEntry (KafkaHeaders .RECEIVED_KEY , 1 )
520+ .containsEntry (KafkaHeaders .RECEIVED_TOPIC , topic3 )
521+ .containsEntry (KafkaHeaders .RECEIVED_PARTITION , 0 )
522+ .containsEntry (KafkaHeaders .OFFSET , 0L )
523+ .containsEntry (KafkaHeaders .RECEIVED_TIMESTAMP , 1487694048607L )
524+ .containsEntry (KafkaHeaders .TIMESTAMP_TYPE , "CREATE_TIME" )
525+ .containsEntry ("foo" , "bar" );
520526
521- assertThat (headers .get (KafkaHeaders .RECEIVED_TIMESTAMP )).isEqualTo (1487694048607L );
522- assertThat (headers .get (KafkaHeaders .TIMESTAMP_TYPE )).isEqualTo ("CREATE_TIME" );
523- assertThat (headers .get ("foo" )).isEqualTo ("bar" );
524527 assertThat (received .getPayload ()).isInstanceOf (Map .class );
525528
526529 adapter .stop ();
@@ -579,8 +582,8 @@ void testInboundJsonWithPayload(EmbeddedKafkaBroker embeddedKafka) {
579582 @ SuppressWarnings ({"unchecked" , "rawtypes" })
580583 @ Test
581584 void testPauseResume () throws Exception {
582- ConsumerFactory <Integer , String > cf = mock (ConsumerFactory . class );
583- Consumer <Integer , String > consumer = mock (Consumer . class );
585+ ConsumerFactory <Integer , String > cf = mock ();
586+ Consumer <Integer , String > consumer = mock ();
584587 given (cf .createConsumer (eq ("testPauseResumeGroup" ), eq ("clientId" ), isNull (), any ())).willReturn (consumer );
585588 final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
586589 records .put (new TopicPartition ("foo" , 0 ), Arrays .asList (
0 commit comments