From 870859254e08216ef2d108ff77cab4bdf5f50bd6 Mon Sep 17 00:00:00 2001 From: Paul McLaren Date: Tue, 18 Jul 2017 09:30:40 +0300 Subject: [PATCH 1/9] amqplib --- composer.json | 5 + pkg/amqplib/AmqpConnectionFactory.php | 49 ++++++++ pkg/amqplib/AmqpConsumer.php | 79 ++++++++++++ pkg/amqplib/AmqpContext.php | 159 ++++++++++++++++++++++++ pkg/amqplib/AmqpMessage.php | 167 ++++++++++++++++++++++++++ pkg/amqplib/AmqpProducer.php | 61 ++++++++++ pkg/amqplib/AmqpQueue.php | 137 +++++++++++++++++++++ pkg/amqplib/AmqpTopic.php | 124 +++++++++++++++++++ pkg/amqplib/composer.json | 43 +++++++ 9 files changed, 824 insertions(+) create mode 100644 pkg/amqplib/AmqpConnectionFactory.php create mode 100644 pkg/amqplib/AmqpConsumer.php create mode 100644 pkg/amqplib/AmqpContext.php create mode 100644 pkg/amqplib/AmqpMessage.php create mode 100644 pkg/amqplib/AmqpProducer.php create mode 100644 pkg/amqplib/AmqpQueue.php create mode 100644 pkg/amqplib/AmqpTopic.php create mode 100644 pkg/amqplib/composer.json diff --git a/composer.json b/composer.json index c6c665357..3b9762396 100644 --- a/composer.json +++ b/composer.json @@ -7,6 +7,7 @@ "enqueue/enqueue": "*@dev", "enqueue/stomp": "*@dev", "enqueue/amqp-ext": "*@dev", + "enqueue/amqplib": "*@dev", "enqueue/redis": "*@dev", "enqueue/fs": "*@dev", "enqueue/null": "*@dev", @@ -68,6 +69,10 @@ "type": "path", "url": "pkg/amqp-ext" }, + { + "type": "path", + "url": "pkg/amqplib" + }, { "type": "path", "url": "pkg/redis" diff --git a/pkg/amqplib/AmqpConnectionFactory.php b/pkg/amqplib/AmqpConnectionFactory.php new file mode 100644 index 000000000..5ea984f6e --- /dev/null +++ b/pkg/amqplib/AmqpConnectionFactory.php @@ -0,0 +1,49 @@ +config = array_replace($this->defaultConfig(), $config); + } + + public function createContext() + { + return new AmqpContext($this->establishConnection()); + } + + private function establishConnection() + { + if (false == $this->connection) { + $this->connection = new AMQPStreamConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'] + ); + } + + return $this->connection; + } + + private function defaultConfig() + { + return [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + ]; + } +} diff --git a/pkg/amqplib/AmqpConsumer.php b/pkg/amqplib/AmqpConsumer.php new file mode 100644 index 000000000..f9df70d88 --- /dev/null +++ b/pkg/amqplib/AmqpConsumer.php @@ -0,0 +1,79 @@ +channel = $channel; + $this->queue = $queue; + } + + public function getQueue() + { + return $this->queue; + } + + public function receive($timeout = 0) + { + $end = microtime(true) + ($timeout / 1000); + + while (0 === $timeout || microtime(true) < $end) { + if ($message = $this->receiveNoWait()) { + return $message; + } + + usleep(100000); //100ms + } + } + + public function receiveNoWait() + { + if ($message = $this->channel->basic_get($this->queue->getQueueName())) { + return $this->convertMessage($message); + } + } + + public function acknowledge(PsrMessage $message) + { + InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class); + + $this->channel->basic_ack($message->getDeliveryTag()); + } + + public function reject(PsrMessage $message, $requeue = false) + { + InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class); + + $this->channel->basic_reject($message->getDeliveryTag(), $requeue); + } + + private function convertMessage(LibAMQPMessage $amqpMessage) + { + $headers = new AMQPTable($amqpMessage->get_properties()); + $headers = $headers->getNativeData(); + + $properties = []; + if (isset($headers['application_headers'])) { + $properties = $headers['application_headers']; + } + unset($headers['application_headers']); + + $message = new AmqpMessage($amqpMessage->getBody(), $properties, $headers); + $message->setDeliveryTag($amqpMessage->delivery_info['delivery_tag']); + $message->setRedelivered($amqpMessage->delivery_info['redelivered']); + + return $message; + } +} diff --git a/pkg/amqplib/AmqpContext.php b/pkg/amqplib/AmqpContext.php new file mode 100644 index 000000000..4e7704746 --- /dev/null +++ b/pkg/amqplib/AmqpContext.php @@ -0,0 +1,159 @@ +connection = $connection; + } + + public function createMessage($body = null, array $properties = [], array $headers = []) + { + return new AmqpMessage($body, $properties, $headers); + } + + public function createQueue($name) + { + return new AmqpQueue($name); + } + + public function createTopic($name) + { + return new AmqpTopic($name); + } + + public function createConsumer(PsrDestination $destination) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class); + + return new AmqpConsumer($this->getChannel(), $destination); + } + + public function createProducer() + { + return new AmqpProducer($this->getChannel()); + } + + public function createTemporaryQueue() + { + $queue = $this->createQueue(null); + $queue->setExclusive(true); + + $this->declareQueue($queue); + + return $queue; + } + + public function declareTopic(PsrDestination $destination) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class); + + $this->getChannel()->exchange_declare( + $destination->getTopicName(), + $destination->getType(), + $destination->isPassive(), + $destination->isDurable(), + $destination->isAutoDelete(), + $destination->isInternal(), + $destination->isNoWait(), + $destination->getArguments(), + $destination->getTicket() + ); + } + + public function declareQueue(PsrDestination $destination) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class); + + $this->getChannel()->queue_declare( + $destination->getQueueName(), + $destination->isPassive(), + $destination->isDurable(), + $destination->isExclusive(), + $destination->isAutoDelete(), + $destination->isNoWait(), + $destination->getArguments(), + $destination->getTicket() + ); + } + + public function bind(PsrDestination $source, PsrDestination $target) + { + $source instanceof PsrTopic + ? InvalidDestinationException::assertDestinationInstanceOf($source, AmqpTopic::class) + : InvalidDestinationException::assertDestinationInstanceOf($source, AmqpQueue::class) + ; + + $target instanceof PsrTopic + ? InvalidDestinationException::assertDestinationInstanceOf($target, AmqpTopic::class) + : InvalidDestinationException::assertDestinationInstanceOf($target, AmqpQueue::class) + ; + + if ($source instanceof AmqpQueue && $target instanceof AmqpQueue) { + throw new Exception('Is not possible to bind queue to queue. It is possible to bind topic to queue or topic to topic'); + } + + // bind exchange to exchange + if ($source instanceof AmqpTopic && $target instanceof AmqpTopic) { + $this->getChannel()->exchange_bind( + $target->getTopicName(), + $source->getTopicName(), + $source->getRoutingKey(), + $source->isNowait(), + $source->getArguments(), + $source->getTicket() + ); + // bind queue to exchange + } elseif ($source instanceof AmqpQueue) { + $this->getChannel()->queue_bind( + $source->getQueueName(), + $target->getTopicName(), + $target->getRoutingKey(), + $target->isNowait(), + $target->getArguments(), + $target->getTicket() + ); + // bind exchange to queue + } else { + $this->getChannel()->queue_bind( + $target->getQueueName(), + $source->getTopicName(), + $source->getRoutingKey(), + $source->isNowait(), + $source->getArguments(), + $source->getTicket() + ); + } + } + + /** + * {@inheritdoc} + */ + public function close() + { + if ($this->channel) { + $this->channel->close(); + } + } + + private function getChannel() + { + if (null === $this->channel) { + $this->channel = $this->connection->channel(); + } + + return $this->channel; + } +} diff --git a/pkg/amqplib/AmqpMessage.php b/pkg/amqplib/AmqpMessage.php new file mode 100644 index 000000000..a9e13be86 --- /dev/null +++ b/pkg/amqplib/AmqpMessage.php @@ -0,0 +1,167 @@ +body = $body; + $this->properties = $properties; + $this->headers = $headers; + $this->redelivered = false; + } + + public function getBody() + { + return $this->body; + } + + public function setBody($body) + { + $this->body = $body; + } + + public function setProperties(array $properties) + { + $this->properties = $properties; + } + + public function getProperties() + { + return $this->properties; + } + + public function setProperty($name, $value) + { + $this->properties[$name] = $value; + } + + public function getProperty($name, $default = null) + { + return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default; + } + + public function setHeaders(array $headers) + { + $this->headers = $headers; + } + + public function getHeaders() + { + return $this->headers; + } + + public function setHeader($name, $value) + { + $this->headers[$name] = $value; + } + + public function getHeader($name, $default = null) + { + return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default; + } + + public function setRedelivered($redelivered) + { + $this->redelivered = (bool) $redelivered; + } + + public function isRedelivered() + { + return $this->redelivered; + } + + public function setCorrelationId($correlationId) + { + $this->setHeader('correlation_id', $correlationId); + } + + public function getCorrelationId() + { + return $this->getHeader('correlation_id'); + } + + public function setMessageId($messageId) + { + $this->setHeader('message_id', $messageId); + } + + public function getMessageId() + { + return $this->getHeader('message_id'); + } + + public function getTimestamp() + { + $value = $this->getHeader('timestamp'); + + return $value === null ? null : (int) $value; + } + + public function setTimestamp($timestamp) + { + $this->setHeader('timestamp', $timestamp); + } + + public function setReplyTo($replyTo) + { + $this->setHeader('reply_to', $replyTo); + } + + public function getReplyTo() + { + return $this->getHeader('reply_to'); + } + + public function getDeliveryTag() + { + return $this->deliveryTag; + } + + public function setDeliveryTag($deliveryTag) + { + $this->deliveryTag = $deliveryTag; + } + + public function isMandatory() + { + return $this->mandatory; + } + + public function setMandatory($mandatory) + { + $this->mandatory = $mandatory; + } + + public function isImmediate() + { + return $this->immediate; + } + + public function setImmediate($immediate) + { + $this->immediate = $immediate; + } + + public function getTicket() + { + return $this->ticket; + } + + public function setTicket($ticket) + { + $this->ticket = $ticket; + } +} diff --git a/pkg/amqplib/AmqpProducer.php b/pkg/amqplib/AmqpProducer.php new file mode 100644 index 000000000..578942333 --- /dev/null +++ b/pkg/amqplib/AmqpProducer.php @@ -0,0 +1,61 @@ +channel = $channel; + } + + public function send(PsrDestination $destination, PsrMessage $message) + { + $destination instanceof PsrTopic + ? InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class) + : InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class) + ; + + InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class); + + $amqpProperties = $message->getHeaders(); + + if ($appProperties = $message->getProperties()) { + $amqpProperties['application_headers'] = new AMQPTable($appProperties); + } + + $amqpMessage = new LibAMQPMessage($message->getBody(), $amqpProperties); + + if ($destination instanceof AmqpTopic) { + $this->channel->basic_publish( + $amqpMessage, + $destination->getTopicName(), + $destination->getRoutingKey(), + $message->isMandatory(), + $message->isImmediate(), + $message->getTicket() + ); + } else { + $this->channel->basic_publish( + $amqpMessage, + '', + $destination->getQueueName(), + $message->isMandatory(), + $message->isImmediate(), + $message->getTicket() + ); + } + } +} diff --git a/pkg/amqplib/AmqpQueue.php b/pkg/amqplib/AmqpQueue.php new file mode 100644 index 000000000..fef8ff147 --- /dev/null +++ b/pkg/amqplib/AmqpQueue.php @@ -0,0 +1,137 @@ +name = $name; + $this->passive = false; + $this->durable = false; + $this->exclusive = false; + $this->autoDelete = true; + $this->noWait = false; + $this->noLocal = false; + $this->noAck = false; + } + + public function getQueueName() + { + return $this->name; + } + + public function isPassive() + { + return $this->passive; + } + + public function setPassive($passive) + { + $this->passive = (bool) $passive; + } + + public function isDurable() + { + return $this->durable; + } + + public function setDurable($durable) + { + $this->durable = (bool) $durable; + } + + public function isExclusive() + { + return $this->exclusive; + } + + public function setExclusive($exclusive) + { + $this->exclusive = (bool) $exclusive; + } + + public function isAutoDelete() + { + return $this->autoDelete; + } + + public function setAutoDelete($autoDelete) + { + $this->autoDelete = (bool) $autoDelete; + } + + public function isNoWait() + { + return $this->noWait; + } + + public function setNoWait($noWait) + { + $this->noWait = (bool) $noWait; + } + + public function getArguments() + { + return $this->arguments; + } + + public function setArguments(array $arguments = null) + { + $this->arguments = $arguments; + } + + public function getTicket() + { + return $this->ticket; + } + + public function setTicket($ticket) + { + $this->ticket = $ticket; + } + + public function getConsumerTag() + { + return $this->consumerTag; + } + + public function setConsumerTag($consumerTag) + { + $this->consumerTag = $consumerTag; + } + + public function isNoLocal() + { + return $this->noLocal; + } + + public function setNoLocal($noLocal) + { + $this->noLocal = $noLocal; + } + + public function isNoAck() + { + return $this->noAck; + } + + public function setNoAck($noAck) + { + $this->noAck = $noAck; + } +} diff --git a/pkg/amqplib/AmqpTopic.php b/pkg/amqplib/AmqpTopic.php new file mode 100644 index 000000000..71a056cc4 --- /dev/null +++ b/pkg/amqplib/AmqpTopic.php @@ -0,0 +1,124 @@ +name = $name; + $this->passive = false; + $this->durable = false; + $this->autoDelete = true; + $this->internal = false; + $this->noWait = false; + } + + public function getTopicName() + { + return $this->name; + } + + public function getType() + { + return $this->type; + } + + public function setType($type) + { + $this->type = $type; + } + + public function isPassive() + { + return $this->passive; + } + + public function setPassive($passive) + { + $this->passive = (bool) $passive; + } + + public function isDurable() + { + return $this->durable; + } + + public function setDurable($durable) + { + $this->durable = (bool) $durable; + } + + public function isAutoDelete() + { + return $this->autoDelete; + } + + public function setAutoDelete($autoDelete) + { + $this->autoDelete = (bool) $autoDelete; + } + + public function isInternal() + { + return $this->internal; + } + + public function setInternal($internal) + { + $this->internal = (bool) $internal; + } + + public function isNoWait() + { + return $this->noWait; + } + + public function setNoWait($noWait) + { + $this->noWait = (bool) $noWait; + } + + public function getArguments() + { + return $this->arguments; + } + + public function setArguments(array $arguments = null) + { + $this->arguments = $arguments; + } + + public function getTicket() + { + return $this->ticket; + } + + public function setTicket($ticket) + { + $this->ticket = $ticket; + } + + public function getRoutingKey() + { + return $this->routingKey; + } + + public function setRoutingKey($routingKey) + { + $this->routingKey = $routingKey; + } +} diff --git a/pkg/amqplib/composer.json b/pkg/amqplib/composer.json new file mode 100644 index 000000000..e6db12bb7 --- /dev/null +++ b/pkg/amqplib/composer.json @@ -0,0 +1,43 @@ +{ + "name": "enqueue/amqplib", + "type": "library", + "description": "Message Queue Amqp Transport", + "keywords": ["messaging", "queue", "amqp"], + "license": "MIT", + "repositories": [ + { + "type": "vcs", + "url": "git@github.com:php-enqueue/test.git" + } + ], + "require": { + "php": ">=5.6", + "php-amqplib/php-amqplib": "^2.6", + "queue-interop/queue-interop": "^0.5@dev", + "psr/log": "^1" + }, + "require-dev": { + "phpunit/phpunit": "~5.4.0", + "enqueue/test": "^0.6@dev", + "enqueue/enqueue": "^0.6@dev", + "enqueue/null": "^0.6@dev", + "queue-interop/queue-spec": "^0.5@dev", + "symfony/dependency-injection": "^2.8|^3", + "symfony/config": "^2.8|^3" + }, + "autoload": { + "psr-4": { "Enqueue\\Amqplib\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "suggest": { + "enqueue/enqueue": "If you'd like to use advanced features like Client abstract layer or Symfony integration features" + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.6.x-dev" + } + } +} From 1d511108ac2348b02440992309b5ca84b56caf82 Mon Sep 17 00:00:00 2001 From: Paul McLaren Date: Tue, 18 Jul 2017 15:34:22 +0300 Subject: [PATCH 2/9] rename amqp-lib --- composer.json | 4 +- .../AmqpConnectionFactory.php | 21 ++- pkg/{amqplib => amqp-lib}/AmqpConsumer.php | 36 ++++- pkg/{amqplib => amqp-lib}/AmqpContext.php | 59 +++++++- pkg/{amqplib => amqp-lib}/AmqpMessage.php | 130 +++++++++++++++++- pkg/{amqplib => amqp-lib}/AmqpProducer.php | 12 +- pkg/{amqplib => amqp-lib}/AmqpQueue.php | 108 ++++++++++++++- pkg/{amqplib => amqp-lib}/AmqpTopic.php | 101 +++++++++++++- pkg/{amqplib => amqp-lib}/composer.json | 4 +- 9 files changed, 461 insertions(+), 14 deletions(-) rename pkg/{amqplib => amqp-lib}/AmqpConnectionFactory.php (80%) rename pkg/{amqplib => amqp-lib}/AmqpConsumer.php (78%) rename pkg/{amqplib => amqp-lib}/AmqpContext.php (83%) rename pkg/{amqplib => amqp-lib}/AmqpMessage.php (65%) rename pkg/{amqplib => amqp-lib}/AmqpProducer.php (89%) rename pkg/{amqplib => amqp-lib}/AmqpQueue.php (65%) rename pkg/{amqplib => amqp-lib}/AmqpTopic.php (64%) rename pkg/{amqplib => amqp-lib}/composer.json (93%) diff --git a/composer.json b/composer.json index 3b9762396..0f3b8791e 100644 --- a/composer.json +++ b/composer.json @@ -7,7 +7,7 @@ "enqueue/enqueue": "*@dev", "enqueue/stomp": "*@dev", "enqueue/amqp-ext": "*@dev", - "enqueue/amqplib": "*@dev", + "enqueue/amqp-lib": "*@dev", "enqueue/redis": "*@dev", "enqueue/fs": "*@dev", "enqueue/null": "*@dev", @@ -71,7 +71,7 @@ }, { "type": "path", - "url": "pkg/amqplib" + "url": "pkg/amqp-lib" }, { "type": "path", diff --git a/pkg/amqplib/AmqpConnectionFactory.php b/pkg/amqp-lib/AmqpConnectionFactory.php similarity index 80% rename from pkg/amqplib/AmqpConnectionFactory.php rename to pkg/amqp-lib/AmqpConnectionFactory.php index 5ea984f6e..a6a4590ee 100644 --- a/pkg/amqplib/AmqpConnectionFactory.php +++ b/pkg/amqp-lib/AmqpConnectionFactory.php @@ -1,6 +1,6 @@ config = array_replace($this->defaultConfig(), $config); } + /** + * @return AmqpContext + */ public function createContext() { return new AmqpContext($this->establishConnection()); } + /** + * @return AbstractConnection + */ private function establishConnection() { if (false == $this->connection) { @@ -36,6 +52,9 @@ private function establishConnection() return $this->connection; } + /** + * @return array + */ private function defaultConfig() { return [ diff --git a/pkg/amqplib/AmqpConsumer.php b/pkg/amqp-lib/AmqpConsumer.php similarity index 78% rename from pkg/amqplib/AmqpConsumer.php rename to pkg/amqp-lib/AmqpConsumer.php index f9df70d88..1d9fae0b9 100644 --- a/pkg/amqplib/AmqpConsumer.php +++ b/pkg/amqp-lib/AmqpConsumer.php @@ -1,6 +1,6 @@ channel = $channel; $this->queue = $queue; } + /** + * @return AmqpQueue + */ public function getQueue() { return $this->queue; } + /** + * @param int $timeout + * + * @return AmqpMessage|null + */ public function receive($timeout = 0) { $end = microtime(true) + ($timeout / 1000); @@ -38,6 +57,9 @@ public function receive($timeout = 0) } } + /** + * @return AmqpMessage|null + */ public function receiveNoWait() { if ($message = $this->channel->basic_get($this->queue->getQueueName())) { @@ -45,6 +67,9 @@ public function receiveNoWait() } } + /** + * @param AmqpMessage $message + */ public function acknowledge(PsrMessage $message) { InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class); @@ -52,6 +77,10 @@ public function acknowledge(PsrMessage $message) $this->channel->basic_ack($message->getDeliveryTag()); } + /** + * @param AmqpMessage $message + * @param bool $requeue + */ public function reject(PsrMessage $message, $requeue = false) { InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class); @@ -59,6 +88,11 @@ public function reject(PsrMessage $message, $requeue = false) $this->channel->basic_reject($message->getDeliveryTag(), $requeue); } + /** + * @param LibAMQPMessage $amqpMessage + * + * @return AmqpMessage + */ private function convertMessage(LibAMQPMessage $amqpMessage) { $headers = new AMQPTable($amqpMessage->get_properties()); diff --git a/pkg/amqplib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php similarity index 83% rename from pkg/amqplib/AmqpContext.php rename to pkg/amqp-lib/AmqpContext.php index 4e7704746..375d574fa 100644 --- a/pkg/amqplib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -1,39 +1,72 @@ connection = $connection; } + /** + * @param string|null $body + * @param array $properties + * @param array $headers + * + * @return AmqpMessage + */ public function createMessage($body = null, array $properties = [], array $headers = []) { return new AmqpMessage($body, $properties, $headers); } + /** + * @param string $name + * + * @return AmqpQueue + */ public function createQueue($name) { return new AmqpQueue($name); } + /** + * @param string $name + * + * @return AmqpTopic + */ public function createTopic($name) { return new AmqpTopic($name); } + /** + * @param PsrDestination $destination + * + * @return AmqpConsumer + */ public function createConsumer(PsrDestination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class); @@ -41,11 +74,17 @@ public function createConsumer(PsrDestination $destination) return new AmqpConsumer($this->getChannel(), $destination); } + /** + * @return AmqpProducer + */ public function createProducer() { return new AmqpProducer($this->getChannel()); } + /** + * @return AmqpQueue + */ public function createTemporaryQueue() { $queue = $this->createQueue(null); @@ -56,6 +95,9 @@ public function createTemporaryQueue() return $queue; } + /** + * @param AmqpTopic $destination + */ public function declareTopic(PsrDestination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class); @@ -73,6 +115,9 @@ public function declareTopic(PsrDestination $destination) ); } + /** + * @param AmqpQueue $destination + */ public function declareQueue(PsrDestination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class); @@ -89,6 +134,12 @@ public function declareQueue(PsrDestination $destination) ); } + /** + * @param AmqpTopic|AmqpQueue $source + * @param AmqpTopic|AmqpQueue $target + * + * @throws Exception + */ public function bind(PsrDestination $source, PsrDestination $target) { $source instanceof PsrTopic @@ -138,9 +189,6 @@ public function bind(PsrDestination $source, PsrDestination $target) } } - /** - * {@inheritdoc} - */ public function close() { if ($this->channel) { @@ -148,6 +196,9 @@ public function close() } } + /** + * @return AMQPChannel + */ private function getChannel() { if (null === $this->channel) { diff --git a/pkg/amqplib/AmqpMessage.php b/pkg/amqp-lib/AmqpMessage.php similarity index 65% rename from pkg/amqplib/AmqpMessage.php rename to pkg/amqp-lib/AmqpMessage.php index a9e13be86..13f8fcd93 100644 --- a/pkg/amqplib/AmqpMessage.php +++ b/pkg/amqp-lib/AmqpMessage.php @@ -1,20 +1,56 @@ body = $body; @@ -23,86 +59,145 @@ public function __construct($body = null, array $properties = [], array $headers $this->redelivered = false; } + /** + * @return null|string + */ public function getBody() { return $this->body; } + /** + * @param string|null $body + */ public function setBody($body) { $this->body = $body; } + /** + * @param array $properties + */ public function setProperties(array $properties) { $this->properties = $properties; } + /** + * @return array + */ public function getProperties() { return $this->properties; } + /** + * @param string $name + * @param mixed $value + */ public function setProperty($name, $value) { $this->properties[$name] = $value; } + /** + * @param string $name + * @param mixed $default + * + * @return mixed + */ public function getProperty($name, $default = null) { return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default; } + /** + * @param array $headers + */ public function setHeaders(array $headers) { $this->headers = $headers; } + /** + * @return array + */ public function getHeaders() { return $this->headers; } + /** + * @param string $name + * @param mixed $value + */ public function setHeader($name, $value) { $this->headers[$name] = $value; } + /** + * @param string $name + * @param mixed $default + * + * @return mixed + */ public function getHeader($name, $default = null) { return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default; } + /** + * @param bool $redelivered + */ public function setRedelivered($redelivered) { $this->redelivered = (bool) $redelivered; } + /** + * @return bool + */ public function isRedelivered() { return $this->redelivered; } + /** + * @param string $correlationId + */ public function setCorrelationId($correlationId) { $this->setHeader('correlation_id', $correlationId); } + /** + * @return string + */ public function getCorrelationId() { return $this->getHeader('correlation_id'); } + /** + * @param string $messageId + */ public function setMessageId($messageId) { $this->setHeader('message_id', $messageId); } + /** + * @return string + */ public function getMessageId() { return $this->getHeader('message_id'); } + /** + * @return int + */ public function getTimestamp() { $value = $this->getHeader('timestamp'); @@ -110,56 +205,89 @@ public function getTimestamp() return $value === null ? null : (int) $value; } + /** + * @param int $timestamp + */ public function setTimestamp($timestamp) { $this->setHeader('timestamp', $timestamp); } + /** + * @param string|null $replyTo + */ public function setReplyTo($replyTo) { $this->setHeader('reply_to', $replyTo); } + /** + * @return string|null + */ public function getReplyTo() { return $this->getHeader('reply_to'); } + /** + * @return string + */ public function getDeliveryTag() { return $this->deliveryTag; } + /** + * @param string $deliveryTag + */ public function setDeliveryTag($deliveryTag) { $this->deliveryTag = $deliveryTag; } + /** + * @return bool + */ public function isMandatory() { return $this->mandatory; } + /** + * @param int $mandatory + */ public function setMandatory($mandatory) { $this->mandatory = $mandatory; } + /** + * @return bool + */ public function isImmediate() { return $this->immediate; } + /** + * @param bool $immediate + */ public function setImmediate($immediate) { $this->immediate = $immediate; } + /** + * @return int + */ public function getTicket() { return $this->ticket; } + /** + * @param int $ticket + */ public function setTicket($ticket) { $this->ticket = $ticket; diff --git a/pkg/amqplib/AmqpProducer.php b/pkg/amqp-lib/AmqpProducer.php similarity index 89% rename from pkg/amqplib/AmqpProducer.php rename to pkg/amqp-lib/AmqpProducer.php index 578942333..b5cf61805 100644 --- a/pkg/amqplib/AmqpProducer.php +++ b/pkg/amqp-lib/AmqpProducer.php @@ -1,6 +1,6 @@ channel = $channel; } + /** + * @param AmqpTopic|AmqpQueue $destination + * @param AmqpMessage $message + */ public function send(PsrDestination $destination, PsrMessage $message) { $destination instanceof PsrTopic diff --git a/pkg/amqplib/AmqpQueue.php b/pkg/amqp-lib/AmqpQueue.php similarity index 65% rename from pkg/amqplib/AmqpQueue.php rename to pkg/amqp-lib/AmqpQueue.php index fef8ff147..5c6551c73 100644 --- a/pkg/amqplib/AmqpQueue.php +++ b/pkg/amqp-lib/AmqpQueue.php @@ -1,21 +1,64 @@ noAck = false; } + /** + * @return string + */ public function getQueueName() { return $this->name; } + /** + * @return bool + */ public function isPassive() { return $this->passive; } + /** + * @param bool $passive + */ public function setPassive($passive) { $this->passive = (bool) $passive; } + /** + * @return bool + */ public function isDurable() { return $this->durable; } + /** + * @param bool $durable + */ public function setDurable($durable) { $this->durable = (bool) $durable; } + /** + * @return bool + */ public function isExclusive() { return $this->exclusive; } + /** + * @param bool $exclusive + */ public function setExclusive($exclusive) { $this->exclusive = (bool) $exclusive; } + /** + * @return bool + */ public function isAutoDelete() { return $this->autoDelete; } + /** + * @param bool $autoDelete + */ public function setAutoDelete($autoDelete) { $this->autoDelete = (bool) $autoDelete; } + /** + * @return bool + */ public function isNoWait() { return $this->noWait; } + /** + * @param bool $noWait + */ public function setNoWait($noWait) { $this->noWait = (bool) $noWait; } + /** + * @return array|null + */ public function getArguments() { return $this->arguments; } + /** + * @param array|null $arguments + */ public function setArguments(array $arguments = null) { $this->arguments = $arguments; } + /** + * @return int + */ public function getTicket() { return $this->ticket; } + /** + * @param int $ticket + */ public function setTicket($ticket) { $this->ticket = $ticket; } + /** + * @return string + */ public function getConsumerTag() { return $this->consumerTag; } + /** + * @param string $consumerTag + */ public function setConsumerTag($consumerTag) { $this->consumerTag = $consumerTag; } + /** + * @return bool + */ public function isNoLocal() { return $this->noLocal; } + /** + * @param bool $noLocal + */ public function setNoLocal($noLocal) { $this->noLocal = $noLocal; } + /** + * @return bool + */ public function isNoAck() { return $this->noAck; } + /** + * @param bool $noAck + */ public function setNoAck($noAck) { $this->noAck = $noAck; diff --git a/pkg/amqplib/AmqpTopic.php b/pkg/amqp-lib/AmqpTopic.php similarity index 64% rename from pkg/amqplib/AmqpTopic.php rename to pkg/amqp-lib/AmqpTopic.php index 71a056cc4..1f3ae195e 100644 --- a/pkg/amqplib/AmqpTopic.php +++ b/pkg/amqp-lib/AmqpTopic.php @@ -1,22 +1,64 @@ name = $name; @@ -27,96 +69,153 @@ public function __construct($name) $this->noWait = false; } + /** + * @return string + */ public function getTopicName() { return $this->name; } + /** + * @return string + */ public function getType() { return $this->type; } + /** + * @param string $type + */ public function setType($type) { $this->type = $type; } + /** + * @return bool + */ public function isPassive() { return $this->passive; } + /** + * @param bool $passive + */ public function setPassive($passive) { $this->passive = (bool) $passive; } + /** + * @return bool + */ public function isDurable() { return $this->durable; } + /** + * @param bool $durable + */ public function setDurable($durable) { $this->durable = (bool) $durable; } + /** + * @return bool + */ public function isAutoDelete() { return $this->autoDelete; } + /** + * @param bool $autoDelete + */ public function setAutoDelete($autoDelete) { $this->autoDelete = (bool) $autoDelete; } + /** + * @return bool + */ public function isInternal() { return $this->internal; } + /** + * @param bool $internal + */ public function setInternal($internal) { $this->internal = (bool) $internal; } + /** + * @return bool + */ public function isNoWait() { return $this->noWait; } + /** + * @param bool $noWait + */ public function setNoWait($noWait) { $this->noWait = (bool) $noWait; } + /** + * @return array|null + */ public function getArguments() { return $this->arguments; } + /** + * @param array|null $arguments + */ public function setArguments(array $arguments = null) { $this->arguments = $arguments; } + /** + * @return int + */ public function getTicket() { return $this->ticket; } + /** + * @param int $ticket + */ public function setTicket($ticket) { $this->ticket = $ticket; } + /** + * @return string + */ public function getRoutingKey() { return $this->routingKey; } + /** + * @param string $routingKey + */ public function setRoutingKey($routingKey) { $this->routingKey = $routingKey; diff --git a/pkg/amqplib/composer.json b/pkg/amqp-lib/composer.json similarity index 93% rename from pkg/amqplib/composer.json rename to pkg/amqp-lib/composer.json index e6db12bb7..e73673334 100644 --- a/pkg/amqplib/composer.json +++ b/pkg/amqp-lib/composer.json @@ -1,5 +1,5 @@ { - "name": "enqueue/amqplib", + "name": "enqueue/amqp-lib", "type": "library", "description": "Message Queue Amqp Transport", "keywords": ["messaging", "queue", "amqp"], @@ -26,7 +26,7 @@ "symfony/config": "^2.8|^3" }, "autoload": { - "psr-4": { "Enqueue\\Amqplib\\": "" }, + "psr-4": { "Enqueue\\AmqpLib\\": "" }, "exclude-from-classmap": [ "/Tests/" ] From 0a685ad8acab6959317a899011e364046bef784f Mon Sep 17 00:00:00 2001 From: Paul McLaren Date: Wed, 19 Jul 2017 10:30:16 +0300 Subject: [PATCH 3/9] add spec tests --- composer.json | 1 + pkg/amqp-lib/AmqpConnectionFactory.php | 171 +++++++++++++++++- pkg/amqp-lib/AmqpContext.php | 27 ++- pkg/amqp-lib/AmqpMessage.php | 8 +- pkg/amqp-lib/LICENSE | 20 ++ .../Tests/Spec/AmqpConnectionFactoryTest.php | 14 ++ pkg/amqp-lib/Tests/Spec/AmqpContextTest.php | 24 +++ pkg/amqp-lib/Tests/Spec/AmqpMessageTest.php | 17 ++ pkg/amqp-lib/Tests/Spec/AmqpQueueTest.php | 14 ++ .../AmqpSendToAndReceiveFromQueueTest.php | 38 ++++ .../AmqpSendToAndReceiveFromTopicTest.php | 39 ++++ ...mqpSendToAndReceiveNoWaitFromQueueTest.php | 38 ++++ ...mqpSendToAndReceiveNoWaitFromTopicTest.php | 39 ++++ ...AmqpSendToTopicAndReceiveFromQueueTest.php | 55 ++++++ ...ndToTopicAndReceiveNoWaitFromQueueTest.php | 55 ++++++ pkg/amqp-lib/Tests/Spec/AmqpTopicTest.php | 14 ++ pkg/amqp-lib/composer.json | 2 +- pkg/amqp-lib/phpunit.xml.dist | 30 +++ 18 files changed, 589 insertions(+), 17 deletions(-) create mode 100644 pkg/amqp-lib/LICENSE create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpConnectionFactoryTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpContextTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpMessageTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpQueueTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveFromQueueTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpTopicTest.php create mode 100644 pkg/amqp-lib/phpunit.xml.dist diff --git a/composer.json b/composer.json index 0f3b8791e..49c274e6d 100644 --- a/composer.json +++ b/composer.json @@ -8,6 +8,7 @@ "enqueue/stomp": "*@dev", "enqueue/amqp-ext": "*@dev", "enqueue/amqp-lib": "*@dev", + "php-amqplib/php-amqplib": "^2.7@dev", "enqueue/redis": "*@dev", "enqueue/fs": "*@dev", "enqueue/null": "*@dev", diff --git a/pkg/amqp-lib/AmqpConnectionFactory.php b/pkg/amqp-lib/AmqpConnectionFactory.php index a6a4590ee..7c1aefad0 100644 --- a/pkg/amqp-lib/AmqpConnectionFactory.php +++ b/pkg/amqp-lib/AmqpConnectionFactory.php @@ -4,6 +4,9 @@ use Interop\Queue\PsrConnectionFactory; use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Connection\AMQPLazyConnection; +use PhpAmqpLib\Connection\AMQPLazySocketConnection; +use PhpAmqpLib\Connection\AMQPSocketConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; class AmqpConnectionFactory implements PsrConnectionFactory @@ -19,10 +22,35 @@ class AmqpConnectionFactory implements PsrConnectionFactory private $connection; /** - * @param array $config + * The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default credentials. + * + * [ + * 'host' => 'amqp.host The host to connect too. Note: Max 1024 characters.', + * 'port' => 'amqp.port Port on the host.', + * 'vhost' => 'amqp.vhost The virtual host on the host. Note: Max 128 characters.', + * 'user' => 'amqp.user The user name to use. Note: Max 128 characters.', + * 'pass' => 'amqp.password Password. Note: Max 128 characters.', + * 'lazy' => 'the connection will be performed as later as possible, if the option set to true', + * 'stream' => 'stream or socket connection', + * ] + * + * or + * + * amqp://user:pass@host:10000/vhost?lazy=true&socket=true + * + * @param array|string $config */ - public function __construct(array $config = []) + public function __construct($config = 'amqp://') { + if (empty($config) || 'amqp://' === $config) { + $config = []; + } elseif (is_string($config)) { + $config = $this->parseDsn($config); + } elseif (is_array($config)) { + } else { + throw new \LogicException('The config must be either an array of options, a DSN string or null'); + } + $this->config = array_replace($this->defaultConfig(), $config); } @@ -40,29 +68,152 @@ public function createContext() private function establishConnection() { if (false == $this->connection) { - $this->connection = new AMQPStreamConnection( - $this->config['host'], - $this->config['port'], - $this->config['user'], - $this->config['pass'], - $this->config['vhost'] - ); + if ($this->config['stream']) { + if ($this->config['lazy']) { + $con = new AMQPLazyConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['connection_timeout'], + $this->config['read_write_timeout'], + null, + $this->config['keepalive'], + $this->config['heartbeat'] + ); + } else { + $con = new AMQPStreamConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['connection_timeout'], + $this->config['read_write_timeout'], + null, + $this->config['keepalive'], + $this->config['heartbeat'] + ); + } + } else { + if ($this->config['lazy']) { + $con = new AMQPLazySocketConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['read_timeout'], + $this->config['keepalive'], + $this->config['write_timeout'], + $this->config['heartbeat'] + ); + } else { + $con = new AMQPSocketConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['read_timeout'], + $this->config['keepalive'], + $this->config['write_timeout'], + $this->config['heartbeat'] + ); + } + } + + $this->connection = $con; } return $this->connection; } + /** + * @param string $dsn + * + * @return array + */ + private function parseDsn($dsn) + { + $dsnConfig = parse_url($dsn); + if (false === $dsnConfig) { + throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn)); + } + + $dsnConfig = array_replace([ + 'scheme' => null, + 'host' => null, + 'port' => null, + 'user' => null, + 'pass' => null, + 'path' => null, + 'query' => null, + ], $dsnConfig); + + if ('amqp' !== $dsnConfig['scheme']) { + throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "amqp" only.', $dsnConfig['scheme'])); + } + + if ($dsnConfig['query']) { + $query = []; + parse_str($dsnConfig['query'], $query); + + $dsnConfig = array_replace($query, $dsnConfig); + } + + $dsnConfig['vhost'] = ltrim($dsnConfig['path'], '/'); + + unset($dsnConfig['scheme'], $dsnConfig['query'], $dsnConfig['fragment'], $dsnConfig['path']); + + $config = array_replace($this->defaultConfig(), $dsnConfig); + $config = array_map(function ($value) { + return urldecode($value); + }, $config); + + return $config; + } + /** * @return array */ private function defaultConfig() { return [ + 'stream' => true, + 'lazy' => true, 'host' => 'localhost', 'port' => 5672, - 'vhost' => '/', 'user' => 'guest', 'pass' => 'guest', + 'vhost' => '/', + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'read_timeout' => 3, + 'keepalive' => false, + 'write_timeout' => 3, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, ]; } } diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index 375d574fa..3376bd5db 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -6,6 +6,7 @@ use Interop\Queue\InvalidDestinationException; use Interop\Queue\PsrContext; use Interop\Queue\PsrDestination; +use Interop\Queue\PsrQueue; use Interop\Queue\PsrTopic; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; @@ -37,7 +38,7 @@ public function __construct(AbstractConnection $connection) * * @return AmqpMessage */ - public function createMessage($body = null, array $properties = [], array $headers = []) + public function createMessage($body = '', array $properties = [], array $headers = []) { return new AmqpMessage($body, $properties, $headers); } @@ -69,7 +70,17 @@ public function createTopic($name) */ public function createConsumer(PsrDestination $destination) { - InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class); + $destination instanceof PsrTopic + ? InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class) + : InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class) + ; + + if ($destination instanceof AmqpTopic) { + $queue = $this->createTemporaryQueue(); + $this->bind($destination, $queue); + + return new AmqpConsumer($this->getChannel(), $queue); + } return new AmqpConsumer($this->getChannel(), $destination); } @@ -189,6 +200,18 @@ public function bind(PsrDestination $source, PsrDestination $target) } } + /** + * Purge all messages from the given queue. + * + * @param PsrQueue $queue + */ + public function purge(PsrQueue $queue) + { + InvalidDestinationException::assertDestinationInstanceOf($queue, AmqpQueue::class); + + $this->getChannel()->queue_purge($queue->getQueueName()); + } + public function close() { if ($this->channel) { diff --git a/pkg/amqp-lib/AmqpMessage.php b/pkg/amqp-lib/AmqpMessage.php index 13f8fcd93..f391d7364 100644 --- a/pkg/amqp-lib/AmqpMessage.php +++ b/pkg/amqp-lib/AmqpMessage.php @@ -7,7 +7,7 @@ class AmqpMessage implements PsrMessage { /** - * @var string|null + * @var string */ private $body; @@ -51,7 +51,7 @@ class AmqpMessage implements PsrMessage * @param array $properties * @param array $headers */ - public function __construct($body = null, array $properties = [], array $headers = []) + public function __construct($body = '', array $properties = [], array $headers = []) { $this->body = $body; $this->properties = $properties; @@ -60,7 +60,7 @@ public function __construct($body = null, array $properties = [], array $headers } /** - * @return null|string + * @return string */ public function getBody() { @@ -68,7 +68,7 @@ public function getBody() } /** - * @param string|null $body + * @param string $body */ public function setBody($body) { diff --git a/pkg/amqp-lib/LICENSE b/pkg/amqp-lib/LICENSE new file mode 100644 index 000000000..681501120 --- /dev/null +++ b/pkg/amqp-lib/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) +Copyright (c) 2017 Paul McLaren + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/pkg/amqp-lib/Tests/Spec/AmqpConnectionFactoryTest.php b/pkg/amqp-lib/Tests/Spec/AmqpConnectionFactoryTest.php new file mode 100644 index 000000000..ebc3b8a7f --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpConnectionFactoryTest.php @@ -0,0 +1,14 @@ +createMock(AMQPChannel::class); + + $con = $this->createMock(AbstractConnection::class); + $con + ->expects($this->any()) + ->method('channel') + ->willReturn($channel) + ; + + return new AmqpContext($con); + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpMessageTest.php b/pkg/amqp-lib/Tests/Spec/AmqpMessageTest.php new file mode 100644 index 000000000..57b93cbd0 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpMessageTest.php @@ -0,0 +1,17 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + return $queue; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php new file mode 100644 index 000000000..dfce6ccdf --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php @@ -0,0 +1,39 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 000000000..b4db35c10 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,38 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + return $queue; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php new file mode 100644 index 000000000..a50fc4c67 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php @@ -0,0 +1,39 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php new file mode 100644 index 000000000..928edaa72 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php @@ -0,0 +1,55 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + $context->bind($context->createTopic($queueName), $queue); + + return $queue; + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 000000000..6b4b1906b --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,55 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + $context->bind($context->createTopic($queueName), $queue); + + return $queue; + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpTopicTest.php b/pkg/amqp-lib/Tests/Spec/AmqpTopicTest.php new file mode 100644 index 000000000..89717f01f --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpTopicTest.php @@ -0,0 +1,14 @@ +=5.6", - "php-amqplib/php-amqplib": "^2.6", + "php-amqplib/php-amqplib": "^2.7@dev", "queue-interop/queue-interop": "^0.5@dev", "psr/log": "^1" }, diff --git a/pkg/amqp-lib/phpunit.xml.dist b/pkg/amqp-lib/phpunit.xml.dist new file mode 100644 index 000000000..f6b8b173a --- /dev/null +++ b/pkg/amqp-lib/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Tests + + + + From 087578e3e591c594e6b26e0f346fcb6cbd319e33 Mon Sep 17 00:00:00 2001 From: Paul McLaren Date: Wed, 19 Jul 2017 10:38:03 +0300 Subject: [PATCH 4/9] fix codestyle --- pkg/amqp-lib/AmqpContext.php | 4 ++-- pkg/amqp-lib/Tests/Spec/AmqpContextTest.php | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index 3376bd5db..f16b965f6 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -177,7 +177,7 @@ public function bind(PsrDestination $source, PsrDestination $target) $source->getArguments(), $source->getTicket() ); - // bind queue to exchange + // bind queue to exchange } elseif ($source instanceof AmqpQueue) { $this->getChannel()->queue_bind( $source->getQueueName(), @@ -187,7 +187,7 @@ public function bind(PsrDestination $source, PsrDestination $target) $target->getArguments(), $target->getTicket() ); - // bind exchange to queue + // bind exchange to queue } else { $this->getChannel()->queue_bind( $target->getQueueName(), diff --git a/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php b/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php index f2e004eb7..5e3d8bb8e 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php @@ -1,4 +1,5 @@ Date: Wed, 19 Jul 2017 13:55:45 +0300 Subject: [PATCH 5/9] consumer basic consume impl --- pkg/amqp-lib/AmqpConnectionFactory.php | 13 ++- pkg/amqp-lib/AmqpConsumer.php | 124 +++++++++++++++++++++++-- pkg/amqp-lib/AmqpContext.php | 19 +++- pkg/amqp-lib/Buffer.php | 41 ++++++++ 4 files changed, 184 insertions(+), 13 deletions(-) create mode 100644 pkg/amqp-lib/Buffer.php diff --git a/pkg/amqp-lib/AmqpConnectionFactory.php b/pkg/amqp-lib/AmqpConnectionFactory.php index 7c1aefad0..94d5ed192 100644 --- a/pkg/amqp-lib/AmqpConnectionFactory.php +++ b/pkg/amqp-lib/AmqpConnectionFactory.php @@ -32,6 +32,7 @@ class AmqpConnectionFactory implements PsrConnectionFactory * 'pass' => 'amqp.password Password. Note: Max 128 characters.', * 'lazy' => 'the connection will be performed as later as possible, if the option set to true', * 'stream' => 'stream or socket connection', + * 'receive_method' => 'Could be either basic_get or basic_consume', * ] * * or @@ -52,6 +53,15 @@ public function __construct($config = 'amqp://') } $this->config = array_replace($this->defaultConfig(), $config); + + $supportedMethods = ['basic_get', 'basic_consume']; + if (false == in_array($this->config['receive_method'], $supportedMethods, true)) { + throw new \LogicException(sprintf( + 'Invalid "receive_method" option value "%s". It could be only "%s"', + $this->config['receive_method'], + implode('", "', $supportedMethods) + )); + } } /** @@ -59,7 +69,7 @@ public function __construct($config = 'amqp://') */ public function createContext() { - return new AmqpContext($this->establishConnection()); + return new AmqpContext($this->establishConnection(), $this->config['receive_method']); } /** @@ -214,6 +224,7 @@ private function defaultConfig() 'heartbeat' => 0, 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, + 'receive_method' => 'basic_get', ]; } } diff --git a/pkg/amqp-lib/AmqpConsumer.php b/pkg/amqp-lib/AmqpConsumer.php index 1d9fae0b9..d73f1e843 100644 --- a/pkg/amqp-lib/AmqpConsumer.php +++ b/pkg/amqp-lib/AmqpConsumer.php @@ -2,10 +2,12 @@ namespace Enqueue\AmqpLib; +use Interop\Queue\Exception; use Interop\Queue\InvalidMessageException; use Interop\Queue\PsrConsumer; use Interop\Queue\PsrMessage; use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -21,14 +23,45 @@ class AmqpConsumer implements PsrConsumer */ private $queue; + /** + * @var Buffer + */ + private $buffer; + + /** + * @var bool + */ + private $isInit; + + /** + * @var string + */ + private $receiveMethod; + + /** + * @var AmqpMessage + */ + private $receivedMessage; + + /** + * @var string + */ + private $consumerTag; + /** * @param AMQPChannel $channel * @param AmqpQueue $queue + * @param Buffer $buffer + * @param string $receiveMethod */ - public function __construct(AMQPChannel $channel, AmqpQueue $queue) + public function __construct(AMQPChannel $channel, AmqpQueue $queue, Buffer $buffer, $receiveMethod) { $this->channel = $channel; $this->queue = $queue; + $this->buffer = $buffer; + $this->receiveMethod = $receiveMethod; + + $this->isInit = false; } /** @@ -40,21 +73,21 @@ public function getQueue() } /** - * @param int $timeout + * {@inheritdoc} * * @return AmqpMessage|null */ public function receive($timeout = 0) { - $end = microtime(true) + ($timeout / 1000); - - while (0 === $timeout || microtime(true) < $end) { - if ($message = $this->receiveNoWait()) { - return $message; - } + if ('basic_get' == $this->receiveMethod) { + return $this->receiveBasicGet($timeout); + } - usleep(100000); //100ms + if ('basic_consume' == $this->receiveMethod) { + return $this->receiveBasicConsume($timeout); } + + throw new \LogicException('The "receiveMethod" is not supported'); } /** @@ -110,4 +143,77 @@ private function convertMessage(LibAMQPMessage $amqpMessage) return $message; } + + /** + * @param int $timeout + * + * @return AmqpMessage|null + */ + private function receiveBasicGet($timeout) + { + $end = microtime(true) + ($timeout / 1000); + + while (0 === $timeout || microtime(true) < $end) { + if ($message = $this->receiveNoWait()) { + return $message; + } + + usleep(100000); //100ms + } + } + + /** + * @param int $timeout + * + * @return AmqpMessage|null + */ + private function receiveBasicConsume($timeout) + { + if (false === $this->isInit) { + $callback = function (LibAMQPMessage $message) { + $receivedMessage = $this->convertMessage($message); + $consumerTag = $message->delivery_info['consumer_tag']; + + if ($this->consumerTag === $consumerTag) { + $this->receivedMessage = $receivedMessage; + } else { + // not our message, put it to buffer and continue. + $this->buffer->push($consumerTag, $receivedMessage); + } + }; + + $this->channel->basic_qos(0, 1, false); + + $consumerTag = $this->channel->basic_consume( + $this->queue->getQueueName(), + $this->queue->getConsumerTag(), + $this->queue->isNoLocal(), + $this->queue->isNoAck(), + $this->queue->isExclusive(), + $this->queue->isNoWait(), + $callback + ); + + $this->consumerTag = $consumerTag ?: $this->queue->getConsumerTag(); + + if (empty($this->consumerTag)) { + throw new Exception('Got empty consumer tag'); + } + + $this->isInit = true; + } + + if ($message = $this->buffer->pop($this->consumerTag)) { + return $message; + } + + $this->receivedMessage = null; + + try { + $this->channel->wait(null, false, $timeout); + } catch (AMQPTimeoutException $e) { + } + + return $this->receivedMessage; + } } diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index f16b965f6..168766ad5 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -23,12 +23,25 @@ class AmqpContext implements PsrContext */ private $channel; + /** + * @var string + */ + private $receiveMethod; + + /** + * @var Buffer + */ + private $buffer; + /** * @param AbstractConnection $connection + * @param string $receiveMethod */ - public function __construct(AbstractConnection $connection) + public function __construct(AbstractConnection $connection, $receiveMethod) { $this->connection = $connection; + $this->receiveMethod = $receiveMethod; + $this->buffer = new Buffer(); } /** @@ -79,10 +92,10 @@ public function createConsumer(PsrDestination $destination) $queue = $this->createTemporaryQueue(); $this->bind($destination, $queue); - return new AmqpConsumer($this->getChannel(), $queue); + return new AmqpConsumer($this->getChannel(), $queue, $this->buffer, $this->receiveMethod); } - return new AmqpConsumer($this->getChannel(), $destination); + return new AmqpConsumer($this->getChannel(), $destination, $this->buffer, $this->receiveMethod); } /** diff --git a/pkg/amqp-lib/Buffer.php b/pkg/amqp-lib/Buffer.php new file mode 100644 index 000000000..55c06f619 --- /dev/null +++ b/pkg/amqp-lib/Buffer.php @@ -0,0 +1,41 @@ + [AmqpMessage, AmqpMessage ...]] + */ + private $messages; + + public function __construct() + { + $this->messages = []; + } + + /** + * @param string $consumerTag + * @param AmqpMessage $message + */ + public function push($consumerTag, AmqpMessage $message) + { + if (false == array_key_exists($consumerTag, $this->messages)) { + $this->messages[$consumerTag] = []; + } + + $this->messages[$consumerTag][] = $message; + } + + /** + * @param string $consumerTag + * + * @return AmqpMessage|null + */ + public function pop($consumerTag) + { + if (false == empty($this->messages[$consumerTag])) { + return array_shift($this->messages[$consumerTag]); + } + } +} From 4d441064247af777ff77e82eb036b6f0eab6266e Mon Sep 17 00:00:00 2001 From: Paul McLaren Date: Thu, 20 Jul 2017 12:00:26 +0300 Subject: [PATCH 6/9] amqp-lib tests --- pkg/amqp-lib/.gitignore | 6 + pkg/amqp-lib/.travis.yml | 21 ++ pkg/amqp-lib/AmqpConnectionFactory.php | 7 +- pkg/amqp-lib/AmqpTopic.php | 1 + .../Tests/AmqpConnectionFactoryConfigTest.php | 281 ++++++++++++++++++ pkg/amqp-lib/Tests/AmqpConsumerTest.php | 184 ++++++++++++ pkg/amqp-lib/Tests/AmqpContextTest.php | 244 +++++++++++++++ pkg/amqp-lib/Tests/AmqpMessageTest.php | 55 ++++ pkg/amqp-lib/Tests/AmqpProducerTest.php | 164 ++++++++++ pkg/amqp-lib/Tests/AmqpQueueTest.php | 122 ++++++++ pkg/amqp-lib/Tests/AmqpTopicTest.php | 116 ++++++++ pkg/amqp-lib/Tests/BufferTest.php | 64 ++++ pkg/amqp-lib/Tests/Spec/AmqpContextTest.php | 2 +- 13 files changed, 1262 insertions(+), 5 deletions(-) create mode 100644 pkg/amqp-lib/.gitignore create mode 100644 pkg/amqp-lib/.travis.yml create mode 100644 pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php create mode 100644 pkg/amqp-lib/Tests/AmqpConsumerTest.php create mode 100644 pkg/amqp-lib/Tests/AmqpContextTest.php create mode 100644 pkg/amqp-lib/Tests/AmqpMessageTest.php create mode 100644 pkg/amqp-lib/Tests/AmqpProducerTest.php create mode 100644 pkg/amqp-lib/Tests/AmqpQueueTest.php create mode 100644 pkg/amqp-lib/Tests/AmqpTopicTest.php create mode 100644 pkg/amqp-lib/Tests/BufferTest.php diff --git a/pkg/amqp-lib/.gitignore b/pkg/amqp-lib/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/amqp-lib/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/amqp-lib/.travis.yml b/pkg/amqp-lib/.travis.yml new file mode 100644 index 000000000..aaa1849c3 --- /dev/null +++ b/pkg/amqp-lib/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 1 + +language: php + +php: + - '5.6' + - '7.0' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install --prefer-source --ignore-platform-reqs + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/amqp-lib/AmqpConnectionFactory.php b/pkg/amqp-lib/AmqpConnectionFactory.php index 94d5ed192..ac2e31bb1 100644 --- a/pkg/amqp-lib/AmqpConnectionFactory.php +++ b/pkg/amqp-lib/AmqpConnectionFactory.php @@ -193,12 +193,11 @@ private function parseDsn($dsn) unset($dsnConfig['scheme'], $dsnConfig['query'], $dsnConfig['fragment'], $dsnConfig['path']); - $config = array_replace($this->defaultConfig(), $dsnConfig); - $config = array_map(function ($value) { + $dsnConfig = array_map(function ($value) { return urldecode($value); - }, $config); + }, $dsnConfig); - return $config; + return $dsnConfig; } /** diff --git a/pkg/amqp-lib/AmqpTopic.php b/pkg/amqp-lib/AmqpTopic.php index 1f3ae195e..a1d029853 100644 --- a/pkg/amqp-lib/AmqpTopic.php +++ b/pkg/amqp-lib/AmqpTopic.php @@ -62,6 +62,7 @@ class AmqpTopic implements PsrTopic public function __construct($name) { $this->name = $name; + $this->type = 'direct'; $this->passive = false; $this->durable = false; $this->autoDelete = true; diff --git a/pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php b/pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php new file mode 100644 index 000000000..31a1ca0ef --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php @@ -0,0 +1,281 @@ +expectException(\LogicException::class); + $this->expectExceptionMessage('The config must be either an array of options, a DSN string or null'); + + new AmqpConnectionFactory(new \stdClass()); + } + + public function testThrowIfSchemeIsNotAmqp() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "amqp" only.'); + + new AmqpConnectionFactory('http://example.com'); + } + + public function testThrowIfDsnCouldNotBeParsed() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Failed to parse DSN "amqp://:@/"'); + + new AmqpConnectionFactory('amqp://:@/'); + } + + public function testThrowIfReceiveMenthodIsInvalid() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Invalid "receive_method" option value "invalidMethod". It could be only "basic_get", "basic_consume"'); + + new AmqpConnectionFactory(['receive_method' => 'invalidMethod']); + } + + /** + * @dataProvider provideConfigs + * + * @param mixed $config + * @param mixed $expectedConfig + */ + public function testShouldParseConfigurationAsExpected($config, $expectedConfig) + { + $factory = new AmqpConnectionFactory($config); + + $this->assertAttributeEquals($expectedConfig, 'config', $factory); + } + + public static function provideConfigs() + { + yield [ + null, + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + // some examples from Appendix A: Examples (https://www.rabbitmq.com/uri-spec.html) + + yield [ + 'amqp://user:pass@host:10000/vhost', + [ + 'host' => 'host', + 'port' => 10000, + 'vhost' => 'vhost', + 'user' => 'user', + 'pass' => 'pass', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + 'amqp://user%61:%61pass@ho%61st:10000/v%2fhost', + [ + 'host' => 'hoast', + 'port' => 10000, + 'vhost' => 'v/host', + 'user' => 'usera', + 'pass' => 'apass', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + 'amqp://', + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + 'amqp://user:pass@host:10000/vhost?connection_timeout=2&lazy=', + [ + 'host' => 'host', + 'port' => 10000, + 'vhost' => 'vhost', + 'user' => 'user', + 'pass' => 'pass', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => '', + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => '2', + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + [], + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + ['lazy' => false, 'host' => 'host'], + [ + 'host' => 'host', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => false, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + ['connection_timeout' => 123, 'read_write_timeout' => 321], + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 123, + 'read_write_timeout' => 321, + ], + ]; + + yield [ + 'amqp://user:pass@host:10000/vhost?connection_timeout=123&read_write_timeout=321', + [ + 'host' => 'host', + 'port' => 10000, + 'vhost' => 'vhost', + 'user' => 'user', + 'pass' => 'pass', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => '123', + 'read_write_timeout' => '321', + ], + ]; + } +} diff --git a/pkg/amqp-lib/Tests/AmqpConsumerTest.php b/pkg/amqp-lib/Tests/AmqpConsumerTest.php new file mode 100644 index 000000000..a77443223 --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpConsumerTest.php @@ -0,0 +1,184 @@ +assertClassImplements(PsrConsumer::class, AmqpConsumer::class); + } + + public function testCouldBeConstructedWithContextAndQueueAndBufferAsArguments() + { + new AmqpConsumer( + $this->createChannelMock(), + new AmqpQueue('aName'), + new Buffer(), + 'basic_get' + ); + } + + public function testShouldReturnQueue() + { + $queue = new AmqpQueue('aName'); + + $consumer = new AmqpConsumer($this->createChannelMock(), $queue, new Buffer(), 'basic_get'); + + $this->assertSame($queue, $consumer->getQueue()); + } + + public function testOnAcknowledgeShouldThrowExceptionIfNotAmqpMessage() + { + $consumer = new AmqpConsumer($this->createChannelMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage('The message must be an instance of Enqueue\AmqpLib\AmqpMessage but'); + + $consumer->acknowledge(new NullMessage()); + } + + public function testOnRejectShouldThrowExceptionIfNotAmqpMessage() + { + $consumer = new AmqpConsumer($this->createChannelMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage('The message must be an instance of Enqueue\AmqpLib\AmqpMessage but'); + + $consumer->reject(new NullMessage()); + } + + public function testOnAcknowledgeShouldAcknowledgeMessage() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_ack') + ->with('delivery-tag') + ; + + $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $message = new AmqpMessage(); + $message->setDeliveryTag('delivery-tag'); + + $consumer->acknowledge($message); + } + + public function testOnRejectShouldRejectMessage() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_reject') + ->with('delivery-tag', $this->isTrue()) + ; + + $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $message = new AmqpMessage(); + $message->setDeliveryTag('delivery-tag'); + + $consumer->reject($message, true); + } + + public function testShouldReturnMessageOnReceiveNoWait() + { + $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); + $amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag'; + $amqpMessage->delivery_info['redelivered'] = true; + + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_get') + ->willReturn($amqpMessage) + ; + + $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $message = new AmqpMessage(); + $message->setDeliveryTag('delivery-tag'); + + $message = $consumer->receiveNoWait(); + + $this->assertInstanceOf(AmqpMessage::class, $message); + $this->assertSame('body', $message->getBody()); + $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $this->assertTrue($message->isRedelivered()); + } + + public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet() + { + $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); + $amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag'; + $amqpMessage->delivery_info['redelivered'] = true; + + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_get') + ->willReturn($amqpMessage) + ; + + $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $message = new AmqpMessage(); + $message->setDeliveryTag('delivery-tag'); + + $message = $consumer->receive(); + + $this->assertInstanceOf(AmqpMessage::class, $message); + $this->assertSame('body', $message->getBody()); + $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $this->assertTrue($message->isRedelivered()); + } + + public function testShouldCallExpectedMethodsWhenReceiveWithBasicConsumeMethod() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_consume') + ->willReturn('consumer-tag') + ; + $channel + ->expects($this->once()) + ->method('basic_qos') + ->with($this->identicalTo(0), $this->identicalTo(1), $this->isFalse()) + ; + $channel + ->expects($this->once()) + ->method('wait') + ; + + $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_consume'); + + $message = new AmqpMessage(); + $message->setDeliveryTag('delivery-tag'); + $consumer->receive(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AMQPChannel + */ + public function createChannelMock() + { + return $this->createMock(AMQPChannel::class); + } +} diff --git a/pkg/amqp-lib/Tests/AmqpContextTest.php b/pkg/amqp-lib/Tests/AmqpContextTest.php new file mode 100644 index 000000000..17070f072 --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpContextTest.php @@ -0,0 +1,244 @@ +createChannelMock(); + $channel + ->expects($this->once()) + ->method('exchange_declare') + ->with( + $this->identicalTo('name'), + $this->identicalTo('type'), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->identicalTo(['key' => 'value']), + $this->identicalTo(12345) + ) + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $topic = new AmqpTopic('name'); + $topic->setType('type'); + $topic->setArguments(['key' => 'value']); + $topic->setAutoDelete(true); + $topic->setDurable(true); + $topic->setInternal(true); + $topic->setNoWait(true); + $topic->setPassive(true); + $topic->setRoutingKey('routing-key'); + $topic->setTicket(12345); + + $session = new AmqpContext($connection, ''); + $session->declareTopic($topic); + } + + public function testShouldDeclareQueue() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('queue_declare') + ->with( + $this->identicalTo('name'), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->identicalTo(['key' => 'value']), + $this->identicalTo(12345) + ) + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $queue = new AmqpQueue('name'); + $queue->setArguments(['key' => 'value']); + $queue->setAutoDelete(true); + $queue->setDurable(true); + $queue->setNoWait(true); + $queue->setPassive(true); + $queue->setTicket(12345); + $queue->setConsumerTag('consumer-tag'); + $queue->setExclusive(true); + $queue->setNoLocal(true); + + $session = new AmqpContext($connection, ''); + $session->declareQueue($queue); + } + + public function testDeclareBindShouldThrowExceptionIfSourceDestinationIsInvalid() + { + $context = new AmqpContext($this->createConnectionMock(), ''); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\AmqpLib\AmqpTopic but got'); + + $context->bind(new NullTopic(''), new AmqpTopic('name')); + } + + public function testDeclareBindShouldThrowExceptionIfTargetDestinationIsInvalid() + { + $context = new AmqpContext($this->createConnectionMock(), ''); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\AmqpLib\AmqpTopic but got'); + + $context->bind(new AmqpQueue('name'), new NullTopic('')); + } + + public function testDeclareBindShouldThrowExceptionWhenSourceAndTargetAreQueues() + { + $context = new AmqpContext($this->createConnectionMock(), ''); + + $this->expectException(Exception::class); + $this->expectExceptionMessage('Is not possible to bind queue to queue. It is possible to bind topic to queue or topic to topic'); + + $context->bind(new AmqpQueue('name'), new AmqpQueue('name')); + } + + public function testDeclareBindShouldBindTopicToTopic() + { + $source = new AmqpTopic('source'); + $target = new AmqpTopic('target'); + + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('exchange_bind') + ->with('target', 'source') + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $context = new AmqpContext($connection, ''); + $context->bind($source, $target); + } + + public function testDeclareBindShouldBindTopicToQueue() + { + $source = new AmqpTopic('source'); + $target = new AmqpQueue('target'); + + $channel = $this->createChannelMock(); + $channel + ->expects($this->exactly(2)) + ->method('queue_bind') + ->with('target', 'source') + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $context = new AmqpContext($connection, ''); + $context->bind($source, $target); + $context->bind($target, $source); + } + + public function testShouldCloseChannelConnection() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('close') + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $context = new AmqpContext($connection, ''); + $context->createProducer(); + + $context->close(); + } + + public function testPurgeShouldThrowExceptionIfDestinationIsNotAmqpQueue() + { + $context = new AmqpContext($this->createConnectionMock(), ''); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\AmqpLib\AmqpQueue but got'); + + $context->purge(new NullQueue('')); + } + + public function testShouldPurgeQueue() + { + $queue = new AmqpQueue('queue'); + + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('queue_purge') + ->with('queue') + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $context = new AmqpContext($connection, ''); + $context->purge($queue); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AbstractConnection + */ + public function createConnectionMock() + { + return $this->createMock(AbstractConnection::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AMQPChannel + */ + public function createChannelMock() + { + return $this->createMock(AMQPChannel::class); + } +} diff --git a/pkg/amqp-lib/Tests/AmqpMessageTest.php b/pkg/amqp-lib/Tests/AmqpMessageTest.php new file mode 100644 index 000000000..7c927a2cf --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpMessageTest.php @@ -0,0 +1,55 @@ +setDeliveryTag('theDeliveryTag'); + + $this->assertSame('theDeliveryTag', $message->getDeliveryTag()); + } + + public function testShouldAllowGetPreviouslySetMandatory() + { + $topic = new AmqpMessage('aName'); + + $topic->setMandatory(false); + $this->assertFalse($topic->isMandatory()); + + $topic->setMandatory(true); + $this->assertTrue($topic->isMandatory()); + } + + public function testShouldAllowGetPreviouslySetImmediate() + { + $topic = new AmqpMessage('aName'); + + $topic->setImmediate(false); + $this->assertFalse($topic->isImmediate()); + + $topic->setImmediate(true); + $this->assertTrue($topic->isImmediate()); + } + + public function testShouldAllowGetPreviouslySetTicket() + { + $topic = new AmqpMessage('aName'); + + //guard + $this->assertSame(null, $topic->getTicket()); + + $topic->setTicket('ticket'); + + $this->assertSame('ticket', $topic->getTicket()); + } +} diff --git a/pkg/amqp-lib/Tests/AmqpProducerTest.php b/pkg/amqp-lib/Tests/AmqpProducerTest.php new file mode 100644 index 000000000..db587ab7f --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpProducerTest.php @@ -0,0 +1,164 @@ +createAmqpChannelMock()); + } + + public function testShouldImplementPsrProducerInterface() + { + $this->assertClassImplements(PsrProducer::class, AmqpProducer::class); + } + + public function testShouldThrowExceptionWhenDestinationTypeIsInvalid() + { + $producer = new AmqpProducer($this->createAmqpChannelMock()); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\AmqpLib\AmqpQueue but got'); + + $producer->send($this->createDestinationMock(), new AmqpMessage()); + } + + public function testShouldThrowExceptionWhenMessageTypeIsInvalid() + { + $producer = new AmqpProducer($this->createAmqpChannelMock()); + + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage('The message must be an instance of Enqueue\AmqpLib\AmqpMessage but it is'); + + $producer->send(new AmqpTopic('name'), $this->createMessageMock()); + } + + public function testShouldPublishMessageToTopic() + { + $amqpMessage = null; + + $channel = $this->createAmqpChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_publish') + ->with($this->isInstanceOf(LibAMQPMessage::class), 'topic', 'routing-key') + ->will($this->returnCallback(function (LibAMQPMessage $message) use (&$amqpMessage) { + $amqpMessage = $message; + })) + ; + + $topic = new AmqpTopic('topic'); + $topic->setRoutingKey('routing-key'); + + $producer = new AmqpProducer($channel); + $producer->send($topic, new AmqpMessage('body')); + + $this->assertEquals('body', $amqpMessage->getBody()); + } + + public function testShouldPublishMessageToQueue() + { + $amqpMessage = null; + + $channel = $this->createAmqpChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_publish') + ->with($this->isInstanceOf(LibAMQPMessage::class), $this->isEmpty(), 'queue') + ->will($this->returnCallback(function (LibAMQPMessage $message) use (&$amqpMessage) { + $amqpMessage = $message; + })) + ; + + $queue = new AmqpQueue('queue'); + + $producer = new AmqpProducer($channel); + $producer->send($queue, new AmqpMessage('body')); + + $this->assertEquals('body', $amqpMessage->getBody()); + } + + public function testShouldSetMessageHeaders() + { + $amqpMessage = null; + + $channel = $this->createAmqpChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_publish') + ->will($this->returnCallback(function (LibAMQPMessage $message) use (&$amqpMessage) { + $amqpMessage = $message; + })) + ; + + $producer = new AmqpProducer($channel); + $producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['content_type' => 'text/plain'])); + + $this->assertEquals(['content_type' => 'text/plain'], $amqpMessage->get_properties()); + } + + public function testShouldSetMessageProperties() + { + $amqpMessage = null; + + $channel = $this->createAmqpChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_publish') + ->will($this->returnCallback(function (LibAMQPMessage $message) use (&$amqpMessage) { + $amqpMessage = $message; + })) + ; + + $producer = new AmqpProducer($channel); + $producer->send(new AmqpTopic('name'), new AmqpMessage('body', ['key' => 'value'])); + + $properties = $amqpMessage->get_properties(); + + $this->assertArrayHasKey('application_headers', $properties); + $this->assertInstanceOf(AMQPTable::class, $properties['application_headers']); + $this->assertEquals(['key' => 'value'], $properties['application_headers']->getNativeData()); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrMessage + */ + private function createMessageMock() + { + return $this->createMock(PsrMessage::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrDestination + */ + private function createDestinationMock() + { + return $this->createMock(PsrDestination::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AMQPChannel + */ + private function createAmqpChannelMock() + { + return $this->createMock(AMQPChannel::class); + } +} diff --git a/pkg/amqp-lib/Tests/AmqpQueueTest.php b/pkg/amqp-lib/Tests/AmqpQueueTest.php new file mode 100644 index 000000000..34aceeba7 --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpQueueTest.php @@ -0,0 +1,122 @@ +setPassive(false); + $this->assertFalse($topic->isPassive()); + + $topic->setPassive(true); + $this->assertTrue($topic->isPassive()); + } + + public function testShouldAllowGetPreviouslySetDurable() + { + $topic = new AmqpQueue('aName'); + + $topic->setDurable(false); + $this->assertFalse($topic->isDurable()); + + $topic->setDurable(true); + $this->assertTrue($topic->isDurable()); + } + + public function testShouldAllowGetPreviouslySetExclusive() + { + $topic = new AmqpQueue('aName'); + + $topic->setExclusive(false); + $this->assertFalse($topic->isExclusive()); + + $topic->setExclusive(true); + $this->assertTrue($topic->isExclusive()); + } + + public function testShouldAllowGetPreviouslySetAutoDelete() + { + $topic = new AmqpQueue('aName'); + + $topic->setAutoDelete(false); + $this->assertFalse($topic->isAutoDelete()); + + $topic->setAutoDelete(true); + $this->assertTrue($topic->isAutoDelete()); + } + + public function testShouldAllowGetPreviouslySetNoWait() + { + $topic = new AmqpQueue('aName'); + + $topic->setNoWait(false); + $this->assertFalse($topic->isNoWait()); + + $topic->setNoWait(true); + $this->assertTrue($topic->isNoWait()); + } + + public function testShouldAllowGetPreviouslySetArguments() + { + $queue = new AmqpQueue('aName'); + + $queue->setArguments(['foo' => 'fooVal', 'bar' => 'barVal']); + + $this->assertSame(['foo' => 'fooVal', 'bar' => 'barVal'], $queue->getArguments()); + } + + public function testShouldAllowGetPreviouslySetTicket() + { + $topic = new AmqpQueue('aName'); + + //guard + $this->assertSame(null, $topic->getTicket()); + + $topic->setTicket('ticket'); + + $this->assertSame('ticket', $topic->getTicket()); + } + + public function testShouldAllowGetPreviouslySetConsumerTag() + { + $topic = new AmqpQueue('aName'); + + //guard + $this->assertSame(null, $topic->getConsumerTag()); + + $topic->setConsumerTag('consumer-tag'); + + $this->assertSame('consumer-tag', $topic->getConsumerTag()); + } + + public function testShouldAllowGetPreviouslySetNoLocal() + { + $topic = new AmqpQueue('aName'); + + $topic->setNoLocal(false); + $this->assertFalse($topic->isNoLocal()); + + $topic->setNoLocal(true); + $this->assertTrue($topic->isNoLocal()); + } + + public function testShouldAllowGetPreviouslySetNoAck() + { + $topic = new AmqpQueue('aName'); + + $topic->setNoAck(false); + $this->assertFalse($topic->isNoAck()); + + $topic->setNoAck(true); + $this->assertTrue($topic->isNoAck()); + } +} diff --git a/pkg/amqp-lib/Tests/AmqpTopicTest.php b/pkg/amqp-lib/Tests/AmqpTopicTest.php new file mode 100644 index 000000000..2e4649639 --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpTopicTest.php @@ -0,0 +1,116 @@ +assertSame('direct', $topic->getType()); + } + + public function testShouldAllowGetPreviouslySetType() + { + $topic = new AmqpTopic('aName'); + + $topic->setType('fanout'); + + $this->assertSame('fanout', $topic->getType()); + } + + public function testShouldAllowGetPreviouslySetPassive() + { + $topic = new AmqpTopic('aName'); + + $topic->setPassive(false); + $this->assertFalse($topic->isPassive()); + + $topic->setPassive(true); + $this->assertTrue($topic->isPassive()); + } + + public function testShouldAllowGetPreviouslySetDurable() + { + $topic = new AmqpTopic('aName'); + + $topic->setDurable(false); + $this->assertFalse($topic->isDurable()); + + $topic->setDurable(true); + $this->assertTrue($topic->isDurable()); + } + + public function testShouldAllowGetPreviouslySetAutoDelete() + { + $topic = new AmqpTopic('aName'); + + $topic->setAutoDelete(false); + $this->assertFalse($topic->isAutoDelete()); + + $topic->setAutoDelete(true); + $this->assertTrue($topic->isAutoDelete()); + } + + public function testShouldAllowGetPreviouslySetInternal() + { + $topic = new AmqpTopic('aName'); + + $topic->setInternal(false); + $this->assertFalse($topic->isInternal()); + + $topic->setInternal(true); + $this->assertTrue($topic->isInternal()); + } + + public function testShouldAllowGetPreviouslySetNoWait() + { + $topic = new AmqpTopic('aName'); + + $topic->setNoWait(false); + $this->assertFalse($topic->isNoWait()); + + $topic->setNoWait(true); + $this->assertTrue($topic->isNoWait()); + } + + public function testShouldAllowGetPreviouslySetArguments() + { + $topic = new AmqpTopic('aName'); + + $topic->setArguments(['foo' => 'fooVal', 'bar' => 'barVal']); + + $this->assertSame(['foo' => 'fooVal', 'bar' => 'barVal'], $topic->getArguments()); + } + + public function testShouldAllowGetPreviouslySetTicket() + { + $topic = new AmqpTopic('aName'); + + //guard + $this->assertSame(null, $topic->getTicket()); + + $topic->setTicket('ticket'); + + $this->assertSame('ticket', $topic->getTicket()); + } + + public function testShouldAllowGetPreviouslySetRoutingKey() + { + $topic = new AmqpTopic('aName'); + + //guard + $this->assertSame(null, $topic->getRoutingKey()); + + $topic->setRoutingKey('theRoutingKey'); + + $this->assertSame('theRoutingKey', $topic->getRoutingKey()); + } +} diff --git a/pkg/amqp-lib/Tests/BufferTest.php b/pkg/amqp-lib/Tests/BufferTest.php new file mode 100644 index 000000000..981ff2b16 --- /dev/null +++ b/pkg/amqp-lib/Tests/BufferTest.php @@ -0,0 +1,64 @@ +assertAttributeSame([], 'messages', $buffer); + } + + public function testShouldReturnNullIfNoMessagesInBuffer() + { + $buffer = new Buffer(); + + $this->assertNull($buffer->pop('aConsumerTag')); + $this->assertNull($buffer->pop('anotherConsumerTag')); + } + + public function testShouldPushMessageToBuffer() + { + $fooMessage = new AmqpMessage(); + $barMessage = new AmqpMessage(); + $bazMessage = new AmqpMessage(); + + $buffer = new Buffer(); + + $buffer->push('aConsumerTag', $fooMessage); + $buffer->push('aConsumerTag', $barMessage); + + $buffer->push('anotherConsumerTag', $bazMessage); + + $this->assertAttributeSame([ + 'aConsumerTag' => [$fooMessage, $barMessage], + 'anotherConsumerTag' => [$bazMessage], + ], 'messages', $buffer); + } + + public function testShouldPopMessageFromBuffer() + { + $fooMessage = new AmqpMessage(); + $barMessage = new AmqpMessage(); + + $buffer = new Buffer(); + + $buffer->push('aConsumerTag', $fooMessage); + $buffer->push('aConsumerTag', $barMessage); + + $this->assertSame($fooMessage, $buffer->pop('aConsumerTag')); + $this->assertSame($barMessage, $buffer->pop('aConsumerTag')); + $this->assertNull($buffer->pop('aConsumerTag')); + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php b/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php index 5e3d8bb8e..087336d5a 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php @@ -20,6 +20,6 @@ protected function createContext() ->willReturn($channel) ; - return new AmqpContext($con); + return new AmqpContext($con, ''); } } From 9aba09de65d0256bfbf26415985a1f767e3f269d Mon Sep 17 00:00:00 2001 From: Paul McLaren Date: Thu, 20 Jul 2017 12:05:48 +0300 Subject: [PATCH 7/9] fix codestyle --- pkg/amqp-lib/Tests/AmqpProducerTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/amqp-lib/Tests/AmqpProducerTest.php b/pkg/amqp-lib/Tests/AmqpProducerTest.php index db587ab7f..cfad057f4 100644 --- a/pkg/amqp-lib/Tests/AmqpProducerTest.php +++ b/pkg/amqp-lib/Tests/AmqpProducerTest.php @@ -12,8 +12,8 @@ use Interop\Queue\PsrDestination; use Interop\Queue\PsrMessage; use Interop\Queue\PsrProducer; -use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage; use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage; use PhpAmqpLib\Wire\AMQPTable; use PHPUnit\Framework\TestCase; From 259f20407be8a7f769e032c6e0a44733d230446f Mon Sep 17 00:00:00 2001 From: Paul McLaren Date: Thu, 20 Jul 2017 13:14:23 +0300 Subject: [PATCH 8/9] separate tests for basic_get and basic_consume --- ...iveFromQueueWithBasicConsumeMethodTest.php | 59 +++++++++++++++++++ ...eceiveFromQueueWithBasicGetMethodTest.php} | 2 +- 2 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php rename pkg/amqp-lib/Tests/Spec/{AmqpSendToTopicAndReceiveFromQueueTest.php => AmqpSendToTopicAndReceiveFromQueueWithBasicGetMethodTest.php} (91%) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php new file mode 100644 index 000000000..6acfcc8a0 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -0,0 +1,59 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queueName .= '_basic_consume'; + + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + $context->bind($context->createTopic($queueName), $queue); + + return $queue; + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topicName .= '_basic_consume'; + + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicGetMethodTest.php similarity index 91% rename from pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php rename to pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicGetMethodTest.php index 928edaa72..28192181f 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicGetMethodTest.php @@ -10,7 +10,7 @@ /** * @group functional */ -class AmqpSendToTopicAndReceiveFromQueueTest extends SendToTopicAndReceiveFromQueueSpec +class AmqpSendToTopicAndReceiveFromQueueWithBasicGetMethodTest extends SendToTopicAndReceiveFromQueueSpec { /** * {@inheritdoc} From f120c5bccbe510d9ded254e9925a3eed3655c4c0 Mon Sep 17 00:00:00 2001 From: Paul McLaren Date: Thu, 20 Jul 2017 13:26:49 +0300 Subject: [PATCH 9/9] fix codestyle --- ...SendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 6acfcc8a0..8af921998 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -10,7 +10,7 @@ /** * @group functional */ -class AmqpSendToTopicAndReceiveFromQueueTest extends SendToTopicAndReceiveFromQueueSpec +class AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest extends SendToTopicAndReceiveFromQueueSpec { /** * {@inheritdoc}