@@ -258,7 +258,7 @@ void headersNotStripped() {
258258 headers = captor .getValue ().headers ();
259259 assertThat (headers .lastHeader (SerializationUtils .VALUE_DESERIALIZER_EXCEPTION_HEADER )).isNotNull ();
260260 assertThat (headers .lastHeader (SerializationUtils .KEY_DESERIALIZER_EXCEPTION_HEADER )).isNotNull ();
261- assertThat (headers .lastHeader (KafkaHeaders .DLT_KEY_EXCEPTION_MESSAGE ).value ()).isEqualTo ("testK" . getBytes () );
261+ assertThat (new String ( headers .lastHeader (KafkaHeaders .DLT_KEY_EXCEPTION_MESSAGE ).value ())) .isEqualTo ("testK" );
262262 assertThat (headers .lastHeader (KafkaHeaders .DLT_EXCEPTION_MESSAGE ).value ()).isEqualTo ("testV" .getBytes ());
263263 }
264264
@@ -399,7 +399,8 @@ void appendOriginalHeaders() {
399399 DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer (template );
400400 recoverer .setAppendOriginalHeaders (true );
401401 recoverer .setStripPreviousExceptionHeaders (false );
402- recoverer .accept (record , new RuntimeException (new IllegalStateException ()));
402+ recoverer .accept (record , new ListenerExecutionFailedException ("Listener failed" ,
403+ new TimestampedException (new RuntimeException ("ex1 msg" , new IllegalStateException ()))));
403404 ArgumentCaptor <ProducerRecord > producerRecordCaptor = ArgumentCaptor .forClass (ProducerRecord .class );
404405 then (template ).should (times (1 )).send (producerRecordCaptor .capture ());
405406 Headers headers = producerRecordCaptor .getValue ().headers ();
@@ -412,11 +413,15 @@ void appendOriginalHeaders() {
412413 Header firstExceptionCauseType = headers .lastHeader (KafkaHeaders .DLT_EXCEPTION_CAUSE_FQCN );
413414 Header firstExceptionMessage = headers .lastHeader (KafkaHeaders .DLT_EXCEPTION_MESSAGE );
414415 Header firstExceptionStackTrace = headers .lastHeader (KafkaHeaders .DLT_EXCEPTION_STACKTRACE );
416+ assertThat (new String (firstExceptionMessage .value ())).isEqualTo ("Listener failed; ex1 msg" );
417+ assertThat (new String (firstExceptionType .value ())).isEqualTo (ListenerExecutionFailedException .class .getName ());
418+ assertThat (new String (firstExceptionCauseType .value ())).isEqualTo (RuntimeException .class .getName ());
415419
416420 ConsumerRecord <String , String > anotherRecord = new ConsumerRecord <>("bar" , 1 , 12L , 4321L ,
417421 TimestampType .LOG_APPEND_TIME , 321 , 321 , "bar" , null , new RecordHeaders (), Optional .empty ());
418422 headers .forEach (header -> anotherRecord .headers ().add (header ));
419- recoverer .accept (anotherRecord , new RuntimeException (new IllegalStateException ()));
423+ recoverer .accept (anotherRecord , new ListenerExecutionFailedException ("Listener failed" ,
424+ new TimestampedException (new RuntimeException ("ex2 msg" , new IllegalStateException ()))));
420425 ArgumentCaptor <ProducerRecord > anotherProducerRecordCaptor = ArgumentCaptor .forClass (ProducerRecord .class );
421426 then (template ).should (times (2 )).send (anotherProducerRecordCaptor .capture ());
422427 Headers anotherHeaders = anotherProducerRecordCaptor .getAllValues ().get (1 ).headers ();
@@ -436,6 +441,8 @@ void appendOriginalHeaders() {
436441 assertThat (anotherHeaders .lastHeader (KafkaHeaders .DLT_EXCEPTION_CAUSE_FQCN ))
437442 .isNotSameAs (firstExceptionCauseType );
438443 assertThat (anotherHeaders .lastHeader (KafkaHeaders .DLT_EXCEPTION_MESSAGE )).isNotSameAs (firstExceptionMessage );
444+ assertThat (new String (anotherHeaders .lastHeader (KafkaHeaders .DLT_EXCEPTION_MESSAGE ).value ()))
445+ .isEqualTo ("Listener failed; ex2 msg" );
439446 assertThat (anotherHeaders .lastHeader (KafkaHeaders .DLT_EXCEPTION_STACKTRACE ))
440447 .isNotSameAs (firstExceptionStackTrace );
441448 Iterator <Header > exceptionHeaders = anotherHeaders .headers (KafkaHeaders .DLT_EXCEPTION_FQCN ).iterator ();
0 commit comments