We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
2 parents 4371fce + b0f9101 commit 359fb79Copy full SHA for 359fb79
pkg/rdkafka/RdKafkaConsumer.php
@@ -169,6 +169,12 @@ private function doReceive(int $timeout): ?RdKafkaMessage
169
$message->setPartition($kafkaMessage->partition);
170
$message->setKafkaMessage($kafkaMessage);
171
172
+ // Merge headers passed from Kafka with possible earlier serialized payload headers. Prefer Kafka's.
173
+ // Note: Requires phprdkafka >= 3.1.0
174
+ if (isset($kafkaMessage->headers)) {
175
+ $message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers));
176
+ }
177
+
178
return $message;
179
default:
180
throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err);
0 commit comments