2424import static org .mockito .BDDMockito .given ;
2525import static org .mockito .BDDMockito .then ;
2626import static org .mockito .BDDMockito .willAnswer ;
27+ import static org .mockito .BDDMockito .willCallRealMethod ;
2728import static org .mockito .BDDMockito .willReturn ;
2829import static org .mockito .Mockito .atLeastOnce ;
2930import static org .mockito .Mockito .mock ;
@@ -315,7 +316,7 @@ private byte[] header(boolean isKey, DeserializationException deserEx) {
315316 return baos .toByteArray ();
316317 }
317318
318- @ SuppressWarnings ({"unchecked" , "rawtypes" })
319+ @ SuppressWarnings ({ "unchecked" , "rawtypes" })
319320 @ Test
320321 void allOriginalHeaders () {
321322 KafkaOperations <?, ?> template = mock (KafkaOperations .class );
@@ -335,7 +336,7 @@ void allOriginalHeaders() {
335336 assertThat (headers .lastHeader (KafkaHeaders .DLT_ORIGINAL_TIMESTAMP_TYPE )).isNotNull ();
336337 }
337338
338- @ SuppressWarnings ({"unchecked" , "rawtypes" })
339+ @ SuppressWarnings ({ "unchecked" , "rawtypes" })
339340 @ Test
340341 void dontAppendOriginalHeaders () {
341342 KafkaOperations <?, ?> template = mock (KafkaOperations .class );
@@ -386,7 +387,7 @@ void dontAppendOriginalHeaders() {
386387 assertThat (exceptionHeaders .hasNext ()).isFalse ();
387388 }
388389
389- @ SuppressWarnings ({"unchecked" , "rawtypes" })
390+ @ SuppressWarnings ({ "unchecked" , "rawtypes" })
390391 @ Test
391392 void appendOriginalHeaders () {
392393 KafkaOperations <?, ?> template = mock (KafkaOperations .class );
@@ -825,13 +826,13 @@ void addHeaderFunctionsProcessedInOrder() {
825826 ConsumerRecord <String , String > record = new ConsumerRecord <>("foo" , 0 , 0L , "bar" , null );
826827 DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer (template );
827828 recoverer .setHeadersFunction ((rec , ex ) -> {
828- return new RecordHeaders (new RecordHeader [] { new RecordHeader ("foo" , "one" .getBytes ()) });
829+ return new RecordHeaders (new RecordHeader []{ new RecordHeader ("foo" , "one" .getBytes ()) });
829830 });
830831 recoverer .addHeadersFunction ((rec , ex ) -> {
831- return new RecordHeaders (new RecordHeader [] { new RecordHeader ("bar" , "two" .getBytes ()) });
832+ return new RecordHeaders (new RecordHeader []{ new RecordHeader ("bar" , "two" .getBytes ()) });
832833 });
833834 recoverer .addHeadersFunction ((rec , ex ) -> {
834- return new RecordHeaders (new RecordHeader [] { new RecordHeader ("foo" , "three" .getBytes ()) });
835+ return new RecordHeaders (new RecordHeader []{ new RecordHeader ("foo" , "three" .getBytes ()) });
835836 });
836837 recoverer .accept (record , new ListenerExecutionFailedException ("test" , "group" , new RuntimeException ()));
837838 ArgumentCaptor <ProducerRecord > producerRecordCaptor = ArgumentCaptor .forClass (ProducerRecord .class );
@@ -862,12 +863,12 @@ void immutableHeaders() {
862863 ConsumerRecord <String , String > record = new ConsumerRecord <>("foo" , 0 , 0L , "bar" , null );
863864 DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer (template );
864865 recoverer .setHeadersFunction ((rec , ex ) -> {
865- RecordHeaders headers = new RecordHeaders (new RecordHeader [] { new RecordHeader ("foo" , "one" .getBytes ()) });
866+ RecordHeaders headers = new RecordHeaders (new RecordHeader []{ new RecordHeader ("foo" , "one" .getBytes ()) });
866867 headers .setReadOnly ();
867868 return headers ;
868869 });
869870 recoverer .addHeadersFunction ((rec , ex ) -> {
870- return new RecordHeaders (new RecordHeader [] { new RecordHeader ("bar" , "two" .getBytes ()) });
871+ return new RecordHeaders (new RecordHeader []{ new RecordHeader ("bar" , "two" .getBytes ()) });
871872 });
872873 recoverer .accept (record , new ListenerExecutionFailedException ("test" , "group" , new RuntimeException ()));
873874 ArgumentCaptor <ProducerRecord > producerRecordCaptor = ArgumentCaptor .forClass (ProducerRecord .class );
@@ -877,4 +878,25 @@ void immutableHeaders() {
877878 assertThat (KafkaTestUtils .getPropertyValue (headers , "headers" , List .class )).hasSize (12 );
878879 }
879880
881+ @ SuppressWarnings ("unchecked" )
882+ @ Test
883+ void nonCompliantProducerFactory () throws Exception {
884+ KafkaOperations <?, ?> template = mock (KafkaOperations .class );
885+ ProducerFactory pf = mock (ProducerFactory .class );
886+
887+ willCallRealMethod ().given (pf ).getConfigurationProperties ();
888+
889+ given (template .getProducerFactory ()).willReturn (pf );
890+ ListenableFuture <?> future = mock (ListenableFuture .class );
891+ ArgumentCaptor <Long > timeoutCaptor = ArgumentCaptor .forClass (Long .class );
892+ given (template .send (any (ProducerRecord .class ))).willReturn (future );
893+ given (future .get (timeoutCaptor .capture (), eq (TimeUnit .MILLISECONDS ))).willThrow (new TimeoutException ());
894+ ConsumerRecord <String , String > record = new ConsumerRecord <>("foo" , 0 , 0L , "bar" , null );
895+ DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer (template );
896+ recoverer .setFailIfSendResultIsError (true );
897+ assertThatThrownBy (() -> recoverer .accept (record , new RuntimeException ()))
898+ .isExactlyInstanceOf (KafkaException .class );
899+ assertThat (timeoutCaptor .getValue ()).isEqualTo (Duration .ofSeconds (125 ).toMillis ());
900+ }
901+
880902}
0 commit comments