Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIND::CONSUMER instrumentation implemented #307

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
},
"require-dev": {
"composer/xdebug-handler": "^2.0",
"open-telemetry/dev-tools": "dev-main"
"open-telemetry/dev-tools": "dev-main",
"phpunit/phpunit": "^8.0 || ^9.5 || ^10.0"
},
"autoload": {
"psr-4": {
Expand Down
97 changes: 76 additions & 21 deletions src/Instrumentation/Symfony/src/MessengerInstrumentation.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ public static function register(): void
{
$instrumentation = new CachedInstrumentation('io.opentelemetry.contrib.php.symfony_messenger');

/**
* MessageBusInterface dispatches messages to the handlers.
*/
hook(

// Instrument MessageBusInterface (message dispatching)
hook(
MessageBusInterface::class,
'dispatch',
pre: static function (
Expand All @@ -54,19 +53,17 @@ public static function register(): void
$message = $params[0];
$messageClass = \get_class($message);

/** @psalm-suppress ArgumentTypeCoercion */
// Instrument dispatch as a "send" operation with SpanKind::KIND_PRODUCER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('DISPATCH %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER)
->spanBuilder(\sprintf('publish %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for dispatch
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)

->setAttribute(self::ATTRIBUTE_MESSENGER_BUS, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
;
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();
$span = $builder
Expand Down Expand Up @@ -103,9 +100,7 @@ public static function register(): void
}
);

/**
* SenderInterface sends messages to a transport.
*/
// Instrument SenderInterface (sending messages to transport)
hook(
SenderInterface::class,
'send',
Expand All @@ -121,28 +116,88 @@ public static function register(): void
$envelope = $params[0];
$messageClass = \get_class($envelope->getMessage());

/** @psalm-suppress ArgumentTypeCoercion */
// Instrument sending as a "send" operation with SpanKind::KIND_PRODUCER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('send %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for sending
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)
->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();
$span = $builder
->setParent($parent)
->startSpan();

$context = $span->storeInContext($parent);

Context::storage()->attach($context);

return $params;
},
post: static function (
SenderInterface $sender,
array $params,
?Envelope $result,
?\Throwable $exception
): void {
$scope = Context::storage()->scope();
if (null === $scope) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());

if (null !== $exception) {
$span->recordException($exception, [
TraceAttributes::EXCEPTION_ESCAPED => true,
]);
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
}

$span->end();
}
);

// Instrument the receiving of messages (consumer-side)
hook(
ReceiverInterface::class,
'get',
static function (
SenderInterface $bus,
array $params,
string $class,
string $function,
?string $filename,
?int $lineno,
) use ($instrumentation): array {
/** @var Envelope $envelope */
$envelope = $params[0];
$messageClass = \get_class($envelope->getMessage());

// Instrument receiving as a "consume" operation with SpanKind::KIND_CONSUMER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('SEND %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER)
->spanBuilder(\sprintf('consume %s', $messageClass))
->setSpanKind(SpanKind::KIND_CONSUMER) // Set KIND_CONSUMER for receiving
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)

->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
;
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();

$span = $builder
->setParent($parent)
->startSpan();

$context = $span->storeInContext($parent);

Context::storage()->attach($context);

return $params;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
use Symfony\Component\Messenger\Transport\InMemoryTransport as LegacyInMemoryTransport;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

final class SendEmailMessage
{
Expand All @@ -35,14 +35,26 @@ protected function getMessenger(): MessageBusInterface
{
return new MessageBus();
}

protected function getTransport()
{
// Symfony 6+
// Symfony 6+ version of the transport
if (class_exists(InMemoryTransport::class)) {
return new InMemoryTransport();
}

// Symfony 5+
// Symfony 5+ fallback
return new LegacyInMemoryTransport();
}

protected function getReceiver()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this different enough from getTransport to require a method? It doesn't return a receiver but a transport...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RichardChukwu have you had any time to look further into this?

Copy link
Contributor Author

@RichardChukwu RichardChukwu Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @ChrisLightfootWild, thanks for asking.

Not really, I'll appreciate any assistance please

{
// Symfony 6+ version of the receiver
if (class_exists(ReceiverInterface::class)) {
return new InMemoryTransport(); // Example transport acting as a receiver
}

// Symfony 5+ fallback
return new LegacyInMemoryTransport();
}

Expand All @@ -56,8 +68,7 @@ protected function getTransport()
public function test_dispatch_message($message, string $spanName, int $kind, array $attributes)
{
$bus = $this->getMessenger();

$bus->dispatch($message);
$bus->dispatch($message); // Target the correct interface (MessageBusInterface)

$this->assertCount(1, $this->storage);

Expand All @@ -73,6 +84,36 @@ public function test_dispatch_message($message, string $spanName, int $kind, arr
}
}

/**
* Test consumer span when processing a message
*/
public function test_consume_message()
{
$transport = $this->getReceiver(); // Use the correct receiver interface
$message = new SendEmailMessage('Hello Consumer');
$envelope = new Envelope($message);

// Simulate receiving the message via ReceiverInterface::get
$transport->get();

// Simulate message consumption (processing)
$bus = $this->getMessenger();
$bus->dispatch($message);

// After message is consumed, we expect a consumer span
$this->assertCount(1, $this->storage);

/** @var ImmutableSpan $span */
$span = $this->storage[0];

// We expect this to be a consumer span
$this->assertEquals('CONSUME OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getName());
$this->assertEquals(SpanKind::KIND_CONSUMER, $span->getKind());

$this->assertTrue($span->getAttributes()->has(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE));
$this->assertEquals('OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getAttributes()->get(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE));
}

/**
* @dataProvider sendDataProvider
* @param mixed $message
Expand Down Expand Up @@ -172,7 +213,7 @@ public function dispatchDataProvider(): array
[
new SendEmailMessage('Hello Again'),
'DISPATCH OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage',
SpanKind::KIND_PRODUCER,
SpanKind::KIND_PROCESS, // Correct SpanKind for dispatching
[
MessengerInstrumentation::ATTRIBUTE_MESSENGER_BUS => 'Symfony\Component\Messenger\MessageBus',
MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage',
Expand Down
Loading