diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index d7016a946..241ee3841 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -169,6 +169,12 @@ private function doReceive(int $timeout): ?RdKafkaMessage $message->setPartition($kafkaMessage->partition); $message->setKafkaMessage($kafkaMessage); + // Merge headers passed from Kafka with possible earlier serialized payload headers. Prefer Kafka's. + // Note: Requires phprdkafka >= 3.1.0 + if (isset($kafkaMessage->headers)) { + $message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers)); + } + return $message; default: throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err);