Skip to content

Allow sending & reading headers via Kafka Message object. #880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/rdkafka/RdKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public function getQueue(): Queue
}

/**
* @param int $timeout
*
* @return RdKafkaMessage
*/
public function receive(int $timeout = 0): ?Message
Expand Down Expand Up @@ -141,6 +143,7 @@ public function acknowledge(Message $message): void

/**
* @param RdKafkaMessage $message
* @param bool $requeue
*/
public function reject(Message $message, bool $requeue = false): void
{
Expand Down Expand Up @@ -169,6 +172,12 @@ private function doReceive(int $timeout): ?RdKafkaMessage
$message->setPartition($kafkaMessage->partition);
$message->setKafkaMessage($kafkaMessage);

// Merge headers passed from Kafka with possible earlier serialized payload headers. Prefer Kafka's.
// Note: Requires phprdkafka >= 3.1.0
if (isset($kafkaMessage->headers)) {
$message->setHeaders(array_merge($message->getHeaders(), $kafkaMessage->headers));
}

return $message;
default:
throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err);
Expand Down
13 changes: 13 additions & 0 deletions pkg/rdkafka/RdKafkaContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ public function __construct(array $config)
$this->setSerializer(new JsonSerializer());
}

public static function getLibrdKafkaVersion(): string
{
if (!defined('RD_KAFKA_VERSION')) {
throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed');
}

$major = (RD_KAFKA_VERSION & 0xFF000000) >> 24;
$minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16;
$patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8;

return "$major.$minor.$patch";
}

/**
* @return RdKafkaMessage
*/
Expand Down
13 changes: 12 additions & 1 deletion pkg/rdkafka/RdKafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,18 @@ public function send(Destination $destination, Message $message): void
$key = $message->getKey() ?: $destination->getKey() ?: null;

$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
// Note: Topic::producev method exists in phprdkafka > 3.1.0
// Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version
if (method_exists($topic, 'producev')) {
// Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka 1.0.0 causing segfault
if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=')
&& version_compare(phpversion('rdkafka'), '3.1.0', '<=')) {
trigger_error('Phprdkafka < 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`', E_USER_WARNING);
}
$topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably also should contain a check against phprdkafka version & librdkafka 1.0.0, since there is a possible segfault here.

EDIT: Well, as a matter of fact, the PR that fixes the producev call for librdkafka 1.0.0 is not yet included in a release, so it will probably have to remain as a TODO once it does.

Related PR:
arnaud-lb/php-rdkafka#222

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added version check. I've tried it with local build from master and

var_dump(version_compare('3.1.0-dev', '3.1.0', '<=');
// false

which is the version that you will get if you build from master branch of phprdkafka.

I'm considering whether or not this check is worthwile, however otherwise it will result in hard to debug segfault.

@makasim what are you thought on this? Should it be dropped? Should it emit a warning (like here) or "drop down" to calling produce?

} else {
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
}
}

/**
Expand Down