From 740530693fc70f5893fa45698c6d9868a7df37e0 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 15 Aug 2017 15:10:35 +0300 Subject: [PATCH 01/11] gps --- composer.json | 5 + docker-compose.yml | 7 + phpunit.xml.dist | 4 + pkg/gps/GpsConnectionFactory.php | 56 +++++++ pkg/gps/GpsConsumer.php | 146 ++++++++++++++++++ pkg/gps/GpsContext.php | 151 ++++++++++++++++++ pkg/gps/GpsMessage.php | 257 +++++++++++++++++++++++++++++++ pkg/gps/GpsProducer.php | 99 ++++++++++++ pkg/gps/GpsQueue.php | 29 ++++ pkg/gps/GpsTopic.php | 29 ++++ pkg/gps/composer.json | 41 +++++ 11 files changed, 824 insertions(+) create mode 100644 pkg/gps/GpsConnectionFactory.php create mode 100644 pkg/gps/GpsConsumer.php create mode 100644 pkg/gps/GpsContext.php create mode 100644 pkg/gps/GpsMessage.php create mode 100644 pkg/gps/GpsProducer.php create mode 100644 pkg/gps/GpsQueue.php create mode 100644 pkg/gps/GpsTopic.php create mode 100644 pkg/gps/composer.json diff --git a/composer.json b/composer.json index 3212a3a78..acb627271 100644 --- a/composer.json +++ b/composer.json @@ -20,6 +20,7 @@ "enqueue/gearman": "*@dev", "enqueue/rdkafka": "*@dev", "kwn/php-rdkafka-stubs": "^1.0.2", + "enqueue/gps": "*@dev", "enqueue/enqueue-bundle": "*@dev", "enqueue/job-queue": "*@dev", "enqueue/simple-client": "*@dev", @@ -129,6 +130,10 @@ "type": "path", "url": "pkg/rdkafka" }, + { + "type": "path", + "url": "pkg/gps" + }, { "type": "path", "url": "pkg/simple-client" diff --git a/docker-compose.yml b/docker-compose.yml index a1684e30a..35a71db69 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,7 @@ services: - gearmand - kafka - zookeeper + - google-pubsub volumes: - './:/mqdev' environment: @@ -40,6 +41,8 @@ services: - GEARMAN_DSN=gearman://gearmand:4730 - RDKAFKA_HOST=kafka - RDKAFKA_PORT=9092 + - PUBSUB_EMULATOR_HOST=http://google-pubsub:8085 + - GCLOUD_PROJECT=mqdev rabbitmq: image: enqueue/rabbitmq:latest @@ -89,6 +92,10 @@ services: volumes: - '/var/run/docker.sock:/var/run/docker.sock' + google-pubsub: + image: 'google/cloud-sdk:latest' + entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085' + volumes: mysql-data: driver: local diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 881e59e28..b3b93f5a2 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -73,6 +73,10 @@ pkg/rdkafka/Tests + + pkg/gps/Tests + + pkg/enqueue-bundle/Tests diff --git a/pkg/gps/GpsConnectionFactory.php b/pkg/gps/GpsConnectionFactory.php new file mode 100644 index 000000000..6ac045d8f --- /dev/null +++ b/pkg/gps/GpsConnectionFactory.php @@ -0,0 +1,56 @@ +config = array_replace($this->defaultConfig(), $config); + } + + /** + * {@inheritdoc} + * + * @return GpsContext + */ + public function createContext() + { + if ($this->config['lazy']) { + return new GpsContext(function () { + return $this->establishConnection(); + }); + } + + return new GpsContext($this->establishConnection()); + } + + /** + * @return PubSubClient + */ + private function establishConnection() + { + return new PubSubClient($this->config); + } + + /** + * @return array + */ + private function defaultConfig() + { + return [ + 'lazy' => true, + ]; + } +} diff --git a/pkg/gps/GpsConsumer.php b/pkg/gps/GpsConsumer.php new file mode 100644 index 000000000..5d12bbfeb --- /dev/null +++ b/pkg/gps/GpsConsumer.php @@ -0,0 +1,146 @@ +context = $context; + $this->queue = $queue; + } + + /** + * {@inheritdoc} + */ + public function getQueue() + { + return $this->queue; + } + + /** + * {@inheritdoc} + */ + public function receive($timeout = 0) + { + if ($timeout === 0) { + while (true) { + if ($message = $this->receiveMessage($timeout)) { + return $message; + } + } + } else { + return $this->receiveMessage($timeout); + } + } + + /** + * {@inheritdoc} + */ + public function receiveNoWait() + { + $messages = $this->getSubscription()->pull([ + 'maxMessages' => 1, + 'returnImmediately' => true, + ]); + + if ($messages) { + return $this->convertMessage(current($messages)); + } + } + + /** + * {@inheritdoc} + */ + public function acknowledge(PsrMessage $message) + { + if (false == $message->getNativeMessage()) { + throw new \LogicException('Native google pub/sub message required but it is empty'); + } + + $this->getSubscription()->acknowledge($message->getNativeMessage()); + } + + /** + * {@inheritdoc} + */ + public function reject(PsrMessage $message, $requeue = false) + { + if (false == $message->getNativeMessage()) { + throw new \LogicException('Native google pub/sub message required but it is empty'); + } + + $this->getSubscription()->acknowledge($message->getNativeMessage()); + } + + /** + * @return Subscription + */ + private function getSubscription() + { + if (null === $this->subscription) { + $this->subscription = $this->context->getClient()->subscription($this->queue->getQueueName()); + } + + return $this->subscription; + } + + /** + * @param Message $message + * + * @return GpsMessage + */ + private function convertMessage(Message $message) + { + $gpsMessage = GpsMessage::jsonUnserialize($message->data()); + $gpsMessage->setNativeMessage($message); + + return $gpsMessage; + } + + /** + * @param int $timeout + * + * @return GpsMessage|null + */ + private function receiveMessage($timeout) + { + $timeout /= 1000; + + try { + $messages = $this->getSubscription()->pull([ + 'maxMessages' => 1, + 'requestTimeout' => $timeout, + ]); + + if ($messages) { + return $this->convertMessage(current($messages)); + } + } catch (ServiceException $e) {} // timeout + } +} diff --git a/pkg/gps/GpsContext.php b/pkg/gps/GpsContext.php new file mode 100644 index 000000000..15424b791 --- /dev/null +++ b/pkg/gps/GpsContext.php @@ -0,0 +1,151 @@ +options = array_replace([ + 'ackDeadlineSeconds' => 10, + ], $options); + + if ($client instanceof PubSubClient) { + $this->client = $client; + } elseif (is_callable($client)) { + $this->clientFactory = $client; + } else { + throw new \InvalidArgumentException(sprintf( + 'The $client argument must be either %s or callable that returns %s once called.', + PubSubClient::class, + PubSubClient::class + )); + } + } + + /** + * {@inheritdoc} + */ + public function createMessage($body = '', array $properties = [], array $headers = []) + { + return new GpsMessage($body, $properties, $headers); + } + + /** + * {@inheritdoc} + */ + public function createTopic($topicName) + { + return new GpsTopic($topicName); + } + + /** + * {@inheritdoc} + */ + public function createQueue($queueName) + { + return new GpsQueue($queueName); + } + + /** + * {@inheritdoc} + */ + public function createTemporaryQueue() + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function createProducer() + { + return new GpsProducer($this); + } + + public function createConsumer(PsrDestination $destination) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, GpsQueue::class); + + return new GpsConsumer($this, $destination); + } + + /** + * {@inheritdoc} + */ + public function close() + { + } + + /** + * @param GpsTopic $topic + */ + public function declareTopic(GpsTopic $topic) + { + try { + $this->getClient()->createTopic($topic->getTopicName()); + } catch (ConflictException $e) {} + } + + /** + * @param GpsTopic $topic + * @param GpsQueue $queue + */ + public function subscribe(GpsTopic $topic, GpsQueue $queue) + { + $this->declareTopic($topic); + + try { + $this->getClient()->subscribe($queue->getQueueName(), $topic->getTopicName(), [ + 'ackDeadlineSeconds' => $this->options['ackDeadlineSeconds'] + ]); + } catch (ConflictException $e) {} + } + + /** + * @return PubSubClient + */ + public function getClient() + { + if (false == $this->client) { + $client = call_user_func($this->clientFactory); + if (false == $client instanceof PubSubClient) { + throw new \LogicException(sprintf( + 'The factory must return instance of %s. It returned %s', + PubSubClient::class, + is_object($client) ? get_class($client) : gettype($client) + )); + } + + $this->client = $client; + } + + return $this->client; + } +} diff --git a/pkg/gps/GpsMessage.php b/pkg/gps/GpsMessage.php new file mode 100644 index 000000000..44f72e7bf --- /dev/null +++ b/pkg/gps/GpsMessage.php @@ -0,0 +1,257 @@ +body = $body; + $this->properties = $properties; + $this->headers = $headers; + + $this->redelivered = false; + } + + /** + * {@inheritdoc} + */ + public function getBody() + { + return $this->body; + } + + /** + * {@inheritdoc} + */ + public function setBody($body) + { + $this->body = $body; + } + + /** + * {@inheritdoc} + */ + public function setProperties(array $properties) + { + $this->properties = $properties; + } + + /** + * {@inheritdoc} + */ + public function getProperties() + { + return $this->properties; + } + + /** + * {@inheritdoc} + */ + public function setProperty($name, $value) + { + $this->properties[$name] = $value; + } + + /** + * {@inheritdoc} + */ + public function getProperty($name, $default = null) + { + return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default; + } + + /** + * {@inheritdoc} + */ + public function setHeaders(array $headers) + { + $this->headers = $headers; + } + + /** + * {@inheritdoc} + */ + public function getHeaders() + { + return $this->headers; + } + + /** + * {@inheritdoc} + */ + public function setHeader($name, $value) + { + $this->headers[$name] = $value; + } + + /** + * {@inheritdoc} + */ + public function getHeader($name, $default = null) + { + return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default; + } + + /** + * {@inheritdoc} + */ + public function setRedelivered($redelivered) + { + $this->redelivered = (bool) $redelivered; + } + + /** + * {@inheritdoc} + */ + public function isRedelivered() + { + return $this->redelivered; + } + + /** + * {@inheritdoc} + */ + public function setCorrelationId($correlationId) + { + $this->setHeader('correlation_id', $correlationId); + } + + /** + * {@inheritdoc} + */ + public function getCorrelationId() + { + return $this->getHeader('correlation_id'); + } + + /** + * {@inheritdoc} + */ + public function setMessageId($messageId) + { + $this->setHeader('message_id', $messageId); + } + + /** + * {@inheritdoc} + */ + public function getMessageId() + { + return $this->getHeader('message_id'); + } + + /** + * {@inheritdoc} + */ + public function getTimestamp() + { + $value = $this->getHeader('timestamp'); + + return $value === null ? null : (int) $value; + } + + /** + * {@inheritdoc} + */ + public function setTimestamp($timestamp) + { + $this->setHeader('timestamp', $timestamp); + } + + /** + * {@inheritdoc} + */ + public function setReplyTo($replyTo) + { + $this->setHeader('reply_to', $replyTo); + } + + /** + * {@inheritdoc} + */ + public function getReplyTo() + { + return $this->getHeader('reply_to'); + } + + /** + * {@inheritdoc} + */ + public function jsonSerialize() + { + return [ + 'body' => $this->getBody(), + 'properties' => $this->getProperties(), + 'headers' => $this->getHeaders(), + ]; + } + + /** + * @param string $json + * + * @return GpsMessage + */ + public static function jsonUnserialize($json) + { + $data = json_decode($json, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new self($data['body'], $data['properties'], $data['headers']); + } + + /** + * @return Message + */ + public function getNativeMessage() + { + return $this->nativeMessage; + } + + /** + * @param Message $message + */ + public function setNativeMessage(Message $message) + { + $this->nativeMessage = $message; + } +} diff --git a/pkg/gps/GpsProducer.php b/pkg/gps/GpsProducer.php new file mode 100644 index 000000000..3ac216aad --- /dev/null +++ b/pkg/gps/GpsProducer.php @@ -0,0 +1,99 @@ +context = $context; + } + + /** + * {@inheritdoc} + */ + public function send(PsrDestination $destination, PsrMessage $message) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, GpsTopic::class); + + /** @var Topic $topic */ + $topic = $this->context->getClient()->topic($destination->getTopicName()); + $topic->publish([ + 'data' => json_encode($message), + ]); + } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + if (null === $deliveryDelay) { + return; + } + + throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + if (null === $priority) { + return; + } + + throw PriorityNotSupportedException::providerDoestNotSupportIt(); + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + if (null === $timeToLive) { + return; + } + + throw TimeToLiveNotSupportedException::providerDoestNotSupportIt(); + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + } +} diff --git a/pkg/gps/GpsQueue.php b/pkg/gps/GpsQueue.php new file mode 100644 index 000000000..c1f84b00e --- /dev/null +++ b/pkg/gps/GpsQueue.php @@ -0,0 +1,29 @@ +name = $name; + } + + /** + * {@inheritdoc} + */ + public function getQueueName() + { + return $this->name; + } +} diff --git a/pkg/gps/GpsTopic.php b/pkg/gps/GpsTopic.php new file mode 100644 index 000000000..a3617f44a --- /dev/null +++ b/pkg/gps/GpsTopic.php @@ -0,0 +1,29 @@ +name = $name; + } + + /** + * {@inheritdoc} + */ + public function getTopicName() + { + return $this->name; + } +} diff --git a/pkg/gps/composer.json b/pkg/gps/composer.json new file mode 100644 index 000000000..3ec4e3d8c --- /dev/null +++ b/pkg/gps/composer.json @@ -0,0 +1,41 @@ +{ + "name": "enqueue/gps", + "type": "library", + "description": "Message Google Cloud Pub/Sub Transport", + "keywords": ["messaging", "queue", "google", "pubsub"], + "license": "MIT", + "repositories": [ + { + "type": "vcs", + "url": "git@github.com:php-enqueue/test.git" + } + ], + "require": { + "php": ">=5.6", + "queue-interop/queue-interop": "^0.6@dev", + "google/cloud-pubsub": "^0.6.1" + }, + "require-dev": { + "phpunit/phpunit": "~5.4.0", + "enqueue/test": "^0.7@dev", + "enqueue/enqueue": "^0.7@dev", + "queue-interop/queue-spec": "^0.5@dev", + "symfony/dependency-injection": "^2.8|^3", + "symfony/config": "^2.8|^3" + }, + "autoload": { + "psr-4": { "Enqueue\\Gps\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "suggest": { + "enqueue/enqueue": "If you'd like to use advanced features like Client abstract layer or Symfony integration features" + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.7.x-dev" + } + } +} From f3b2c56000310e16bf94f5ef82d70ecb7b66433b Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 15 Aug 2017 15:47:33 +0300 Subject: [PATCH 02/11] gps --- pkg/gps/.gitignore | 6 ++++++ pkg/gps/.travis.yml | 21 +++++++++++++++++++++ pkg/gps/GpsConnectionFactory.php | 10 ++++++++++ pkg/gps/LICENSE | 20 ++++++++++++++++++++ pkg/gps/README.md | 18 ++++++++++++++++++ pkg/gps/phpunit.xml.dist | 30 ++++++++++++++++++++++++++++++ 6 files changed, 105 insertions(+) create mode 100644 pkg/gps/.gitignore create mode 100644 pkg/gps/.travis.yml create mode 100644 pkg/gps/LICENSE create mode 100644 pkg/gps/README.md create mode 100644 pkg/gps/phpunit.xml.dist diff --git a/pkg/gps/.gitignore b/pkg/gps/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/gps/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/gps/.travis.yml b/pkg/gps/.travis.yml new file mode 100644 index 000000000..b9cf57fc9 --- /dev/null +++ b/pkg/gps/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 10 + +language: php + +php: + - '5.6' + - '7.0' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install --prefer-source + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/gps/GpsConnectionFactory.php b/pkg/gps/GpsConnectionFactory.php index 6ac045d8f..ccdf783ab 100644 --- a/pkg/gps/GpsConnectionFactory.php +++ b/pkg/gps/GpsConnectionFactory.php @@ -13,6 +13,16 @@ class GpsConnectionFactory implements PsrConnectionFactory private $config; /** + * @see https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application + * @see \Google\Cloud\PubSub\PubSubClient::__construct() + * + * [ + * 'projectId' => The project ID from the Google Developer's Console. + * 'keyFilePath' => The full path to your service account credentials.json file retrieved from the Google Developers Console. + * 'retries' => Number of retries for a failed request. **Defaults to** `3`. + * 'scopes' => Scopes to be used for the request. + * ] + * * @param array $config */ public function __construct(array $config = []) diff --git a/pkg/gps/LICENSE b/pkg/gps/LICENSE new file mode 100644 index 000000000..f1e6a22fe --- /dev/null +++ b/pkg/gps/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) +Copyright (c) 2016 Kotliar Maksym + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/pkg/gps/README.md b/pkg/gps/README.md new file mode 100644 index 000000000..a68cb3b48 --- /dev/null +++ b/pkg/gps/README.md @@ -0,0 +1,18 @@ +# Google Pub/Sub Transport + +[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) +[![Build Status](https://travis-ci.org/php-enqueue/gps.png?branch=master)](https://travis-ci.org/php-enqueue/gps) +[![Total Downloads](https://poser.pugx.org/enqueue/gps/d/total.png)](https://packagist.org/packages/enqueue/gps) +[![Latest Stable Version](https://poser.pugx.org/enqueue/gps/version.png)](https://packagist.org/packages/enqueue/gps) + +This is an implementation of PSR specification. It allows you to send and consume message through Google Pub/Sub library. + +## Resources + +* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Questions](https://gitter.im/php-enqueue/Lobby) +* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) + +## License + +It is released under the [MIT License](LICENSE). \ No newline at end of file diff --git a/pkg/gps/phpunit.xml.dist b/pkg/gps/phpunit.xml.dist new file mode 100644 index 000000000..57e46d2f2 --- /dev/null +++ b/pkg/gps/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Tests + + + + From d3921fba86752db671a9c2b141c79ca4adb4827b Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 16 Aug 2017 12:44:32 +0300 Subject: [PATCH 03/11] gps --- pkg/gps/GpsConnectionFactory.php | 39 +++- pkg/gps/GpsContext.php | 3 + pkg/gps/Tests/GpsConsumerTest.php | 205 ++++++++++++++++++ pkg/gps/Tests/GpsContextTest.php | 96 ++++++++ pkg/gps/Tests/GpsMessageTest.php | 48 ++++ pkg/gps/Tests/GpsProducerTest.php | 81 +++++++ .../Tests/Spec/GpsConnectionFactoryTest.php | 14 ++ pkg/gps/Tests/Spec/GpsContextTest.php | 15 ++ pkg/gps/Tests/Spec/GpsMessageTest.php | 14 ++ pkg/gps/Tests/Spec/GpsProducerTest.php | 15 ++ pkg/gps/Tests/Spec/GpsQueueTest.php | 14 ++ pkg/gps/Tests/Spec/GpsTopicTest.php | 14 ++ 12 files changed, 556 insertions(+), 2 deletions(-) create mode 100644 pkg/gps/Tests/GpsConsumerTest.php create mode 100644 pkg/gps/Tests/GpsContextTest.php create mode 100644 pkg/gps/Tests/GpsMessageTest.php create mode 100644 pkg/gps/Tests/GpsProducerTest.php create mode 100644 pkg/gps/Tests/Spec/GpsConnectionFactoryTest.php create mode 100644 pkg/gps/Tests/Spec/GpsContextTest.php create mode 100644 pkg/gps/Tests/Spec/GpsMessageTest.php create mode 100644 pkg/gps/Tests/Spec/GpsProducerTest.php create mode 100644 pkg/gps/Tests/Spec/GpsQueueTest.php create mode 100644 pkg/gps/Tests/Spec/GpsTopicTest.php diff --git a/pkg/gps/GpsConnectionFactory.php b/pkg/gps/GpsConnectionFactory.php index ccdf783ab..6d1090dd0 100644 --- a/pkg/gps/GpsConnectionFactory.php +++ b/pkg/gps/GpsConnectionFactory.php @@ -21,12 +21,27 @@ class GpsConnectionFactory implements PsrConnectionFactory * 'keyFilePath' => The full path to your service account credentials.json file retrieved from the Google Developers Console. * 'retries' => Number of retries for a failed request. **Defaults to** `3`. * 'scopes' => Scopes to be used for the request. + * 'lazy' => 'the connection will be performed as later as possible, if the option set to true' * ] * - * @param array $config + * or + * + * gps: + * gps:?projectId=projectName + * + * @param array|string|null $config */ - public function __construct(array $config = []) + public function __construct($config = 'gps:') { + if (empty($config) || 'gps:' === $config) { + $config = []; + } elseif (is_string($config)) { + $config = $this->parseDsn($config); + } elseif (is_array($config)) { + } else { + throw new \LogicException('The config must be either an array of options, a DSN string or null'); + } + $this->config = array_replace($this->defaultConfig(), $config); } @@ -46,6 +61,26 @@ public function createContext() return new GpsContext($this->establishConnection()); } + /** + * @param string $dsn + * + * @return array + */ + private function parseDsn($dsn) + { + if (false === strpos($dsn, 'gps:')) { + throw new \LogicException(sprintf('The given DSN "%s" is not supported. Must start with "gps:".', $dsn)); + } + + $config = []; + + if ($query = parse_url($dsn, PHP_URL_QUERY)) { + parse_str($query, $config); + } + + return $config; + } + /** * @return PubSubClient */ diff --git a/pkg/gps/GpsContext.php b/pkg/gps/GpsContext.php index 15424b791..92eeaa99b 100644 --- a/pkg/gps/GpsContext.php +++ b/pkg/gps/GpsContext.php @@ -89,6 +89,9 @@ public function createProducer() return new GpsProducer($this); } + /** + * {@inheritdoc} + */ public function createConsumer(PsrDestination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, GpsQueue::class); diff --git a/pkg/gps/Tests/GpsConsumerTest.php b/pkg/gps/Tests/GpsConsumerTest.php new file mode 100644 index 000000000..24bed7e3b --- /dev/null +++ b/pkg/gps/Tests/GpsConsumerTest.php @@ -0,0 +1,205 @@ +createContextMock(), new GpsQueue('')); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Native google pub/sub message required but it is empty'); + + $consumer->acknowledge(new GpsMessage('')); + } + + public function testShouldAcknowledgeMessage() + { + $nativeMessage = new Message([], []); + + $subscription = $this->createSubscriptionMock(); + $subscription + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($nativeMessage)) + ; + + $client = $this->createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('subscription') + ->willReturn($subscription) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getClient') + ->willReturn($client) + ; + + $consumer = new GpsConsumer($context, new GpsQueue('queue-name')); + + $message = new GpsMessage(''); + $message->setNativeMessage($nativeMessage); + + $consumer->acknowledge($message); + } + + public function testRejectShouldThrowExceptionIfNativeMessageNotSet() + { + $consumer = new GpsConsumer($this->createContextMock(), new GpsQueue('')); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Native google pub/sub message required but it is empty'); + + $consumer->acknowledge(new GpsMessage('')); + } + + public function testShouldRejectMessage() + { + $nativeMessage = new Message([], []); + + $subscription = $this->createSubscriptionMock(); + $subscription + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($nativeMessage)) + ; + + $client = $this->createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('subscription') + ->willReturn($subscription) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getClient') + ->willReturn($client) + ; + + $consumer = new GpsConsumer($context, new GpsQueue('queue-name')); + + $message = new GpsMessage(''); + $message->setNativeMessage($nativeMessage); + + $consumer->reject($message); + } + + public function testShouldReceiveMessageNoWait() + { + $message = new GpsMessage('the body'); + $nativeMessage = new Message([ + 'data' => json_encode($message), + ], []); + + $subscription = $this->createSubscriptionMock(); + $subscription + ->expects($this->once()) + ->method('pull') + ->with($this->identicalTo([ + 'maxMessages' => 1, + 'returnImmediately' => true, + ])) + ->willReturn([$nativeMessage]) + ; + + $client = $this->createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('subscription') + ->willReturn($subscription) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getClient') + ->willReturn($client) + ; + + $consumer = new GpsConsumer($context, new GpsQueue('queue-name')); + + $message = $consumer->receiveNoWait(); + + $this->assertInstanceOf(GpsMessage::class, $message); + $this->assertSame('the body', $message->getBody()); + } + + public function testShouldReceiveMessage() + { + $message = new GpsMessage('the body'); + $nativeMessage = new Message([ + 'data' => json_encode($message), + ], []); + + $subscription = $this->createSubscriptionMock(); + $subscription + ->expects($this->once()) + ->method('pull') + ->with($this->identicalTo([ + 'maxMessages' => 1, + 'requestTimeout' => 12.345, + ])) + ->willReturn([$nativeMessage]) + ; + + $client = $this->createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('subscription') + ->willReturn($subscription) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getClient') + ->willReturn($client) + ; + + $consumer = new GpsConsumer($context, new GpsQueue('queue-name')); + + $message = $consumer->receive(12345); + + $this->assertInstanceOf(GpsMessage::class, $message); + $this->assertSame('the body', $message->getBody()); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|GpsContext + */ + private function createContextMock() + { + return $this->createMock(GpsContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PubSubClient + */ + private function createPubSubClientMock() + { + return $this->createMock(PubSubClient::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|Subscription + */ + private function createSubscriptionMock() + { + return $this->createMock(Subscription::class); + } +} diff --git a/pkg/gps/Tests/GpsContextTest.php b/pkg/gps/Tests/GpsContextTest.php new file mode 100644 index 000000000..0ca5b437b --- /dev/null +++ b/pkg/gps/Tests/GpsContextTest.php @@ -0,0 +1,96 @@ +createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('createTopic') + ->with('topic-name') + ; + + $topic = new GpsTopic('topic-name'); + + $context = new GpsContext($client); + $context->declareTopic($topic); + } + + public function testDeclareTopicShouldCatchConflictException() + { + $client = $this->createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('createTopic') + ->willThrowException(new ConflictException('')) + ; + + $topic = new GpsTopic(''); + + $context = new GpsContext($client); + $context->declareTopic($topic); + } + + public function testShouldSubscribeTopicToQueue() + { + $client = $this->createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('subscribe') + ->with('queue-name', 'topic-name', $this->identicalTo(['ackDeadlineSeconds' => 10])) + ; + + $topic = new GpsTopic('topic-name'); + $queue = new GpsQueue('queue-name'); + + $context = new GpsContext($client); + + $context->subscribe($topic, $queue); + } + + public function testSubscribeShouldCatchConflictException() + { + $client = $this->createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('subscribe') + ->willThrowException(new ConflictException('')) + ; + + $topic = new GpsTopic('topic-name'); + $queue = new GpsQueue('queue-name'); + + $context = new GpsContext($client); + + $context->subscribe($topic, $queue); + } + + public function testCreateConsumerShouldThrowExceptionIfInvalidDestination() + { + $context = new GpsContext($this->createPubSubClientMock()); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\Gps\GpsQueue but got Enqueue\Gps\GpsTopic'); + + $context->createConsumer(new GpsTopic('')); + } + + /** + * @return PubSubClient|\PHPUnit_Framework_MockObject_MockObject|PubSubClient + */ + private function createPubSubClientMock() + { + return $this->createMock(PubSubClient::class); + } +} diff --git a/pkg/gps/Tests/GpsMessageTest.php b/pkg/gps/Tests/GpsMessageTest.php new file mode 100644 index 000000000..d84cd99d9 --- /dev/null +++ b/pkg/gps/Tests/GpsMessageTest.php @@ -0,0 +1,48 @@ +setNativeMessage($nativeMessage = new Message([], [])); + + $this->assertSame($nativeMessage, $message->getNativeMessage()); + } + + public function testColdBeSerializedToJson() + { + $message = new GpsMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']); + + $this->assertEquals('{"body":"theBody","properties":{"thePropFoo":"thePropFooVal"},"headers":{"theHeaderFoo":"theHeaderFooVal"}}', json_encode($message)); + } + + public function testCouldBeUnserializedFromJson() + { + $message = new GpsMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']); + + $json = json_encode($message); + + //guard + $this->assertNotEmpty($json); + + $unserializedMessage = GpsMessage::jsonUnserialize($json); + + $this->assertInstanceOf(GpsMessage::class, $unserializedMessage); + $this->assertEquals($message, $unserializedMessage); + } + + public function testThrowIfMalformedJsonGivenOnUnsterilizedFromJson() + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The malformed json given.'); + + GpsMessage::jsonUnserialize('{]'); + } +} diff --git a/pkg/gps/Tests/GpsProducerTest.php b/pkg/gps/Tests/GpsProducerTest.php new file mode 100644 index 000000000..117b93298 --- /dev/null +++ b/pkg/gps/Tests/GpsProducerTest.php @@ -0,0 +1,81 @@ +createContextMock()); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\Gps\GpsTopic but got Enqueue\Gps\GpsQueue'); + + $producer->send(new GpsQueue(''), new GpsMessage('')); + } + + public function testShouldSendMessage() + { + $topic = new GpsTopic('topic-name'); + $message = new GpsMessage(''); + + $gtopic = $this->createGTopicMock(); + $gtopic + ->expects($this->once()) + ->method('publish') + ->with($this->identicalTo(['data' => '{"body":"","properties":[],"headers":[]}'])) + ; + + $client = $this->createPubSubClientMock(); + $client + ->expects($this->once()) + ->method('topic') + ->with('topic-name') + ->willReturn($gtopic) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getClient') + ->willReturn($client) + ; + + $producer = new GpsProducer($context); + $producer->send($topic, $message); + } + + /** + * @return GpsContext|\PHPUnit_Framework_MockObject_MockObject|GpsContext + */ + private function createContextMock() + { + return $this->createMock(GpsContext::class); + } + + /** + * @return PubSubClient|\PHPUnit_Framework_MockObject_MockObject|PubSubClient + */ + private function createPubSubClientMock() + { + return $this->createMock(PubSubClient::class); + } + + /** + * @return Topic|\PHPUnit_Framework_MockObject_MockObject|Topic + */ + private function createGTopicMock() + { + return $this->createMock(Topic::class); + } +} diff --git a/pkg/gps/Tests/Spec/GpsConnectionFactoryTest.php b/pkg/gps/Tests/Spec/GpsConnectionFactoryTest.php new file mode 100644 index 000000000..5a12d7a02 --- /dev/null +++ b/pkg/gps/Tests/Spec/GpsConnectionFactoryTest.php @@ -0,0 +1,14 @@ +createMock(PubSubClient::class)); + } +} diff --git a/pkg/gps/Tests/Spec/GpsMessageTest.php b/pkg/gps/Tests/Spec/GpsMessageTest.php new file mode 100644 index 000000000..107240923 --- /dev/null +++ b/pkg/gps/Tests/Spec/GpsMessageTest.php @@ -0,0 +1,14 @@ +createMock(GpsContext::class)); + } +} diff --git a/pkg/gps/Tests/Spec/GpsQueueTest.php b/pkg/gps/Tests/Spec/GpsQueueTest.php new file mode 100644 index 000000000..c2eccb4fe --- /dev/null +++ b/pkg/gps/Tests/Spec/GpsQueueTest.php @@ -0,0 +1,14 @@ + Date: Wed, 16 Aug 2017 12:45:18 +0300 Subject: [PATCH 04/11] gps --- pkg/gps/GpsConsumer.php | 3 ++- pkg/gps/GpsContext.php | 8 +++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/gps/GpsConsumer.php b/pkg/gps/GpsConsumer.php index 5d12bbfeb..c7e36e9b0 100644 --- a/pkg/gps/GpsConsumer.php +++ b/pkg/gps/GpsConsumer.php @@ -141,6 +141,7 @@ private function receiveMessage($timeout) if ($messages) { return $this->convertMessage(current($messages)); } - } catch (ServiceException $e) {} // timeout + } catch (ServiceException $e) { + } // timeout } } diff --git a/pkg/gps/GpsContext.php b/pkg/gps/GpsContext.php index 92eeaa99b..8c98fdbdf 100644 --- a/pkg/gps/GpsContext.php +++ b/pkg/gps/GpsContext.php @@ -113,7 +113,8 @@ public function declareTopic(GpsTopic $topic) { try { $this->getClient()->createTopic($topic->getTopicName()); - } catch (ConflictException $e) {} + } catch (ConflictException $e) { + } } /** @@ -126,9 +127,10 @@ public function subscribe(GpsTopic $topic, GpsQueue $queue) try { $this->getClient()->subscribe($queue->getQueueName(), $topic->getTopicName(), [ - 'ackDeadlineSeconds' => $this->options['ackDeadlineSeconds'] + 'ackDeadlineSeconds' => $this->options['ackDeadlineSeconds'], ]); - } catch (ConflictException $e) {} + } catch (ConflictException $e) { + } } /** From fd477dd04525c78a06433cb8b39fee5d7d0737ac Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 16 Aug 2017 14:29:25 +0300 Subject: [PATCH 05/11] gps --- .../GpsSendToTopicAndReceiveFromQueueTest.php | 35 +++++++++++++++++++ ...ndToTopicAndReceiveNoWaitFromQueueTest.php | 35 +++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php create mode 100644 pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php diff --git a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php new file mode 100644 index 000000000..d665c7fd7 --- /dev/null +++ b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php @@ -0,0 +1,35 @@ +createContext(); + } + + /** + * @param GpsContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + + $context->subscribe($this->topic, $queue); + + return $queue; + } + + protected function createTopic(PsrContext $context, $topicName) + { + return $this->topic = parent::createTopic($context, $topicName); + } +} diff --git a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 000000000..4966cb832 --- /dev/null +++ b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,35 @@ +createContext(); + } + + /** + * @param GpsContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + + $context->subscribe($this->topic, $queue); + + return $queue; + } + + protected function createTopic(PsrContext $context, $topicName) + { + return $this->topic = parent::createTopic($context, $topicName); + } +} From 4a16037af4ad53b738083b2f1d3e0ad012f6a003 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 16 Aug 2017 14:32:31 +0300 Subject: [PATCH 06/11] gps --- pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php | 3 +++ .../Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php index d665c7fd7..c10317b9f 100644 --- a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php +++ b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php @@ -7,6 +7,9 @@ use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendToTopicAndReceiveNoWaitFromQueueSpec; +/** + * @group functional + */ class GpsSendToTopicAndReceiveFromQueueTest extends SendToTopicAndReceiveNoWaitFromQueueSpec { private $topic; diff --git a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php index 4966cb832..8a904334f 100644 --- a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php +++ b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php @@ -7,6 +7,9 @@ use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendToTopicAndReceiveNoWaitFromQueueSpec; +/** + * @group functional + */ class GpsSendToTopicAndReceiveNoWaitFromQueueTest extends SendToTopicAndReceiveNoWaitFromQueueSpec { private $topic; From b93d1a7ce483ae737b00f8f7eca3e24b1d11d26b Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 16 Aug 2017 14:35:24 +0300 Subject: [PATCH 07/11] gps --- pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php | 1 + .../Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php | 1 + 2 files changed, 2 insertions(+) diff --git a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php index c10317b9f..b702c5dea 100644 --- a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php +++ b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveFromQueueTest.php @@ -21,6 +21,7 @@ protected function createContext() /** * @param GpsContext $context + * @param mixed $queueName */ protected function createQueue(PsrContext $context, $queueName) { diff --git a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php index 8a904334f..08c42a79b 100644 --- a/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php +++ b/pkg/gps/Tests/Spec/GpsSendToTopicAndReceiveNoWaitFromQueueTest.php @@ -21,6 +21,7 @@ protected function createContext() /** * @param GpsContext $context + * @param mixed $queueName */ protected function createQueue(PsrContext $context, $queueName) { From 2600fe4689bc51ffddfb104652ecb5e9f2031cef Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 16 Aug 2017 17:13:02 +0300 Subject: [PATCH 08/11] gps --- pkg/gps/Client/GpsDriver.php | 182 +++++++++ pkg/gps/Symfony/GpsTransportFactory.php | 130 ++++++ pkg/gps/Tests/Client/GpsDriverTest.php | 384 ++++++++++++++++++ .../Tests/Symfony/GpsTransportFactoryTest.php | 157 +++++++ 4 files changed, 853 insertions(+) create mode 100644 pkg/gps/Client/GpsDriver.php create mode 100644 pkg/gps/Symfony/GpsTransportFactory.php create mode 100644 pkg/gps/Tests/Client/GpsDriverTest.php create mode 100644 pkg/gps/Tests/Symfony/GpsTransportFactoryTest.php diff --git a/pkg/gps/Client/GpsDriver.php b/pkg/gps/Client/GpsDriver.php new file mode 100644 index 000000000..db3c0c7f6 --- /dev/null +++ b/pkg/gps/Client/GpsDriver.php @@ -0,0 +1,182 @@ +context = $context; + $this->config = $config; + $this->queueMetaRegistry = $queueMetaRegistry; + } + + /** + * {@inheritdoc} + */ + public function sendToRouter(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { + throw new \LogicException('Topic name parameter is required but is not set'); + } + + $topic = $this->createRouterTopic(); + $transportMessage = $this->createTransportMessage($message); + + $this->context->createProducer()->send($topic, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function sendToProcessor(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { + throw new \LogicException('Processor name parameter is required but is not set'); + } + + if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { + throw new \LogicException('Queue name parameter is required but is not set'); + } + + $transportMessage = $this->createTransportMessage($message); + $destination = $this->context->createTopic( + $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName()) + ; + + $this->context->createProducer()->send($destination, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function setupBroker(LoggerInterface $logger = null) + { + $logger = $logger ?: new NullLogger(); + $log = function ($text, ...$args) use ($logger) { + $logger->debug(sprintf('[GpsDriver] '.$text, ...$args)); + }; + + // setup router + $routerTopic = $this->createRouterTopic(); + $routerQueue = $this->createQueue($this->config->getRouterQueueName()); + + $log('Subscribe router topic to queue: %s -> %s', $routerTopic->getTopicName(), $routerQueue->getQueueName()); + $this->context->subscribe($routerTopic, $routerQueue); + + // setup queues + foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) { + $topic = $this->context->createTopic($meta->getTransportName()); + $queue = $this->context->createQueue($meta->getTransportName()); + + $log('Subscribe processor topic to queue: %s -> %s', $topic->getTopicName(), $queue->getQueueName()); + $this->context->subscribe($topic, $queue); + } + } + + /** + * {@inheritdoc} + * + * @return GpsQueue + */ + public function createQueue($queueName) + { + $transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName(); + + return $this->context->createQueue($transportName); + } + + /** + * {@inheritdoc} + * + * @return GpsMessage + */ + public function createTransportMessage(Message $message) + { + $headers = $message->getHeaders(); + $properties = $message->getProperties(); + + $transportMessage = $this->context->createMessage(); + $transportMessage->setBody($message->getBody()); + $transportMessage->setHeaders($headers); + $transportMessage->setProperties($properties); + $transportMessage->setMessageId($message->getMessageId()); + $transportMessage->setTimestamp($message->getTimestamp()); + $transportMessage->setReplyTo($message->getReplyTo()); + $transportMessage->setCorrelationId($message->getCorrelationId()); + + return $transportMessage; + } + + /** + * @param GpsMessage $message + * + * {@inheritdoc} + */ + public function createClientMessage(PsrMessage $message) + { + $clientMessage = new Message(); + + $clientMessage->setBody($message->getBody()); + $clientMessage->setHeaders($message->getHeaders()); + $clientMessage->setProperties($message->getProperties()); + $clientMessage->setMessageId($message->getMessageId()); + $clientMessage->setTimestamp($message->getTimestamp()); + $clientMessage->setReplyTo($message->getReplyTo()); + $clientMessage->setCorrelationId($message->getCorrelationId()); + + return $clientMessage; + } + + /** + * @return Config + */ + public function getConfig() + { + return $this->config; + } + + /** + * @return GpsTopic + */ + private function createRouterTopic() + { + $topic = $this->context->createTopic( + $this->config->createTransportRouterTopicName($this->config->getRouterTopicName()) + ); + + return $topic; + } +} diff --git a/pkg/gps/Symfony/GpsTransportFactory.php b/pkg/gps/Symfony/GpsTransportFactory.php new file mode 100644 index 000000000..bec183c25 --- /dev/null +++ b/pkg/gps/Symfony/GpsTransportFactory.php @@ -0,0 +1,130 @@ +name = $name; + } + + /** + * {@inheritdoc} + */ + public function addConfiguration(ArrayNodeDefinition $builder) + { + $builder + ->beforeNormalization() + ->ifString() + ->then(function ($v) { + return ['dsn' => $v]; + }) + ->end() + ->children() + ->scalarNode('dsn') + ->info('The connection to AMQP broker set as a string. Other parameters are ignored if set') + ->end() + ->scalarNode('projectId') + ->info('The project ID from the Google Developer\'s Console.') + ->end() + ->scalarNode('keyFilePath') + ->info('The full path to your service account credentials.json file retrieved from the Google Developers Console.') + ->end() + ->integerNode('retries') + ->defaultValue(3) + ->info('Number of retries for a failed request.') + ->end() + ->arrayNode('scopes') + ->prototype('scalar')->end() + ->info('Scopes to be used for the request.') + ->end() + ->booleanNode('lazy') + ->defaultTrue() + ->info('The connection will be performed as later as possible, if the option set to true') + ->end() + ; + } + + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + foreach ($config as $key => $value) { + if (null === $value) { + unset($config[$key]); + } elseif (is_array($value) && empty($value)) { + unset($config[$key]); + } + } + + $factory = new Definition(GpsConnectionFactory::class); + $factory->setArguments(isset($config['dsn']) ? [$config['dsn']] : [$config]); + + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $container->setDefinition($factoryId, $factory); + + return $factoryId; + } + + /** + * {@inheritdoc} + */ + public function createContext(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + + $context = new Definition(GpsContext::class); + $context->setFactory([new Reference($factoryId), 'createContext']); + + $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); + $container->setDefinition($contextId, $context); + + return $contextId; + } + + /** + * {@inheritdoc} + */ + public function createDriver(ContainerBuilder $container, array $config) + { + $driver = new Definition(GpsDriver::class); + $driver->setArguments([ + new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), + new Reference('enqueue.client.config'), + new Reference('enqueue.client.meta.queue_meta_registry'), + ]); + + $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); + $container->setDefinition($driverId, $driver); + + return $driverId; + } + + /** + * {@inheritdoc} + */ + public function getName() + { + return $this->name; + } +} diff --git a/pkg/gps/Tests/Client/GpsDriverTest.php b/pkg/gps/Tests/Client/GpsDriverTest.php new file mode 100644 index 000000000..da879cc26 --- /dev/null +++ b/pkg/gps/Tests/Client/GpsDriverTest.php @@ -0,0 +1,384 @@ +assertClassImplements(DriverInterface::class, GpsDriver::class); + } + + public function testCouldBeConstructedWithRequiredArguments() + { + new GpsDriver( + $this->createGpsContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + } + + public function testShouldReturnConfigObject() + { + $config = $this->createDummyConfig(); + + $driver = new GpsDriver($this->createGpsContextMock(), $config, $this->createDummyQueueMetaRegistry()); + + $this->assertSame($config, $driver->getConfig()); + } + + public function testShouldCreateAndReturnQueueInstance() + { + $expectedQueue = new GpsQueue('aName'); + + $context = $this->createGpsContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aprefix.afooqueue') + ->willReturn($expectedQueue) + ; + + $driver = new GpsDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); + + $queue = $driver->createQueue('aFooQueue'); + + $this->assertSame($expectedQueue, $queue); + } + + public function testShouldCreateAndReturnQueueInstanceWithHardcodedTransportName() + { + $expectedQueue = new GpsQueue('aName'); + + $context = $this->createGpsContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aBarQueue') + ->willReturn($expectedQueue) + ; + + $driver = new GpsDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); + + $queue = $driver->createQueue('aBarQueue'); + + $this->assertSame($expectedQueue, $queue); + } + + public function testShouldConvertTransportMessageToClientMessage() + { + $transportMessage = new GpsMessage(); + $transportMessage->setBody('body'); + $transportMessage->setHeaders(['hkey' => 'hval']); + $transportMessage->setProperties(['key' => 'val']); + $transportMessage->setMessageId('MessageId'); + $transportMessage->setTimestamp(1000); + $transportMessage->setReplyTo('theReplyTo'); + $transportMessage->setCorrelationId('theCorrelationId'); + + $driver = new GpsDriver( + $this->createGpsContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $clientMessage = $driver->createClientMessage($transportMessage); + + $this->assertInstanceOf(Message::class, $clientMessage); + $this->assertSame('body', $clientMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + 'reply_to' => 'theReplyTo', + 'correlation_id' => 'theCorrelationId', + ], $clientMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $clientMessage->getProperties()); + $this->assertSame('MessageId', $clientMessage->getMessageId()); + $this->assertNull($clientMessage->getExpire()); + $this->assertNull($clientMessage->getContentType()); + $this->assertSame(1000, $clientMessage->getTimestamp()); + $this->assertSame('theReplyTo', $clientMessage->getReplyTo()); + $this->assertSame('theCorrelationId', $clientMessage->getCorrelationId()); + } + + public function testShouldConvertClientMessageToTransportMessage() + { + $clientMessage = new Message(); + $clientMessage->setBody('body'); + $clientMessage->setHeaders(['hkey' => 'hval']); + $clientMessage->setProperties(['key' => 'val']); + $clientMessage->setContentType('ContentType'); + $clientMessage->setExpire(123); + $clientMessage->setMessageId('MessageId'); + $clientMessage->setTimestamp(1000); + $clientMessage->setReplyTo('theReplyTo'); + $clientMessage->setCorrelationId('theCorrelationId'); + + $context = $this->createGpsContextMock(); + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn(new GpsMessage()) + ; + + $driver = new GpsDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $transportMessage = $driver->createTransportMessage($clientMessage); + + $this->assertInstanceOf(GpsMessage::class, $transportMessage); + $this->assertSame('body', $transportMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + 'reply_to' => 'theReplyTo', + 'correlation_id' => 'theCorrelationId', + ], $transportMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $transportMessage->getProperties()); + $this->assertSame('MessageId', $transportMessage->getMessageId()); + $this->assertSame(1000, $transportMessage->getTimestamp()); + $this->assertSame('theReplyTo', $transportMessage->getReplyTo()); + $this->assertSame('theCorrelationId', $transportMessage->getCorrelationId()); + } + + public function testShouldSendMessageToRouter() + { + $topic = new GpsTopic(''); + $transportMessage = new GpsMessage(); + + $producer = $this->createGpsProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($topic), $this->identicalTo($transportMessage)) + ; + $context = $this->createGpsContextMock(); + $context + ->expects($this->once()) + ->method('createTopic') + ->willReturn($topic) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new GpsDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic'); + + $driver->sendToRouter($message); + } + + public function testShouldThrowExceptionIfTopicParameterIsNotSet() + { + $driver = new GpsDriver( + $this->createGpsContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Topic name parameter is required but is not set'); + + $driver->sendToRouter(new Message()); + } + + public function testShouldSendMessageToProcessor() + { + $topic = new GpsTopic(''); + $transportMessage = new GpsMessage(); + + $producer = $this->createGpsProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($topic), $this->identicalTo($transportMessage)) + ; + $context = $this->createGpsContextMock(); + $context + ->expects($this->once()) + ->method('createTopic') + ->willReturn($topic) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new GpsDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aFooQueue'); + + $driver->sendToProcessor($message); + } + + public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet() + { + $driver = new GpsDriver( + $this->createGpsContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Processor name parameter is required but is not set'); + + $driver->sendToProcessor(new Message()); + } + + public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet() + { + $driver = new GpsDriver( + $this->createGpsContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Queue name parameter is required but is not set'); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + + $driver->sendToProcessor($message); + } + + public function testShouldSetupBroker() + { + $routerTopic = new GpsTopic(''); + $routerQueue = new GpsQueue(''); + + $processorTopic = new GpsTopic(''); + $processorQueue = new GpsQueue(''); + + $context = $this->createGpsContextMock(); + // setup router + $context + ->expects($this->at(0)) + ->method('createTopic') + ->willReturn($routerTopic) + ; + $context + ->expects($this->at(1)) + ->method('createQueue') + ->willReturn($routerQueue) + ; + $context + ->expects($this->at(2)) + ->method('subscribe') + ->with($this->identicalTo($routerTopic), $this->identicalTo($routerQueue)) + ; + // setup processor queue + $context + ->expects($this->at(3)) + ->method('createTopic') + ->willReturn($processorTopic) + ; + $context + ->expects($this->at(4)) + ->method('createQueue') + ->willReturn($processorQueue) + ; + $context + ->expects($this->at(5)) + ->method('subscribe') + ->with($this->identicalTo($processorTopic), $this->identicalTo($processorQueue)) + ; + + $meta = new QueueMetaRegistry($this->createDummyConfig(), [ + 'default' => [], + ]); + + $driver = new GpsDriver( + $context, + $this->createDummyConfig(), + $meta + ); + + $driver->setupBroker(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|GpsContext + */ + private function createGpsContextMock() + { + return $this->createMock(GpsContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|GpsProducer + */ + private function createGpsProducerMock() + { + return $this->createMock(GpsProducer::class); + } + + /** + * @return QueueMetaRegistry + */ + private function createDummyQueueMetaRegistry() + { + $registry = new QueueMetaRegistry($this->createDummyConfig(), []); + $registry->add('default'); + $registry->add('aFooQueue'); + $registry->add('aBarQueue', 'aBarQueue'); + + return $registry; + } + + /** + * @return Config + */ + private function createDummyConfig() + { + return Config::create('aPrefix'); + } +} diff --git a/pkg/gps/Tests/Symfony/GpsTransportFactoryTest.php b/pkg/gps/Tests/Symfony/GpsTransportFactoryTest.php new file mode 100644 index 000000000..f9f7c7f89 --- /dev/null +++ b/pkg/gps/Tests/Symfony/GpsTransportFactoryTest.php @@ -0,0 +1,157 @@ +assertClassImplements(TransportFactoryInterface::class, GpsTransportFactory::class); + } + + public function testCouldBeConstructedWithDefaultName() + { + $transport = new GpsTransportFactory(); + + $this->assertEquals('gps', $transport->getName()); + } + + public function testCouldBeConstructedWithCustomName() + { + $transport = new GpsTransportFactory('theCustomName'); + + $this->assertEquals('theCustomName', $transport->getName()); + } + + public function testShouldAllowAddConfiguration() + { + $transport = new GpsTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), []); + + $this->assertEquals([ + 'retries' => 3, + 'scopes' => [], + 'lazy' => true, + ], $config); + } + + public function testShouldAllowAddConfigurationAsString() + { + $transport = new GpsTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), ['gpsDSN']); + + $this->assertEquals([ + 'dsn' => 'gpsDSN', + 'lazy' => true, + 'retries' => 3, + 'scopes' => [], + ], $config); + } + + public function testShouldCreateConnectionFactory() + { + $container = new ContainerBuilder(); + + $transport = new GpsTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + 'projectId' => null, + 'lazy' => false, + 'retries' => 3, + 'scopes' => [], + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(GpsConnectionFactory::class, $factory->getClass()); + $this->assertSame([[ + 'lazy' => false, + 'retries' => 3, + ]], $factory->getArguments()); + } + + public function testShouldCreateConnectionFactoryFromDsnString() + { + $container = new ContainerBuilder(); + + $transport = new GpsTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + 'dsn' => 'theConnectionDSN', + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(GpsConnectionFactory::class, $factory->getClass()); + $this->assertSame(['theConnectionDSN'], $factory->getArguments()); + } + + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new GpsTransportFactory(); + + $serviceId = $transport->createContext($container, [ + 'projectId' => null, + 'lazy' => false, + 'retries' => 3, + 'scopes' => [], + ]); + + $this->assertEquals('enqueue.transport.gps.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.gps.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.gps.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + + public function testShouldCreateDriver() + { + $container = new ContainerBuilder(); + + $transport = new GpsTransportFactory(); + + $serviceId = $transport->createDriver($container, []); + + $this->assertEquals('enqueue.client.gps.driver', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $driver = $container->getDefinition($serviceId); + $this->assertSame(GpsDriver::class, $driver->getClass()); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(0)); + $this->assertEquals('enqueue.transport.gps.context', (string) $driver->getArgument(0)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(1)); + $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(2)); + $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2)); + } +} From 375231c8d845a91a4b0aca91b22a0505ca93becd Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 17 Aug 2017 10:55:13 +0300 Subject: [PATCH 09/11] gps --- pkg/gps/GpsProducer.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/gps/GpsProducer.php b/pkg/gps/GpsProducer.php index 3ac216aad..e19f227a8 100644 --- a/pkg/gps/GpsProducer.php +++ b/pkg/gps/GpsProducer.php @@ -5,6 +5,7 @@ use Google\Cloud\PubSub\Topic; use Interop\Queue\DeliveryDelayNotSupportedException; use Interop\Queue\InvalidDestinationException; +use Interop\Queue\InvalidMessageException; use Interop\Queue\PriorityNotSupportedException; use Interop\Queue\PsrDestination; use Interop\Queue\PsrMessage; @@ -32,6 +33,7 @@ public function __construct(GpsContext $context) public function send(PsrDestination $destination, PsrMessage $message) { InvalidDestinationException::assertDestinationInstanceOf($destination, GpsTopic::class); + InvalidMessageException::assertMessageInstanceOf($message, GpsMessage::class); /** @var Topic $topic */ $topic = $this->context->getClient()->topic($destination->getTopicName()); From bc700f10494dedf15aa168c6d5f46a9373d69351 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 21 Aug 2017 12:28:29 +0300 Subject: [PATCH 10/11] gps --- pkg/gps/Symfony/GpsTransportFactory.php | 2 +- pkg/gps/composer.json | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/pkg/gps/Symfony/GpsTransportFactory.php b/pkg/gps/Symfony/GpsTransportFactory.php index bec183c25..1ad137a12 100644 --- a/pkg/gps/Symfony/GpsTransportFactory.php +++ b/pkg/gps/Symfony/GpsTransportFactory.php @@ -41,7 +41,7 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->end() ->children() ->scalarNode('dsn') - ->info('The connection to AMQP broker set as a string. Other parameters are ignored if set') + ->info('The connection to Google Pub/Sub broker set as a string. Other parameters are ignored if set') ->end() ->scalarNode('projectId') ->info('The project ID from the Google Developer\'s Console.') diff --git a/pkg/gps/composer.json b/pkg/gps/composer.json index 3ec4e3d8c..ae3d50aa0 100644 --- a/pkg/gps/composer.json +++ b/pkg/gps/composer.json @@ -4,12 +4,6 @@ "description": "Message Google Cloud Pub/Sub Transport", "keywords": ["messaging", "queue", "google", "pubsub"], "license": "MIT", - "repositories": [ - { - "type": "vcs", - "url": "git@github.com:php-enqueue/test.git" - } - ], "require": { "php": ">=5.6", "queue-interop/queue-interop": "^0.6@dev", From 7f9b8672409a87ca9fa58908ff090e1675ec9aae Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 28 Aug 2017 13:11:18 +0300 Subject: [PATCH 11/11] [gps] add docs. --- README.md | 1 + docs/index.md | 1 + docs/transport/gps.md | 74 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+) create mode 100644 docs/transport/gps.md diff --git a/README.md b/README.md index 0a27fc228..b28a7e12a 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Features: * [Beanstalk](docs/transport/pheanstalk.md) * [STOMP](docs/transport/stomp.md) * [Amazon SQS](docs/transport/sqs.md) + * [Google PubSub](docs/transport/gps.md) * [Kafka](docs/transport/kafka.md) * [Redis](docs/transport/redis.md) * [Gearman](docs/transport/gearman.md) diff --git a/docs/index.md b/docs/index.md index 662007c51..c72d2ec73 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,6 +4,7 @@ * [Transports](#transports) - Amqp based on [the ext](transport/amqp.md), [bunny](transport/amqp_bunny.md), [the lib](transport/amqp_lib.md) - [Amazon SQS](transport/sqs.md) + - [Google PubSub](transport/gps.md) - [Beanstalk (Pheanstalk)](transport/pheanstalk.md) - [Gearman](transport/gearman.md) - [Kafka](transport/kafka.md) diff --git a/docs/transport/gps.md b/docs/transport/gps.md new file mode 100644 index 000000000..f124feb9b --- /dev/null +++ b/docs/transport/gps.md @@ -0,0 +1,74 @@ +# Google Pub Sub transport + +A transport for [Google Pub Sub](https://cloud.google.com/pubsub/docs/) cloud MQ. +It uses internally official google sdk library [google/cloud-pubsub](https://packagist.org/packages/google/cloud-pubsub) + +* [Installation](#installation) +* [Create context](#create-context) +* [Send message to topic](#send-message-to-topic) +* [Consume message](#consume-message) + +## Installation + +```bash +$ composer require enqueue/gps +``` + +## Create context + +To enable the Google Cloud Pub/Sub Emulator, set the `PUBSUB_EMULATOR_HOST` environment variable. +There is a handy docker container [google/cloud-sdk](https://hub.docker.com/r/google/cloud-sdk/). + +```php +createContext(); +``` + +## Send message to topic + +Before you can send message you have to declare a topic. +The operation creates a topic on a broker side. +Google allows messages to be sent only to topic. + +```php +createTopic('foo'); +$message = $psrContext->createMessage('Hello world!'); + +$psrContext->declareTopic($fooTopic); + +$psrContext->createProducer()->send($fooTopic, $message); +``` + +## Consume message: + +Before you can consume message you have to subscribe a queue to the topic. +Google does not allow consuming message from the topic directly. + +```php +createTopic('foo'); +$fooQueue = $psrContext->createQueue('foo'); + +$psrContext->subscribe($fooTopic, $fooQueue); + +$consumer = $psrContext->createConsumer($fooQueue); +$message = $consumer->receive(); + +// process a message + +$consumer->acknowledge($message); +// $consumer->reject($message); +``` + +[back to index](../index.md) \ No newline at end of file