From 858e8a78a8cd5c22abc5003880e105c038f014cf Mon Sep 17 00:00:00 2001 From: nightlinus Date: Thu, 4 Apr 2019 12:51:00 +0300 Subject: [PATCH] Fix bunny producer to properly map headers to expected by bunny headers --- pkg/amqp-bunny/AmqpContext.php | 103 ++++++++++++++++++++++ pkg/amqp-bunny/AmqpProducer.php | 1 + pkg/amqp-bunny/Tests/AmqpProducerTest.php | 34 ++++++- 3 files changed, 135 insertions(+), 3 deletions(-) diff --git a/pkg/amqp-bunny/AmqpContext.php b/pkg/amqp-bunny/AmqpContext.php index 9338e12b0..f57554335 100644 --- a/pkg/amqp-bunny/AmqpContext.php +++ b/pkg/amqp-bunny/AmqpContext.php @@ -312,6 +312,7 @@ public function getBunnyChannel(): Channel public function convertMessage(BunnyMessage $bunnyMessage): InteropAmqpMessage { $headers = $bunnyMessage->headers; + $headers = $this->convertHeadersFromBunnyNotation($headers); $properties = []; if (isset($headers['application_headers'])) { @@ -333,4 +334,106 @@ public function convertMessage(BunnyMessage $bunnyMessage): InteropAmqpMessage return $message; } + + /** @internal It must be used here and in the producer only */ + public function convertHeadersToBunnyNotation(array $headers): array + { + if (isset($headers['content_type'])) { + $headers['content-type'] = $headers['content_type']; + unset($headers['content_type']); + } + + if (isset($headers['content_encoding'])) { + $headers['content-encoding'] = $headers['content_encoding']; + unset($headers['content_encoding']); + } + + if (isset($headers['delivery_mode'])) { + $headers['delivery-mode'] = $headers['delivery_mode']; + unset($headers['delivery_mode']); + } + + if (isset($headers['correlation_id'])) { + $headers['correlation-id'] = $headers['correlation_id']; + unset($headers['correlation_id']); + } + + if (isset($headers['reply_to'])) { + $headers['reply-to'] = $headers['reply_to']; + unset($headers['reply_to']); + } + + if (isset($headers['message_id'])) { + $headers['message-id'] = $headers['message_id']; + unset($headers['message_id']); + } + + if (isset($headers['user_id'])) { + $headers['user-id'] = $headers['user_id']; + unset($headers['user_id']); + } + + if (isset($headers['app_id'])) { + $headers['app-id'] = $headers['app_id']; + unset($headers['app_id']); + } + + if (isset($headers['cluster_id'])) { + $headers['cluster-id'] = $headers['cluster_id']; + unset($headers['cluster_id']); + } + + return $headers; + } + + /** @internal It must be used here and in the consumer only */ + public function convertHeadersFromBunnyNotation(array $bunnyHeaders): array + { + if (isset($bunnyHeaders['content-type'])) { + $bunnyHeaders['content_type'] = $bunnyHeaders['content-type']; + unset($bunnyHeaders['content-type']); + } + + if (isset($bunnyHeaders['content-encoding'])) { + $bunnyHeaders['content_encoding'] = $bunnyHeaders['content-encoding']; + unset($bunnyHeaders['content-encoding']); + } + + if (isset($bunnyHeaders['delivery-mode'])) { + $bunnyHeaders['delivery_mode'] = $bunnyHeaders['delivery-mode']; + unset($bunnyHeaders['delivery-mode']); + } + + if (isset($bunnyHeaders['correlation-id'])) { + $bunnyHeaders['correlation_id'] = $bunnyHeaders['correlation-id']; + unset($bunnyHeaders['correlation-id']); + } + + if (isset($bunnyHeaders['reply-to'])) { + $bunnyHeaders['reply_to'] = $bunnyHeaders['reply-to']; + unset($bunnyHeaders['reply-to']); + } + + if (isset($bunnyHeaders['message-id'])) { + $bunnyHeaders['message_id'] = $bunnyHeaders['message-id']; + unset($bunnyHeaders['message-id']); + } + + if (isset($bunnyHeaders['user-id'])) { + $bunnyHeaders['user_id'] = $bunnyHeaders['user-id']; + unset($bunnyHeaders['user-id']); + } + + if (isset($bunnyHeaders['app-id'])) { + $bunnyHeaders['app_id'] = $bunnyHeaders['app-id']; + unset($bunnyHeaders['app-id']); + } + + if (isset($bunnyHeaders['cluster-id'])) { + $bunnyHeaders['cluster_id'] = $bunnyHeaders['cluster-id']; + unset($bunnyHeaders['cluster-id']); + } + + return $bunnyHeaders; + } } diff --git a/pkg/amqp-bunny/AmqpProducer.php b/pkg/amqp-bunny/AmqpProducer.php index e052cd312..76892ebeb 100644 --- a/pkg/amqp-bunny/AmqpProducer.php +++ b/pkg/amqp-bunny/AmqpProducer.php @@ -136,6 +136,7 @@ private function doSend(InteropAmqpDestination $destination, InteropAmqpMessage } $amqpProperties = $message->getHeaders(); + $amqpProperties = $this->context->convertHeadersToBunnyNotation($amqpProperties); if (array_key_exists('timestamp', $amqpProperties) && null !== $amqpProperties['timestamp']) { $amqpProperties['timestamp'] = \DateTime::createFromFormat('U', (string) $amqpProperties['timestamp']); diff --git a/pkg/amqp-bunny/Tests/AmqpProducerTest.php b/pkg/amqp-bunny/Tests/AmqpProducerTest.php index 6a476790b..df33598b2 100644 --- a/pkg/amqp-bunny/Tests/AmqpProducerTest.php +++ b/pkg/amqp-bunny/Tests/AmqpProducerTest.php @@ -134,11 +134,39 @@ public function testShouldSetMessageHeaders() $channel ->expects($this->once()) ->method('publish') - ->with($this->anything(), ['content_type' => 'text/plain']) + ->with($this->anything(), ['misc' => 'text/plain']) ; $producer = new AmqpProducer($channel, $this->createContextMock()); - $producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['content_type' => 'text/plain'])); + $producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['misc' => 'text/plain'])); + } + + public function testShouldConvertStandartHeadersToBunnyFormat() + { + $channel = $this->createBunnyChannelMock(); + $expectedHeaders = [ + 'content-encoding' => 'utf8', + 'content-type' => 'text/plain', + 'message-id' => 'id', + 'correlation-id' => 'correlation', + 'reply-to' => 'reply', + 'delivery-mode' => 2, + ]; + $channel + ->expects($this->once()) + ->method('publish') + ->with($this->anything(), $expectedHeaders); + + $producer = new AmqpProducer($channel, $this->createContextMock()); + $message = new AmqpMessage('body', []); + $message->setMessageId('id'); + $message->setReplyTo('reply'); + $message->setDeliveryMode(2); + $message->setContentType('text/plain'); + $message->setContentEncoding('utf8'); + $message->setCorrelationId('correlation'); + + $producer->send(new AmqpTopic('name'), $message); } public function testShouldSetMessageProperties() @@ -200,7 +228,7 @@ private function createBunnyChannelMock() */ private function createContextMock() { - return $this->createMock(AmqpContext::class); + return $this->createPartialMock(AmqpContext::class, []); } /**