diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java index 585fe4f687..fc6a1ea9f5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java @@ -34,7 +34,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.BatchConsumerAwareMessageListener; @@ -148,11 +147,7 @@ public void onMessage(List>>> if (this.releaseStrategy.test(list, false)) { ConsumerRecord>> done = new ConsumerRecord<>(AGGREGATED_RESULTS_TOPIC, 0, 0L, null, list); - done.headers() - .add(new RecordHeader(correlationHeaderName, - isBinaryCorrelation() - ? ((CorrelationKey) correlationId).getCorrelationId() - : ((String) correlationId).getBytes(StandardCharsets.UTF_8))); + done.headers().add(correlation); this.pending.remove(correlationId); checkOffsetsAndCommitIfNecessary(list, consumer); completed.add(done);