|
31 | 31 | import java.util.concurrent.TimeoutException; |
32 | 32 | import java.util.function.BiFunction; |
33 | 33 | import java.util.function.Function; |
| 34 | +import java.util.function.Supplier; |
34 | 35 |
|
35 | 36 | import org.apache.commons.logging.LogFactory; |
36 | 37 | import org.apache.kafka.clients.consumer.Consumer; |
|
39 | 40 | import org.apache.kafka.common.PartitionInfo; |
40 | 41 | import org.apache.kafka.common.TopicPartition; |
41 | 42 | import org.apache.kafka.common.header.Headers; |
42 | | -import org.apache.kafka.common.header.internals.RecordHeader; |
43 | 43 | import org.apache.kafka.common.header.internals.RecordHeaders; |
44 | 44 |
|
45 | 45 | import org.springframework.core.log.LogAccessor; |
@@ -659,63 +659,70 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, E |
659 | 659 |
|
660 | 660 | private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, Exception ex) { |
661 | 661 | maybeAddHeader(kafkaHeaders, this.headerNames.original.topicHeader, |
662 | | - record.topic().getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.TOPIC); |
| 662 | + () -> record.topic().getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.TOPIC); |
663 | 663 | maybeAddHeader(kafkaHeaders, this.headerNames.original.partitionHeader, |
664 | | - ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array(), |
| 664 | + () -> ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array(), |
665 | 665 | HeaderNames.HeadersToAdd.PARTITION); |
666 | 666 | maybeAddHeader(kafkaHeaders, this.headerNames.original.offsetHeader, |
667 | | - ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array(), HeaderNames.HeadersToAdd.OFFSET); |
| 667 | + () -> ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array(), |
| 668 | + HeaderNames.HeadersToAdd.OFFSET); |
668 | 669 | maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampHeader, |
669 | | - ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array(), HeaderNames.HeadersToAdd.TS); |
| 670 | + () -> ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array(), HeaderNames.HeadersToAdd.TS); |
670 | 671 | maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampTypeHeader, |
671 | | - record.timestampType().toString().getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.TS_TYPE); |
| 672 | + () -> record.timestampType().toString().getBytes(StandardCharsets.UTF_8), |
| 673 | + HeaderNames.HeadersToAdd.TS_TYPE); |
672 | 674 | if (ex instanceof ListenerExecutionFailedException) { |
673 | 675 | String consumerGroup = ((ListenerExecutionFailedException) ex).getGroupId(); |
674 | 676 | if (consumerGroup != null) { |
675 | 677 | maybeAddHeader(kafkaHeaders, this.headerNames.original.consumerGroup, |
676 | | - consumerGroup.getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.GROUP); |
| 678 | + () -> consumerGroup.getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.GROUP); |
677 | 679 | } |
678 | 680 | } |
679 | 681 | } |
680 | 682 |
|
681 | | - private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value, HeaderNames.HeadersToAdd hta) { |
| 683 | + private void maybeAddHeader(Headers kafkaHeaders, String header, Supplier<byte[]> valueSupplier, |
| 684 | + HeaderNames.HeadersToAdd hta) { |
| 685 | + |
682 | 686 | if (this.whichHeaders.contains(hta) |
683 | 687 | && (this.appendOriginalHeaders || kafkaHeaders.lastHeader(header) == null)) { |
684 | | - kafkaHeaders.add(header, value); |
| 688 | + kafkaHeaders.add(header, valueSupplier.get()); |
685 | 689 | } |
686 | 690 | } |
687 | 691 |
|
688 | 692 | private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, boolean isKey, |
689 | 693 | HeaderNames names) { |
690 | 694 |
|
691 | | - appendOrReplace(kafkaHeaders, new RecordHeader(isKey ? names.exceptionInfo.keyExceptionFqcn |
692 | | - : names.exceptionInfo.exceptionFqcn, |
693 | | - exception.getClass().getName().getBytes(StandardCharsets.UTF_8)), HeaderNames.HeadersToAdd.EXCEPTION); |
| 695 | + appendOrReplace(kafkaHeaders, |
| 696 | + isKey ? names.exceptionInfo.keyExceptionFqcn : names.exceptionInfo.exceptionFqcn, |
| 697 | + () -> exception.getClass().getName().getBytes(StandardCharsets.UTF_8), |
| 698 | + HeaderNames.HeadersToAdd.EXCEPTION); |
694 | 699 | if (exception.getCause() != null) { |
695 | | - appendOrReplace(kafkaHeaders, new RecordHeader(names.exceptionInfo.exceptionCauseFqcn, |
696 | | - exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8)), |
| 700 | + appendOrReplace(kafkaHeaders, |
| 701 | + names.exceptionInfo.exceptionCauseFqcn, |
| 702 | + () -> exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8), |
697 | 703 | HeaderNames.HeadersToAdd.EX_CAUSE); |
698 | 704 | } |
699 | 705 | String message = exception.getMessage(); |
700 | 706 | if (message != null) { |
701 | | - appendOrReplace(kafkaHeaders, new RecordHeader(isKey |
702 | | - ? names.exceptionInfo.keyExceptionMessage |
703 | | - : names.exceptionInfo.exceptionMessage, |
704 | | - exception.getMessage().getBytes(StandardCharsets.UTF_8)), HeaderNames.HeadersToAdd.EX_MSG); |
| 707 | + appendOrReplace(kafkaHeaders, |
| 708 | + isKey ? names.exceptionInfo.keyExceptionMessage : names.exceptionInfo.exceptionMessage, |
| 709 | + () -> exception.getMessage().getBytes(StandardCharsets.UTF_8), |
| 710 | + HeaderNames.HeadersToAdd.EX_MSG); |
705 | 711 | } |
706 | | - appendOrReplace(kafkaHeaders, new RecordHeader(isKey |
707 | | - ? names.exceptionInfo.keyExceptionStacktrace |
708 | | - : names.exceptionInfo.exceptionStacktrace, |
709 | | - getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)), |
| 712 | + appendOrReplace(kafkaHeaders, |
| 713 | + isKey ? names.exceptionInfo.keyExceptionStacktrace : names.exceptionInfo.exceptionStacktrace, |
| 714 | + () -> getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8), |
710 | 715 | HeaderNames.HeadersToAdd.EX_STACKTRACE); |
711 | 716 | } |
712 | 717 |
|
713 | | - private void appendOrReplace(Headers headers, RecordHeader header, HeaderNames.HeadersToAdd hta) { |
| 718 | + private void appendOrReplace(Headers headers, String header, Supplier<byte[]> valueSupplier, |
| 719 | + HeaderNames.HeadersToAdd hta) { |
| 720 | + |
714 | 721 | if (this.whichHeaders.contains(hta)) { |
715 | 722 | if (this.stripPreviousExceptionHeaders) { |
716 | | - headers.remove(header.key()); |
| 723 | + headers.remove(header); |
717 | 724 | } |
718 | | - headers.add(header); |
| 725 | + headers.add(header, valueSupplier.get()); |
719 | 726 | } |
720 | 727 | } |
721 | 728 |
|
|
0 commit comments