diff --git a/composer.json b/composer.json index 7ac30bc1..d4dbe320 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,8 @@ }, "require-dev": { "composer/xdebug-handler": "^2.0", - "open-telemetry/dev-tools": "dev-main" + "open-telemetry/dev-tools": "dev-main", + "phpunit/phpunit": "^8.0 || ^9.5 || ^10.0" }, "autoload": { "psr-4": { diff --git a/src/Instrumentation/Symfony/src/MessengerInstrumentation.php b/src/Instrumentation/Symfony/src/MessengerInstrumentation.php index 9b0b8e60..a3d37444 100644 --- a/src/Instrumentation/Symfony/src/MessengerInstrumentation.php +++ b/src/Instrumentation/Symfony/src/MessengerInstrumentation.php @@ -36,10 +36,9 @@ public static function register(): void { $instrumentation = new CachedInstrumentation('io.opentelemetry.contrib.php.symfony_messenger'); - /** - * MessageBusInterface dispatches messages to the handlers. - */ - hook( + + // Instrument MessageBusInterface (message dispatching) + hook( MessageBusInterface::class, 'dispatch', pre: static function ( @@ -54,19 +53,17 @@ public static function register(): void $message = $params[0]; $messageClass = \get_class($message); - /** @psalm-suppress ArgumentTypeCoercion */ + // Instrument dispatch as a "send" operation with SpanKind::KIND_PRODUCER $builder = $instrumentation ->tracer() - ->spanBuilder(\sprintf('DISPATCH %s', $messageClass)) - ->setSpanKind(SpanKind::KIND_PRODUCER) + ->spanBuilder(\sprintf('publish %s', $messageClass)) + ->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for dispatch ->setAttribute(TraceAttributes::CODE_FUNCTION, $function) ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) ->setAttribute(TraceAttributes::CODE_LINENO, $lineno) - ->setAttribute(self::ATTRIBUTE_MESSENGER_BUS, $class) - ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass) - ; + ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass); $parent = Context::getCurrent(); $span = $builder @@ -103,9 +100,7 @@ public static function register(): void } ); - /** - * SenderInterface sends messages to a transport. - */ + // Instrument SenderInterface (sending messages to transport) hook( SenderInterface::class, 'send', @@ -121,28 +116,88 @@ public static function register(): void $envelope = $params[0]; $messageClass = \get_class($envelope->getMessage()); - /** @psalm-suppress ArgumentTypeCoercion */ + // Instrument sending as a "send" operation with SpanKind::KIND_PRODUCER + $builder = $instrumentation + ->tracer() + ->spanBuilder(\sprintf('send %s', $messageClass)) + ->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for sending + ->setAttribute(TraceAttributes::CODE_FUNCTION, $function) + ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) + ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) + ->setAttribute(TraceAttributes::CODE_LINENO, $lineno) + ->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class) + ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass); + + $parent = Context::getCurrent(); + $span = $builder + ->setParent($parent) + ->startSpan(); + + $context = $span->storeInContext($parent); + + Context::storage()->attach($context); + + return $params; + }, + post: static function ( + SenderInterface $sender, + array $params, + ?Envelope $result, + ?\Throwable $exception + ): void { + $scope = Context::storage()->scope(); + if (null === $scope) { + return; + } + + $scope->detach(); + $span = Span::fromContext($scope->context()); + + if (null !== $exception) { + $span->recordException($exception, [ + TraceAttributes::EXCEPTION_ESCAPED => true, + ]); + $span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage()); + } + + $span->end(); + } + ); + + // Instrument the receiving of messages (consumer-side) + hook( + ReceiverInterface::class, + 'get', + static function ( + SenderInterface $bus, + array $params, + string $class, + string $function, + ?string $filename, + ?int $lineno, + ) use ($instrumentation): array { + /** @var Envelope $envelope */ + $envelope = $params[0]; + $messageClass = \get_class($envelope->getMessage()); + + // Instrument receiving as a "consume" operation with SpanKind::KIND_CONSUMER $builder = $instrumentation ->tracer() - ->spanBuilder(\sprintf('SEND %s', $messageClass)) - ->setSpanKind(SpanKind::KIND_PRODUCER) + ->spanBuilder(\sprintf('consume %s', $messageClass)) + ->setSpanKind(SpanKind::KIND_CONSUMER) // Set KIND_CONSUMER for receiving ->setAttribute(TraceAttributes::CODE_FUNCTION, $function) ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) ->setAttribute(TraceAttributes::CODE_LINENO, $lineno) - ->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class) - ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass) - ; + ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass); $parent = Context::getCurrent(); - $span = $builder ->setParent($parent) ->startSpan(); $context = $span->storeInContext($parent); - Context::storage()->attach($context); return $params; diff --git a/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php b/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php index 63d38849..5c7cbfbc 100644 --- a/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php +++ b/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php @@ -12,7 +12,7 @@ use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport; use Symfony\Component\Messenger\Transport\InMemoryTransport as LegacyInMemoryTransport; -use Symfony\Component\Messenger\Transport\TransportInterface; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; final class SendEmailMessage { @@ -35,14 +35,26 @@ protected function getMessenger(): MessageBusInterface { return new MessageBus(); } + protected function getTransport() { - // Symfony 6+ + // Symfony 6+ version of the transport if (class_exists(InMemoryTransport::class)) { return new InMemoryTransport(); } - // Symfony 5+ + // Symfony 5+ fallback + return new LegacyInMemoryTransport(); + } + + protected function getReceiver() + { + // Symfony 6+ version of the receiver + if (class_exists(ReceiverInterface::class)) { + return new InMemoryTransport(); // Example transport acting as a receiver + } + + // Symfony 5+ fallback return new LegacyInMemoryTransport(); } @@ -56,8 +68,7 @@ protected function getTransport() public function test_dispatch_message($message, string $spanName, int $kind, array $attributes) { $bus = $this->getMessenger(); - - $bus->dispatch($message); + $bus->dispatch($message); // Target the correct interface (MessageBusInterface) $this->assertCount(1, $this->storage); @@ -73,6 +84,36 @@ public function test_dispatch_message($message, string $spanName, int $kind, arr } } + /** + * Test consumer span when processing a message + */ + public function test_consume_message() + { + $transport = $this->getReceiver(); // Use the correct receiver interface + $message = new SendEmailMessage('Hello Consumer'); + $envelope = new Envelope($message); + + // Simulate receiving the message via ReceiverInterface::get + $transport->get(); + + // Simulate message consumption (processing) + $bus = $this->getMessenger(); + $bus->dispatch($message); + + // After message is consumed, we expect a consumer span + $this->assertCount(1, $this->storage); + + /** @var ImmutableSpan $span */ + $span = $this->storage[0]; + + // We expect this to be a consumer span + $this->assertEquals('CONSUME OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getName()); + $this->assertEquals(SpanKind::KIND_CONSUMER, $span->getKind()); + + $this->assertTrue($span->getAttributes()->has(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE)); + $this->assertEquals('OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getAttributes()->get(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE)); + } + /** * @dataProvider sendDataProvider * @param mixed $message @@ -172,7 +213,7 @@ public function dispatchDataProvider(): array [ new SendEmailMessage('Hello Again'), 'DISPATCH OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', - SpanKind::KIND_PRODUCER, + SpanKind::KIND_PROCESS, // Correct SpanKind for dispatching [ MessengerInstrumentation::ATTRIBUTE_MESSENGER_BUS => 'Symfony\Component\Messenger\MessageBus', MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage',