From f99c2c1c1d763e00272f1d40cf11d7637a864065 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 22 Oct 2018 12:57:39 +0300 Subject: [PATCH 01/13] wamp --- composer.json | 5 +- pkg/wamp/.gitignore | 6 + pkg/wamp/.travis.yml | 21 +++ pkg/wamp/LICENSE | 20 +++ pkg/wamp/README.md | 27 ++++ pkg/wamp/WampConnectionFactory.php | 112 +++++++++++++++ pkg/wamp/WampConsumer.php | 127 +++++++++++++++++ pkg/wamp/WampContext.php | 118 ++++++++++++++++ pkg/wamp/WampDestination.php | 31 +++++ pkg/wamp/WampMessage.php | 44 ++++++ pkg/wamp/WampProducer.php | 189 ++++++++++++++++++++++++++ pkg/wamp/WampSubscriptionConsumer.php | 163 ++++++++++++++++++++++ pkg/wamp/composer.json | 39 ++++++ pkg/wamp/phpunit.xml.dist | 30 ++++ 14 files changed, 931 insertions(+), 1 deletion(-) create mode 100644 pkg/wamp/.gitignore create mode 100644 pkg/wamp/.travis.yml create mode 100644 pkg/wamp/LICENSE create mode 100644 pkg/wamp/README.md create mode 100644 pkg/wamp/WampConnectionFactory.php create mode 100644 pkg/wamp/WampConsumer.php create mode 100644 pkg/wamp/WampContext.php create mode 100644 pkg/wamp/WampDestination.php create mode 100644 pkg/wamp/WampMessage.php create mode 100644 pkg/wamp/WampProducer.php create mode 100644 pkg/wamp/WampSubscriptionConsumer.php create mode 100644 pkg/wamp/composer.json create mode 100644 pkg/wamp/phpunit.xml.dist diff --git a/composer.json b/composer.json index 0099b6cac..51ddb18ea 100644 --- a/composer.json +++ b/composer.json @@ -30,7 +30,9 @@ "php-http/guzzle6-adapter": "^1.1", "php-http/client-common": "^1.7@dev", "richardfullmer/rabbitmq-management-api": "^2.0", - "predis/predis": "^1.1" + "predis/predis": "^1.1", + "thruway/pawl-transport": "^0.5.0", + "voryx/thruway": "^0.5.3" }, "require-dev": { "phpunit/phpunit": "^5.5", @@ -75,6 +77,7 @@ "Enqueue\\Stomp\\": "pkg/stomp/", "Enqueue\\Test\\": "pkg/test/", "Enqueue\\Dsn\\": "pkg/dsn/", + "Enqueue\\Wamp\\": "pkg/wamp/", "Enqueue\\": "pkg/enqueue/" }, "exclude-from-classmap": [ diff --git a/pkg/wamp/.gitignore b/pkg/wamp/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/wamp/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/wamp/.travis.yml b/pkg/wamp/.travis.yml new file mode 100644 index 000000000..b7ba11943 --- /dev/null +++ b/pkg/wamp/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 10 + +language: php + +php: + - '7.1' + - '7.2' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/wamp/LICENSE b/pkg/wamp/LICENSE new file mode 100644 index 000000000..d9736f8bf --- /dev/null +++ b/pkg/wamp/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) +Copyright (c) 2017 Kotliar Maksym + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/pkg/wamp/README.md b/pkg/wamp/README.md new file mode 100644 index 000000000..094a7ca94 --- /dev/null +++ b/pkg/wamp/README.md @@ -0,0 +1,27 @@ +# Web Application Messaging Protocol Transport + +[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) +[![Build Status](https://travis-ci.org/php-enqueue/wamp.png?branch=master)](https://travis-ci.org/php-enqueue/wamp) +[![Total Downloads](https://poser.pugx.org/enqueue/wamp/d/total.png)](https://packagist.org/packages/enqueue/wamp) +[![Latest Stable Version](https://poser.pugx.org/enqueue/wamp/version.png)](https://packagist.org/packages/enqueue/wamp) + +This is an implementation of [queue interop](https://github.com/queue-interop/queue-interop). It uses [Thruway](https://github.com/voryx/Thruway) internally. + +## Resources + +* [Site](https://enqueue.forma-pro.com/) +* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Questions](https://gitter.im/php-enqueue/Lobby) +* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) + +## Developed by Forma-Pro + +Forma-Pro is a full stack development company which interests also spread to open source development. +Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. +Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability. + +If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com + +## License + +It is released under the [MIT License](LICENSE). \ No newline at end of file diff --git a/pkg/wamp/WampConnectionFactory.php b/pkg/wamp/WampConnectionFactory.php new file mode 100644 index 000000000..e810ab052 --- /dev/null +++ b/pkg/wamp/WampConnectionFactory.php @@ -0,0 +1,112 @@ + true, + * 'dsn' => 'wamp://127.0.0.1:9090', + * 'host' => '127.0.0.1', + * 'port' => '9090', + * 'max_retries' => 15, + * 'initial_retry_delay' => 1.5, + * 'max_retry_delay' => 300, + * 'retry_delay_growth' => 1.5, + * ] + * + * or + * + * wamp://127.0.0.1:9090?max_retries=10 + * + * @param array|string|null $config + */ + public function __construct($config = 'wamp:') + { + if (empty($config)) { + $config = $this->parseDsn('wamp:'); + } elseif (is_string($config)) { + $config = $this->parseDsn($config); + } elseif (is_array($config)) { + $config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']); + } else { + throw new \LogicException('The config must be either an array of options, a DSN string or null'); + } + + $config = array_replace([ + 'lazy' => true, + 'host' => '127.0.0.1', + 'port' => '9090', + 'max_retries' => 15, + 'initial_retry_delay' => 1.5, + 'max_retry_delay' => 300, + 'retry_delay_growth' => 1.5, + ], $config); + + $this->config = $config; + } + + public function createContext(): Context + { + if ($this->config['lazy']) { + return new WampContext(function () { + return $this->establishConnection(); + }); + } + + return new WampContext($this->establishConnection()); + } + + private function establishConnection(): Client + { + $uri = sprintf('ws://%s:%s', $this->config['host'], $this->config['port']); + + $client = new Client('realm1'); + $client->addTransportProvider(new PawlTransportProvider($uri)); + $client->setReconnectOptions([ + 'max_retries' => $this->config['max_retries'], + 'initial_retry_delay' => $this->config['initial_retry_delay'], + 'max_retry_delay' => $this->config['max_retry_delay'], + 'retry_delay_growth' => $this->config['retry_delay_growth'], + ]); + + return $client; + } + + private function parseDsn(string $dsn): array + { + $dsn = new Dsn($dsn); + + if ('wamp' !== $dsn->getSchemeProtocol()) { + throw new \LogicException(sprintf( + 'The given scheme protocol "%s" is not supported. It must be "wamp"', + $dsn->getSchemeProtocol() + )); + } + + return array_filter(array_replace($dsn->getQuery(), [ + 'host' => $dsn->getHost(), + 'port' => $dsn->getPort(), + 'max_retries' => $dsn->getInt('max_retries'), + 'initial_retry_delay' => $dsn->getFloat('initial_retry_delay'), + 'max_retry_delay' => $dsn->getInt('max_retry_delay'), + 'retry_delay_growth' => $dsn->getFloat('retry_delay_growth'), + ]), function ($value) { return null !== $value; }); + } +} diff --git a/pkg/wamp/WampConsumer.php b/pkg/wamp/WampConsumer.php new file mode 100644 index 000000000..efa24cd95 --- /dev/null +++ b/pkg/wamp/WampConsumer.php @@ -0,0 +1,127 @@ +context = $context; + $this->queue = $destination; + } + + public function getQueue(): Queue + { + return $this->queue; + } + + public function receive(int $timeout = 0): ?Message + { + $init = false; + $this->timer = null; + $this->message = null; + + if (null === $this->client) { + $init = true; + + $this->client = $this->context->getClient(); + $this->client->setAttemptRetry(true); + $this->client->on('open', function (ClientSession $session) { + + $session->subscribe($this->queue->getQueueName(), function ($args) { + $this->message = WampMessage::jsonUnserialize($args[0]); + + $this->client->emit('do-stop'); + }); + }); + + $this->client->on('do-stop', function () { + if ($this->timer) { + $this->client->getLoop()->cancelTimer($this->timer); + } + + $this->client->getLoop()->stop(); + }); + } + + if ($timeout > 0) { + $this->timer = $this->client->getLoop()->addTimer($timeout / 1000, function () { + $this->client->emit('do-stop'); + }); + } + + if ($init) { + $this->client->start(false); + } + + $this->client->getLoop()->run(); + + return $this->message ?: null; + } + + public function receiveNoWait(): ?Message + { + return $this->receive(100); + } + + /** + * {@inheritdoc} + * + * @param WampMessage $message + */ + public function acknowledge(Message $message): void + { + // do nothing. redis transport always works in auto ack mode + } + + /** + * {@inheritdoc} + * + * @param WampMessage $message + */ + public function reject(Message $message, bool $requeue = false): void + { + InvalidMessageException::assertMessageInstanceOf($message, WampMessage::class); + + // do nothing on reject. redis transport always works in auto ack mode + + if ($requeue) { + $this->context->createProducer()->send($this->queue, $message); + } + } +} diff --git a/pkg/wamp/WampContext.php b/pkg/wamp/WampContext.php new file mode 100644 index 000000000..925a7a3c4 --- /dev/null +++ b/pkg/wamp/WampContext.php @@ -0,0 +1,118 @@ +client = $client; + } elseif (is_callable($client)) { + $this->clientFactory = $client; + } else { + throw new \InvalidArgumentException(sprintf( + 'The $client argument must be either %s or callable that returns %s once called.', + Client::class, + Client::class + )); + } + } + + public function createMessage(string $body = '', array $properties = [], array $headers = []): Message + { + return new WampMessage($body, $properties, $headers); + } + + public function createTopic(string $topicName): Topic + { + return new WampDestination($topicName); + } + + public function createQueue(string $queueName): Queue + { + return new WampDestination($queueName); + } + + public function createTemporaryQueue(): Queue + { + throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt(); + } + + public function createProducer(): Producer + { + return new WampProducer($this); + } + + public function createConsumer(Destination $destination): Consumer + { + InvalidDestinationException::assertDestinationInstanceOf($destination, WampDestination::class); + + return new WampConsumer($this, $destination); + } + + public function createSubscriptionConsumer(): SubscriptionConsumer + { + return new WampSubscriptionConsumer($this); + } + + public function purgeQueue(Queue $queue): void + { + } + + public function close(): void + { + if (null === $this->client) { + return; + } + + if (null === $this->client->getSession()) { + return; + } + + $this->client->setAttemptRetry(false); + $this->client->getSession()->close(); + } + + public function getClient(): Client + { + if (false == $this->client) { + $client = call_user_func($this->clientFactory); + if (false == $client instanceof Client) { + throw new \LogicException(sprintf( + 'The factory must return instance of "%s". But it returns %s', + Client::class, + is_object($client) ? get_class($client) : gettype($client) + )); + } + + $this->client = $client; + } + + return $this->client; + } +} diff --git a/pkg/wamp/WampDestination.php b/pkg/wamp/WampDestination.php new file mode 100644 index 000000000..a99bc1fa0 --- /dev/null +++ b/pkg/wamp/WampDestination.php @@ -0,0 +1,31 @@ +name = $name; + } + + public function getQueueName(): string + { + return $this->name; + } + + public function getTopicName(): string + { + return $this->name; + } +} diff --git a/pkg/wamp/WampMessage.php b/pkg/wamp/WampMessage.php new file mode 100644 index 000000000..937e8c879 --- /dev/null +++ b/pkg/wamp/WampMessage.php @@ -0,0 +1,44 @@ +body = $body; + $this->properties = $properties; + $this->headers = $headers; + $this->redelivered = false; + } + + public function jsonSerialize(): array + { + return [ + 'body' => $this->getBody(), + 'properties' => $this->getProperties(), + 'headers' => $this->getHeaders(), + ]; + } + + public static function jsonUnserialize(string $json): self + { + $data = json_decode($json, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new self($data['body'], $data['properties'], $data['headers']); + } +} diff --git a/pkg/wamp/WampProducer.php b/pkg/wamp/WampProducer.php new file mode 100644 index 000000000..617fd45b9 --- /dev/null +++ b/pkg/wamp/WampProducer.php @@ -0,0 +1,189 @@ +context = $context; + } + + /** + * {@inheritdoc} + * + * @param WampDestination $destination + * @param WampMessage $message + */ + public function send(Destination $destination, Message $message): void + { + InvalidDestinationException::assertDestinationInstanceOf($destination, WampDestination::class); + InvalidMessageException::assertMessageInstanceOf($message, WampMessage::class); + + $init = false; + $this->message = $message; + $this->destination = $destination; + + if (null === $this->client) { + $init = true; + + $this->client = $this->context->getClient(); + $this->client->setAttemptRetry(true); + $this->client->on('open', function (ClientSession $session) { + $this->session = $session; + + $this->doSendMessageIfPossible(); + }); + + $this->client->on('close', function () { + if ($this->session === $this->client->getSession()) { + $this->session = null; + } + }); + + $this->client->on('error', function () { + if ($this->session === $this->client->getSession()) { + $this->session = null; + } + }); + + $this->client->on('do-send', function (WampDestination $destination, WampMessage $message) { + + $onFinish = function () { + $this->client->emit('do-stop'); + }; + + $this->session->publish($destination->getTopicName(), [json_encode($message->jsonSerialize())], [], ['acknowledge' => true]) + ->then($onFinish, $onFinish); + }); + + $this->client->on('do-stop', function () { + $this->client->getLoop()->stop(); + }); + } + + $this->client->getLoop()->futureTick(function () { + $this->doSendMessageIfPossible(); + }); + + if ($init) { + $this->client->start(false); + } + + $this->client->getLoop()->run(); + } + + private function doSendMessageIfPossible() + { + if (null === $this->session) { + return; + } + + if (null === $this->message) { + return; + } + + $message = $this->message; + $destination = $this->destination; + + $this->message = null; + $this->destination = null; + + $this->client->emit('do-send', [$destination, $message]); + } + + /** + * {@inheritdoc} + * + * @return WampProducer + */ + public function setDeliveryDelay(int $deliveryDelay = null): Producer + { + if (null === $deliveryDelay) { + return $this; + } + + throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + } + + public function getDeliveryDelay(): ?int + { + return null; + } + + /** + * {@inheritdoc} + * + * @return WampProducer + */ + public function setPriority(int $priority = null): Producer + { + if (null === $priority) { + return $this; + } + + throw PriorityNotSupportedException::providerDoestNotSupportIt(); + } + + public function getPriority(): ?int + { + return null; + } + + /** + * {@inheritdoc} + * + * @return WampProducer + */ + public function setTimeToLive(int $timeToLive = null): Producer + { + if (null === $timeToLive) { + return $this; + } + + throw TimeToLiveNotSupportedException::providerDoestNotSupportIt(); + } + + public function getTimeToLive(): ?int + { + return null; + } +} diff --git a/pkg/wamp/WampSubscriptionConsumer.php b/pkg/wamp/WampSubscriptionConsumer.php new file mode 100644 index 000000000..96cd2f593 --- /dev/null +++ b/pkg/wamp/WampSubscriptionConsumer.php @@ -0,0 +1,163 @@ +context = $context; + $this->subscribers = []; + } + + public function consume(int $timeout = 0): void + { + if (empty($this->subscribers)) { + throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); + } + + $init = false; + + if (null === $this->client) { + $init = true; + + $this->client = $this->context->getClient(); + $this->client->setAttemptRetry(true); + $this->client->on('open', function (ClientSession $session) { + + foreach ($this->subscribers as $queue => $subscriber) { + $session->subscribe($queue, function ($args) use ($subscriber, $session) { + $message = WampMessage::jsonUnserialize($args[0]); + + /** + * @var WampConsumer $consumer + * @var callable $callback + */ + list($consumer, $callback) = $subscriber; + + if (false === call_user_func($callback, $message, $consumer)) { + $this->client->emit('do-stop'); + } + }); + } + }); + + $this->client->on('do-stop', function () { + if ($this->timer) { + $this->client->getLoop()->cancelTimer($this->timer); + } + + $this->client->getLoop()->stop(); + }); + } + + if ($timeout > 0) { + $this->timer = $this->client->getLoop()->addTimer($timeout / 1000, function () { + $this->client->emit('do-stop'); + }); + } + + if ($init) { + $this->client->start(false); + } + + $this->client->getLoop()->run(); + } + + /** + * {@inheritdoc} + * + * @param WampConsumer $consumer + */ + public function subscribe(Consumer $consumer, callable $callback): void + { + if (false == $consumer instanceof WampConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', WampConsumer::class, get_class($consumer))); + } + + if ($this->client) { + throw new \LogicException('Could not subscribe after consume was called'); + } + + $queueName = $consumer->getQueue()->getQueueName(); + if (array_key_exists($queueName, $this->subscribers)) { + if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) { + return; + } + + throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName)); + } + + $this->subscribers[$queueName] = [$consumer, $callback]; + } + + /** + * {@inheritdoc} + * + * @param WampConsumer $consumer + */ + public function unsubscribe(Consumer $consumer): void + { + if (false == $consumer instanceof WampConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', WampConsumer::class, get_class($consumer))); + } + + if ($this->client) { + throw new \LogicException('Could not unsubscribe after consume was called'); + } + + $queueName = $consumer->getQueue()->getQueueName(); + + if (false == array_key_exists($queueName, $this->subscribers)) { + return; + } + + if ($this->subscribers[$queueName][0] !== $consumer) { + return; + } + + unset($this->subscribers[$queueName]); + } + + public function unsubscribeAll(): void + { + if ($this->client) { + throw new \LogicException('Could not unsubscribe after consume was called'); + } + + $this->subscribers = []; + } +} diff --git a/pkg/wamp/composer.json b/pkg/wamp/composer.json new file mode 100644 index 000000000..77ef84e5b --- /dev/null +++ b/pkg/wamp/composer.json @@ -0,0 +1,39 @@ +{ + "name": "enqueue/wamp", + "type": "library", + "description": "The Web Application Messaging Protocol Transport", + "keywords": ["messaging", "queue", "wamp", "thruway"], + "homepage": "https://enqueue.forma-pro.com/", + "license": "MIT", + "require": { + "php": "^7.1.3", + "queue-interop/queue-interop": "0.7.x-dev", + "thruway/pawl-transport": "^0.5.0", + "voryx/thruway": "^0.5.3" + }, + "require-dev": { + "phpunit/phpunit": "~5.4.0", + "enqueue/test": "0.9.x-dev", + "enqueue/null": "0.9.x-dev", + "queue-interop/queue-spec": "0.6.x-dev" + }, + "support": { + "email": "opensource@forma-pro.com", + "issues": "https://github.com/php-enqueue/enqueue-dev/issues", + "forum": "https://gitter.im/php-enqueue/Lobby", + "source": "https://github.com/php-enqueue/enqueue-dev", + "docs": "https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md" + }, + "autoload": { + "psr-4": { "Enqueue\\Wamp\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.9.x-dev" + } + } +} diff --git a/pkg/wamp/phpunit.xml.dist b/pkg/wamp/phpunit.xml.dist new file mode 100644 index 000000000..717a3c6db --- /dev/null +++ b/pkg/wamp/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Tests + + + + From 5e2d4ef30bda9176d07856ee319dd9806d208209 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 22 Oct 2018 15:22:55 +0300 Subject: [PATCH 02/13] wamp --- pkg/test/WampExtension.php | 18 +++++ .../Tests/Functional/WampConsumerTest.php | 65 +++++++++++++++++++ .../WampSubscriptionConsumerTest.php | 51 +++++++++++++++ .../Tests/Spec/WampConnectionFactoryTest.php | 20 ++++++ pkg/wamp/Tests/Spec/WampContextTest.php | 23 +++++++ pkg/wamp/Tests/Spec/WampMessageTest.php | 20 ++++++ pkg/wamp/Tests/Spec/WampProducerTest.php | 23 +++++++ pkg/wamp/Tests/Spec/WampQueueTest.php | 20 ++++++ pkg/wamp/Tests/Spec/WampTopicTest.php | 20 ++++++ pkg/wamp/WampConsumer.php | 12 +++- pkg/wamp/WampContext.php | 45 ++++++------- pkg/wamp/WampProducer.php | 2 +- pkg/wamp/WampSubscriptionConsumer.php | 12 +++- 13 files changed, 304 insertions(+), 27 deletions(-) create mode 100644 pkg/test/WampExtension.php create mode 100644 pkg/wamp/Tests/Functional/WampConsumerTest.php create mode 100644 pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php create mode 100644 pkg/wamp/Tests/Spec/WampConnectionFactoryTest.php create mode 100644 pkg/wamp/Tests/Spec/WampContextTest.php create mode 100644 pkg/wamp/Tests/Spec/WampMessageTest.php create mode 100644 pkg/wamp/Tests/Spec/WampProducerTest.php create mode 100644 pkg/wamp/Tests/Spec/WampQueueTest.php create mode 100644 pkg/wamp/Tests/Spec/WampTopicTest.php diff --git a/pkg/test/WampExtension.php b/pkg/test/WampExtension.php new file mode 100644 index 000000000..eab5d42b7 --- /dev/null +++ b/pkg/test/WampExtension.php @@ -0,0 +1,18 @@ +createContext(); + } +} diff --git a/pkg/wamp/Tests/Functional/WampConsumerTest.php b/pkg/wamp/Tests/Functional/WampConsumerTest.php new file mode 100644 index 000000000..612577d4b --- /dev/null +++ b/pkg/wamp/Tests/Functional/WampConsumerTest.php @@ -0,0 +1,65 @@ +buildWampContext(); + $topic = $context->createTopic('topic'); + $consumer = $context->createConsumer($topic); + $producer = $context->createProducer(); + $message = $context->createMessage('the body'); + + // init client + $consumer->receive(1); + + $consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) { + $producer->send($topic, $message); + }); + + $receivedMessage = $consumer->receive(100); + + $this->assertInstanceOf(WampMessage::class, $receivedMessage); + $this->assertSame('the body', $receivedMessage->getBody()); + } + + public function testShouldSendAndReceiveNoWaitMessage() + { + $context = $this->buildWampContext(); + $topic = $context->createTopic('topic'); + $consumer = $context->createConsumer($topic); + $producer = $context->createProducer(); + $message = $context->createMessage('the body'); + + // init client + $consumer->receive(1); + + $consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) { + $producer->send($topic, $message); + }); + + $receivedMessage = $consumer->receiveNoWait(); + + $this->assertInstanceOf(WampMessage::class, $receivedMessage); + $this->assertSame('the body', $receivedMessage->getBody()); + } +} diff --git a/pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php b/pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php new file mode 100644 index 000000000..c4f0ee8a7 --- /dev/null +++ b/pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php @@ -0,0 +1,51 @@ +buildWampContext(); + $topic = $context->createTopic('topic'); + $consumer = $context->createSubscriptionConsumer(); + $producer = $context->createProducer(); + $message = $context->createMessage('the body'); + + $receivedMessage = null; + $consumer->subscribe($context->createConsumer($topic), function ($message) use (&$receivedMessage) { + $receivedMessage = $message; + + return false; + }); + + // init client + $consumer->consume(1); + + $consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) { + $producer->send($topic, $message); + }); + + $consumer->consume(100); + + $this->assertInstanceOf(WampMessage::class, $receivedMessage); + $this->assertSame('the body', $receivedMessage->getBody()); + } +} diff --git a/pkg/wamp/Tests/Spec/WampConnectionFactoryTest.php b/pkg/wamp/Tests/Spec/WampConnectionFactoryTest.php new file mode 100644 index 000000000..3c3ccf841 --- /dev/null +++ b/pkg/wamp/Tests/Spec/WampConnectionFactoryTest.php @@ -0,0 +1,20 @@ +buildWampContext(); + } +} diff --git a/pkg/wamp/Tests/Spec/WampMessageTest.php b/pkg/wamp/Tests/Spec/WampMessageTest.php new file mode 100644 index 000000000..5482fadde --- /dev/null +++ b/pkg/wamp/Tests/Spec/WampMessageTest.php @@ -0,0 +1,20 @@ +buildWampContext()->createProducer(); + } +} diff --git a/pkg/wamp/Tests/Spec/WampQueueTest.php b/pkg/wamp/Tests/Spec/WampQueueTest.php new file mode 100644 index 000000000..e8cb87267 --- /dev/null +++ b/pkg/wamp/Tests/Spec/WampQueueTest.php @@ -0,0 +1,20 @@ +queue; } + public function getClient(): ?Client + { + return $this->client; + } + public function receive(int $timeout = 0): ?Message { $init = false; @@ -59,7 +64,7 @@ public function receive(int $timeout = 0): ?Message if (null === $this->client) { $init = true; - $this->client = $this->context->getClient(); + $this->client = $this->context->getNewClient(); $this->client->setAttemptRetry(true); $this->client->on('open', function (ClientSession $session) { @@ -80,7 +85,10 @@ public function receive(int $timeout = 0): ?Message } if ($timeout > 0) { - $this->timer = $this->client->getLoop()->addTimer($timeout / 1000, function () { + $timeout = $timeout / 1000; + $timeout = $timeout >= 0.1 ? $timeout : 0.1; + + $this->timer = $this->client->getLoop()->addTimer($timeout, function () { $this->client->emit('do-stop'); }); } diff --git a/pkg/wamp/WampContext.php b/pkg/wamp/WampContext.php index 925a7a3c4..3d6c403f0 100644 --- a/pkg/wamp/WampContext.php +++ b/pkg/wamp/WampContext.php @@ -19,9 +19,9 @@ class WampContext implements Context { /** - * @var Client + * @var Client[] */ - private $client; + private $clients; /** * @var callable @@ -86,33 +86,34 @@ public function purgeQueue(Queue $queue): void public function close(): void { - if (null === $this->client) { - return; - } + foreach ($this->clients as $client) { + if (null === $client) { + return; + } - if (null === $this->client->getSession()) { - return; - } + if (null === $client->getSession()) { + return; + } - $this->client->setAttemptRetry(false); - $this->client->getSession()->close(); + $client->setAttemptRetry(false); + $client->getSession()->close(); + } } - public function getClient(): Client + public function getNewClient(): Client { - if (false == $this->client) { - $client = call_user_func($this->clientFactory); - if (false == $client instanceof Client) { - throw new \LogicException(sprintf( - 'The factory must return instance of "%s". But it returns %s', - Client::class, - is_object($client) ? get_class($client) : gettype($client) - )); - } + $client = call_user_func($this->clientFactory); - $this->client = $client; + if (false == $client instanceof Client) { + throw new \LogicException(sprintf( + 'The factory must return instance of "%s". But it returns %s', + Client::class, + is_object($client) ? get_class($client) : gettype($client) + )); } - return $this->client; + $this->clients[] = $client; + + return $client; } } diff --git a/pkg/wamp/WampProducer.php b/pkg/wamp/WampProducer.php index 617fd45b9..53ab00b63 100644 --- a/pkg/wamp/WampProducer.php +++ b/pkg/wamp/WampProducer.php @@ -65,7 +65,7 @@ public function send(Destination $destination, Message $message): void if (null === $this->client) { $init = true; - $this->client = $this->context->getClient(); + $this->client = $this->context->getNewClient(); $this->client->setAttemptRetry(true); $this->client->on('open', function (ClientSession $session) { $this->session = $session; diff --git a/pkg/wamp/WampSubscriptionConsumer.php b/pkg/wamp/WampSubscriptionConsumer.php index 96cd2f593..ddbd7e3a0 100644 --- a/pkg/wamp/WampSubscriptionConsumer.php +++ b/pkg/wamp/WampSubscriptionConsumer.php @@ -43,6 +43,11 @@ public function __construct(WampContext $context) $this->subscribers = []; } + public function getClient(): ?Client + { + return $this->client; + } + public function consume(int $timeout = 0): void { if (empty($this->subscribers)) { @@ -54,7 +59,7 @@ public function consume(int $timeout = 0): void if (null === $this->client) { $init = true; - $this->client = $this->context->getClient(); + $this->client = $this->context->getNewClient(); $this->client->setAttemptRetry(true); $this->client->on('open', function (ClientSession $session) { @@ -85,7 +90,10 @@ public function consume(int $timeout = 0): void } if ($timeout > 0) { - $this->timer = $this->client->getLoop()->addTimer($timeout / 1000, function () { + $timeout = $timeout / 1000; + $timeout = $timeout >= 0.1 ? $timeout : 0.1; + + $this->timer = $this->client->getLoop()->addTimer($timeout, function () { $this->client->emit('do-stop'); }); } From 99778d7c989f826420a8e11d646b53c8043731f9 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 22 Oct 2018 15:29:39 +0300 Subject: [PATCH 03/13] wamp --- pkg/wamp/WampConsumer.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/wamp/WampConsumer.php b/pkg/wamp/WampConsumer.php index 32df30c17..90fcaf846 100644 --- a/pkg/wamp/WampConsumer.php +++ b/pkg/wamp/WampConsumer.php @@ -114,7 +114,7 @@ public function receiveNoWait(): ?Message */ public function acknowledge(Message $message): void { - // do nothing. redis transport always works in auto ack mode + // do nothing. wamp transport always works in auto ack mode } /** @@ -126,7 +126,7 @@ public function reject(Message $message, bool $requeue = false): void { InvalidMessageException::assertMessageInstanceOf($message, WampMessage::class); - // do nothing on reject. redis transport always works in auto ack mode + // do nothing on reject. wamp transport always works in auto ack mode if ($requeue) { $this->context->createProducer()->send($this->queue, $message); From a512b5358c0958a6f1b7b300e0a248d5123f31fd Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 09:51:53 +0300 Subject: [PATCH 04/13] wamp --- pkg/wamp/JsonSerializer.php | 41 ++++++++++++ pkg/wamp/Serializer.php | 12 ++++ pkg/wamp/SerializerAwareTrait.php | 29 +++++++++ pkg/wamp/Tests/Spec/JsonSerializerTest.php | 76 ++++++++++++++++++++++ pkg/wamp/WampConsumer.php | 6 +- pkg/wamp/WampContext.php | 6 ++ pkg/wamp/WampMessage.php | 25 +------ pkg/wamp/WampProducer.php | 4 +- pkg/wamp/WampSubscriptionConsumer.php | 9 +-- 9 files changed, 174 insertions(+), 34 deletions(-) create mode 100644 pkg/wamp/JsonSerializer.php create mode 100644 pkg/wamp/Serializer.php create mode 100644 pkg/wamp/SerializerAwareTrait.php create mode 100644 pkg/wamp/Tests/Spec/JsonSerializerTest.php diff --git a/pkg/wamp/JsonSerializer.php b/pkg/wamp/JsonSerializer.php new file mode 100644 index 000000000..b6027ba81 --- /dev/null +++ b/pkg/wamp/JsonSerializer.php @@ -0,0 +1,41 @@ + $message->getBody(), + 'properties' => $message->getProperties(), + 'headers' => $message->getHeaders(), + ]); + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return $json; + } + + public function toMessage(string $string): WampMessage + { + $data = json_decode($string, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new WampMessage($data['body'], $data['properties'], $data['headers']); + } +} diff --git a/pkg/wamp/Serializer.php b/pkg/wamp/Serializer.php new file mode 100644 index 000000000..414fcf414 --- /dev/null +++ b/pkg/wamp/Serializer.php @@ -0,0 +1,12 @@ +serializer = $serializer; + } + + /** + * @return Serializer + */ + public function getSerializer() + { + return $this->serializer; + } +} diff --git a/pkg/wamp/Tests/Spec/JsonSerializerTest.php b/pkg/wamp/Tests/Spec/JsonSerializerTest.php new file mode 100644 index 000000000..a8d7c59b5 --- /dev/null +++ b/pkg/wamp/Tests/Spec/JsonSerializerTest.php @@ -0,0 +1,76 @@ +assertClassImplements(Serializer::class, JsonSerializer::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new JsonSerializer(); + } + + public function testShouldConvertMessageToJsonString() + { + $serializer = new JsonSerializer(); + + $message = new WampMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); + + $json = $serializer->toString($message); + + $this->assertSame('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}', $json); + } + + public function testThrowIfFailedToEncodeMessageToJson() + { + $serializer = new JsonSerializer(); + + $resource = fopen(__FILE__, 'r'); + + //guard + $this->assertInternalType('resource', $resource); + + $message = new WampMessage('theBody', ['aProp' => $resource]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given.'); + $serializer->toString($message); + } + + public function testShouldConvertJsonStringToMessage() + { + $serializer = new JsonSerializer(); + + $message = $serializer->toMessage('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}'); + + $this->assertInstanceOf(WampMessage::class, $message); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['aProp' => 'aPropVal'], $message->getProperties()); + $this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders()); + } + + public function testThrowIfFailedToDecodeJsonToMessage() + { + $serializer = new JsonSerializer(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given.'); + $serializer->toMessage('{]'); + } +} diff --git a/pkg/wamp/WampConsumer.php b/pkg/wamp/WampConsumer.php index 90fcaf846..c4859aa7d 100644 --- a/pkg/wamp/WampConsumer.php +++ b/pkg/wamp/WampConsumer.php @@ -8,7 +8,7 @@ use Interop\Queue\Exception\InvalidMessageException; use Interop\Queue\Message; use Interop\Queue\Queue; -use React\EventLoop\Timer\Timer; +use React\EventLoop\TimerInterface; use Thruway\ClientSession; use Thruway\Peer\Client; @@ -35,7 +35,7 @@ class WampConsumer implements Consumer private $message; /** - * @var Timer + * @var TimerInterface */ private $timer; @@ -69,7 +69,7 @@ public function receive(int $timeout = 0): ?Message $this->client->on('open', function (ClientSession $session) { $session->subscribe($this->queue->getQueueName(), function ($args) { - $this->message = WampMessage::jsonUnserialize($args[0]); + $this->message = $this->context->getSerializer()->toMessage($args[0]); $this->client->emit('do-stop'); }); diff --git a/pkg/wamp/WampContext.php b/pkg/wamp/WampContext.php index 3d6c403f0..d68dc17b8 100644 --- a/pkg/wamp/WampContext.php +++ b/pkg/wamp/WampContext.php @@ -8,6 +8,7 @@ use Interop\Queue\Context; use Interop\Queue\Destination; use Interop\Queue\Exception\InvalidDestinationException; +use Interop\Queue\Exception\PurgeQueueNotSupportedException; use Interop\Queue\Exception\TemporaryQueueNotSupportedException; use Interop\Queue\Message; use Interop\Queue\Producer; @@ -18,6 +19,8 @@ class WampContext implements Context { + use SerializerAwareTrait; + /** * @var Client[] */ @@ -41,6 +44,8 @@ public function __construct($client) Client::class )); } + + $this->setSerializer(new JsonSerializer()); } public function createMessage(string $body = '', array $properties = [], array $headers = []): Message @@ -82,6 +87,7 @@ public function createSubscriptionConsumer(): SubscriptionConsumer public function purgeQueue(Queue $queue): void { + throw PurgeQueueNotSupportedException::providerDoestNotSupportIt(); } public function close(): void diff --git a/pkg/wamp/WampMessage.php b/pkg/wamp/WampMessage.php index 937e8c879..9ad41f5a4 100644 --- a/pkg/wamp/WampMessage.php +++ b/pkg/wamp/WampMessage.php @@ -7,7 +7,7 @@ use Interop\Queue\Impl\MessageTrait; use Interop\Queue\Message; -class WampMessage implements Message, \JsonSerializable +class WampMessage implements Message { use MessageTrait; @@ -18,27 +18,4 @@ public function __construct(string $body = '', array $properties = [], array $he $this->headers = $headers; $this->redelivered = false; } - - public function jsonSerialize(): array - { - return [ - 'body' => $this->getBody(), - 'properties' => $this->getProperties(), - 'headers' => $this->getHeaders(), - ]; - } - - public static function jsonUnserialize(string $json): self - { - $data = json_decode($json, true); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); - } - - return new self($data['body'], $data['properties'], $data['headers']); - } } diff --git a/pkg/wamp/WampProducer.php b/pkg/wamp/WampProducer.php index 53ab00b63..e5bf0879f 100644 --- a/pkg/wamp/WampProducer.php +++ b/pkg/wamp/WampProducer.php @@ -91,7 +91,9 @@ public function send(Destination $destination, Message $message): void $this->client->emit('do-stop'); }; - $this->session->publish($destination->getTopicName(), [json_encode($message->jsonSerialize())], [], ['acknowledge' => true]) + $payload = $this->context->getSerializer()->toString($message); + + $this->session->publish($destination->getTopicName(), [$payload], [], ['acknowledge' => true]) ->then($onFinish, $onFinish); }); diff --git a/pkg/wamp/WampSubscriptionConsumer.php b/pkg/wamp/WampSubscriptionConsumer.php index ddbd7e3a0..cae06d8c9 100644 --- a/pkg/wamp/WampSubscriptionConsumer.php +++ b/pkg/wamp/WampSubscriptionConsumer.php @@ -6,7 +6,7 @@ use Interop\Queue\Consumer; use Interop\Queue\SubscriptionConsumer; -use React\EventLoop\Timer\Timer; +use React\EventLoop\TimerInterface; use Thruway\ClientSession; use Thruway\Peer\Client; @@ -30,13 +30,10 @@ class WampSubscriptionConsumer implements SubscriptionConsumer private $client; /** - * @var Timer + * @var TimerInterface */ private $timer; - /** - * @param WampContext $context - */ public function __construct(WampContext $context) { $this->context = $context; @@ -65,7 +62,7 @@ public function consume(int $timeout = 0): void foreach ($this->subscribers as $queue => $subscriber) { $session->subscribe($queue, function ($args) use ($subscriber, $session) { - $message = WampMessage::jsonUnserialize($args[0]); + $message = $this->context->getSerializer()->toMessage($args[0]); /** * @var WampConsumer $consumer From 18273bf74a4241224effb6f8bd832d19cdd6cadb Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 10:05:33 +0300 Subject: [PATCH 05/13] wamp --- pkg/wamp/LICENSE | 2 +- pkg/wamp/README.md | 2 +- pkg/wamp/WampConsumer.php | 7 ++++++- pkg/wamp/composer.json | 1 + 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/wamp/LICENSE b/pkg/wamp/LICENSE index d9736f8bf..7afbaa1ff 100644 --- a/pkg/wamp/LICENSE +++ b/pkg/wamp/LICENSE @@ -1,5 +1,5 @@ The MIT License (MIT) -Copyright (c) 2017 Kotliar Maksym +Copyright (c) 2018 Forma-Pro Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/pkg/wamp/README.md b/pkg/wamp/README.md index 094a7ca94..6838a366c 100644 --- a/pkg/wamp/README.md +++ b/pkg/wamp/README.md @@ -1,4 +1,4 @@ -# Web Application Messaging Protocol Transport +# Web Application Messaging Protocol (WAMP) Transport [![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) [![Build Status](https://travis-ci.org/php-enqueue/wamp.png?branch=master)](https://travis-ci.org/php-enqueue/wamp) diff --git a/pkg/wamp/WampConsumer.php b/pkg/wamp/WampConsumer.php index c4859aa7d..a56058c91 100644 --- a/pkg/wamp/WampConsumer.php +++ b/pkg/wamp/WampConsumer.php @@ -99,7 +99,12 @@ public function receive(int $timeout = 0): ?Message $this->client->getLoop()->run(); - return $this->message ?: null; + $message = $this->message; + + $this->timer = null; + $this->message = null; + + return $message; } public function receiveNoWait(): ?Message diff --git a/pkg/wamp/composer.json b/pkg/wamp/composer.json index 77ef84e5b..edb1bcc86 100644 --- a/pkg/wamp/composer.json +++ b/pkg/wamp/composer.json @@ -8,6 +8,7 @@ "require": { "php": "^7.1.3", "queue-interop/queue-interop": "0.7.x-dev", + "enqueue/dsn": "0.9.x-dev", "thruway/pawl-transport": "^0.5.0", "voryx/thruway": "^0.5.3" }, From 49538dedf7508c9b4b36eec2d7b0a1045beafd09 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 10:37:11 +0300 Subject: [PATCH 06/13] wamp --- bin/subtree-split | 2 ++ docs/index.md | 1 + docs/transport/wamp.md | 73 +++++++++++++++++++++++++++++++++++++++ pkg/enqueue/Resources.php | 6 ++++ 4 files changed, 82 insertions(+) create mode 100644 docs/transport/wamp.md diff --git a/bin/subtree-split b/bin/subtree-split index 222574f13..f7d63eb34 100755 --- a/bin/subtree-split +++ b/bin/subtree-split @@ -66,6 +66,7 @@ remote async-event-dispatcher git@github.com:php-enqueue/async-event-dispatcher. remote async-command git@github.com:php-enqueue/async-command.git remote mongodb git@github.com:php-enqueue/mongodb.git remote dsn git@github.com:php-enqueue/dsn.git +remote wamp git@github.com:php-enqueue/wamp.git split 'pkg/enqueue' enqueue split 'pkg/simple-client' simple-client @@ -90,3 +91,4 @@ split 'pkg/async-event-dispatcher' async-event-dispatcher split 'pkg/async-command' async-command split 'pkg/mongodb' mongodb split 'pkg/dsn' dsn +split 'pkg/wamp' wamp diff --git a/docs/index.md b/docs/index.md index 32580b53a..fe57d473f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -10,6 +10,7 @@ - [Kafka](transport/kafka.md) - [Stomp](transport/stomp.md) - [Redis](transport/redis.md) + - [Wamp](transport/wamp.md) - [Doctrine DBAL](transport/dbal.md) - [Filesystem](transport/filesystem.md) - [Null](transport/null.md) diff --git a/docs/transport/wamp.md b/docs/transport/wamp.md new file mode 100644 index 000000000..b94add54a --- /dev/null +++ b/docs/transport/wamp.md @@ -0,0 +1,73 @@ +# Web Application Messaging Protocol (WAMP) Transport + +A transport for [Web Application Messaging Protocol](https://wamp-proto.org/). +WAMP is an open standard WebSocket subprotocol. +It uses internally Thruway PHP library [voryx/thruway](https://github.com/voryx/Thruway) + +* [Installation](#installation) +* [Start the WAMP router](#start-the-wamp-router) +* [Create context](#create-context) +* [Consume message](#consume-message) +* [Send message to topic](#send-message-to-topic) + +## Installation + +```bash +$ composer require enqueue/wamp +``` + +## Start the WAMP router + +```bash +$ php vendor/voryx/thruway/Examples/SimpleWsRouter.php +``` + +Thruway is now running on 127.0.0.1 port 9090 + + +## Create context + +```php +createContext(); + +// if you have enqueue/enqueue library installed you can use a factory to build context from DSN +$context = (new \Enqueue\ConnectionFactoryFactory())->create('wamp:')->createContext(); +``` + +## Consume message: + +Start message consumer before send message to the topic + +```php +createTopic('foo'); + +$consumer = $context->createConsumer($fooQueue); + +while (true) { + if ($message = $consumer->receive()) { + // process a message + } +} +``` + +## Send message to topic + +```php +createTopic('foo'); +$message = $context->createMessage('Hello world!'); + +$context->createProducer()->send($fooTopic, $message); +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/pkg/enqueue/Resources.php b/pkg/enqueue/Resources.php index 21172c636..3232ba013 100644 --- a/pkg/enqueue/Resources.php +++ b/pkg/enqueue/Resources.php @@ -16,6 +16,7 @@ use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Sqs\SqsConnectionFactory; use Enqueue\Stomp\StompConnectionFactory; +use Enqueue\Wamp\WampConnectionFactory; use Interop\Queue\ConnectionFactory; final class Resources @@ -163,6 +164,11 @@ public static function getKnownConnections(): array 'supportedSchemeExtensions' => [], 'package' => 'enqueue/mongodb', ]; + $map[WampConnectionFactory::class] = [ + 'schemes' => ['wamp'], + 'supportedSchemeExtensions' => [], + 'package' => 'enqueue/wamp', + ]; self::$knownConnections = $map; } From 3d406ccc608d278e33b28d30f26c256fce9ee785 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 13:33:41 +0300 Subject: [PATCH 07/13] wamp --- docs/transport/wamp.md | 30 +++++++++++++++++++++++++++++ phpunit.xml.dist | 4 ++++ pkg/enqueue/Resources.php | 2 +- pkg/enqueue/Tests/ResourcesTest.php | 19 ++++++++++++++++++ pkg/wamp/WampConnectionFactory.php | 2 +- 5 files changed, 55 insertions(+), 2 deletions(-) diff --git a/docs/transport/wamp.md b/docs/transport/wamp.md index b94add54a..dcb995d00 100644 --- a/docs/transport/wamp.md +++ b/docs/transport/wamp.md @@ -8,6 +8,7 @@ It uses internally Thruway PHP library [voryx/thruway](https://github.com/voryx/ * [Start the WAMP router](#start-the-wamp-router) * [Create context](#create-context) * [Consume message](#consume-message) +* [Subscription consumer](#subscription-consumer) * [Send message to topic](#send-message-to-topic) ## Installation @@ -58,6 +59,35 @@ while (true) { } ``` +## Subscription consumer + +```php +createConsumer($fooQueue); +$barConsumer = $context->createConsumer($barQueue); + +$subscriptionConsumer = $context->createSubscriptionConsumer(); +$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) { + // process message + + return true; +}); +$subscriptionConsumer->subscribe($barConsumer, function(Message $message, Consumer $consumer) { + // process message + + return true; +}); + +$subscriptionConsumer->consume(2000); // 2 sec +``` + ## Send message to topic ```php diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 10afc56e7..f1c8f205f 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -104,6 +104,10 @@ pkg/dsn/Tests + + + pkg/wamp/Tests + diff --git a/pkg/enqueue/Resources.php b/pkg/enqueue/Resources.php index 3232ba013..508dc2694 100644 --- a/pkg/enqueue/Resources.php +++ b/pkg/enqueue/Resources.php @@ -165,7 +165,7 @@ public static function getKnownConnections(): array 'package' => 'enqueue/mongodb', ]; $map[WampConnectionFactory::class] = [ - 'schemes' => ['wamp'], + 'schemes' => ['wamp', 'ws'], 'supportedSchemeExtensions' => [], 'package' => 'enqueue/wamp', ]; diff --git a/pkg/enqueue/Tests/ResourcesTest.php b/pkg/enqueue/Tests/ResourcesTest.php index 52bd13f9f..ed36236a7 100644 --- a/pkg/enqueue/Tests/ResourcesTest.php +++ b/pkg/enqueue/Tests/ResourcesTest.php @@ -4,6 +4,7 @@ use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Resources; +use Enqueue\Wamp\WampConnectionFactory; use Interop\Queue\ConnectionFactory; use PHPUnit\Framework\TestCase; @@ -127,4 +128,22 @@ public function testShouldAllowGetPreviouslyRegisteredConnection() $this->assertArrayHasKey('package', $connectionInfo); $this->assertSame('foo/bar', $connectionInfo['package']); } + + public function testShouldHaveRegisteredWampConfiguration() + { + $availableConnections = Resources::getKnownConnections(); + + $this->assertInternalType('array', $availableConnections); + $this->assertArrayHasKey(WampConnectionFactory::class, $availableConnections); + + $connectionInfo = $availableConnections[WampConnectionFactory::class]; + $this->assertArrayHasKey('schemes', $connectionInfo); + $this->assertSame(['wamp', 'ws'], $connectionInfo['schemes']); + + $this->assertArrayHasKey('supportedSchemeExtensions', $connectionInfo); + $this->assertSame([], $connectionInfo['supportedSchemeExtensions']); + + $this->assertArrayHasKey('package', $connectionInfo); + $this->assertSame('enqueue/wamp', $connectionInfo['package']); + } } diff --git a/pkg/wamp/WampConnectionFactory.php b/pkg/wamp/WampConnectionFactory.php index e810ab052..c56635170 100644 --- a/pkg/wamp/WampConnectionFactory.php +++ b/pkg/wamp/WampConnectionFactory.php @@ -93,7 +93,7 @@ private function parseDsn(string $dsn): array { $dsn = new Dsn($dsn); - if ('wamp' !== $dsn->getSchemeProtocol()) { + if (false === in_array($dsn->getSchemeProtocol(), ['wamp', 'ws'], true)) { throw new \LogicException(sprintf( 'The given scheme protocol "%s" is not supported. It must be "wamp"', $dsn->getSchemeProtocol() From 3364d611f24d364916ac602fc49a53bd39ad8aac Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 14:10:29 +0300 Subject: [PATCH 08/13] wamp --- pkg/wamp/Tests/Spec/JsonSerializerTest.php | 2 +- pkg/wamp/WampContext.php | 18 ++---------------- pkg/wamp/WampSubscriptionConsumer.php | 2 +- 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/pkg/wamp/Tests/Spec/JsonSerializerTest.php b/pkg/wamp/Tests/Spec/JsonSerializerTest.php index a8d7c59b5..e83e599ee 100644 --- a/pkg/wamp/Tests/Spec/JsonSerializerTest.php +++ b/pkg/wamp/Tests/Spec/JsonSerializerTest.php @@ -1,6 +1,6 @@ client = $client; - } elseif (is_callable($client)) { - $this->clientFactory = $client; - } else { - throw new \InvalidArgumentException(sprintf( - 'The $client argument must be either %s or callable that returns %s once called.', - Client::class, - Client::class - )); - } + $this->clientFactory = $clientFactory; $this->setSerializer(new JsonSerializer()); } @@ -93,10 +83,6 @@ public function purgeQueue(Queue $queue): void public function close(): void { foreach ($this->clients as $client) { - if (null === $client) { - return; - } - if (null === $client->getSession()) { return; } diff --git a/pkg/wamp/WampSubscriptionConsumer.php b/pkg/wamp/WampSubscriptionConsumer.php index cae06d8c9..77e60d824 100644 --- a/pkg/wamp/WampSubscriptionConsumer.php +++ b/pkg/wamp/WampSubscriptionConsumer.php @@ -61,7 +61,7 @@ public function consume(int $timeout = 0): void $this->client->on('open', function (ClientSession $session) { foreach ($this->subscribers as $queue => $subscriber) { - $session->subscribe($queue, function ($args) use ($subscriber, $session) { + $session->subscribe($queue, function ($args) use ($subscriber) { $message = $this->context->getSerializer()->toMessage($args[0]); /** From ba418ada76333e6ae078a39dccf74b5b963919c6 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 14:15:05 +0300 Subject: [PATCH 09/13] wamp --- .../Tests/Functional/WampConsumerTest.php | 2 +- .../WampSubscriptionConsumerTest.php | 2 +- pkg/wamp/Tests/Spec/JsonSerializerTest.php | 4 +- pkg/wamp/WampConsumer.php | 1 - pkg/wamp/WampProducer.php | 41 +++++++++---------- pkg/wamp/WampSubscriptionConsumer.php | 3 +- 6 files changed, 25 insertions(+), 28 deletions(-) diff --git a/pkg/wamp/Tests/Functional/WampConsumerTest.php b/pkg/wamp/Tests/Functional/WampConsumerTest.php index 612577d4b..e9d3fca42 100644 --- a/pkg/wamp/Tests/Functional/WampConsumerTest.php +++ b/pkg/wamp/Tests/Functional/WampConsumerTest.php @@ -35,7 +35,7 @@ public function testShouldSendAndReceiveMessage() $consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) { $producer->send($topic, $message); }); - + $receivedMessage = $consumer->receive(100); $this->assertInstanceOf(WampMessage::class, $receivedMessage); diff --git a/pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php b/pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php index c4f0ee8a7..c28b3e0a5 100644 --- a/pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php +++ b/pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php @@ -42,7 +42,7 @@ public function testShouldSendAndReceiveMessage() $consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) { $producer->send($topic, $message); }); - + $consumer->consume(100); $this->assertInstanceOf(WampMessage::class, $receivedMessage); diff --git a/pkg/wamp/Tests/Spec/JsonSerializerTest.php b/pkg/wamp/Tests/Spec/JsonSerializerTest.php index e83e599ee..4bc49c599 100644 --- a/pkg/wamp/Tests/Spec/JsonSerializerTest.php +++ b/pkg/wamp/Tests/Spec/JsonSerializerTest.php @@ -2,9 +2,9 @@ namespace Enqueue\Wamp\Tests\Spec; +use Enqueue\Test\ClassExtensionTrait; use Enqueue\Wamp\JsonSerializer; use Enqueue\Wamp\Serializer; -use Enqueue\Test\ClassExtensionTrait; use Enqueue\Wamp\WampMessage; use PHPUnit\Framework\TestCase; @@ -40,7 +40,7 @@ public function testThrowIfFailedToEncodeMessageToJson() { $serializer = new JsonSerializer(); - $resource = fopen(__FILE__, 'r'); + $resource = fopen(__FILE__, 'rb'); //guard $this->assertInternalType('resource', $resource); diff --git a/pkg/wamp/WampConsumer.php b/pkg/wamp/WampConsumer.php index a56058c91..8dc859739 100644 --- a/pkg/wamp/WampConsumer.php +++ b/pkg/wamp/WampConsumer.php @@ -67,7 +67,6 @@ public function receive(int $timeout = 0): ?Message $this->client = $this->context->getNewClient(); $this->client->setAttemptRetry(true); $this->client->on('open', function (ClientSession $session) { - $session->subscribe($this->queue->getQueueName(), function ($args) { $this->message = $this->context->getSerializer()->toMessage($args[0]); diff --git a/pkg/wamp/WampProducer.php b/pkg/wamp/WampProducer.php index e5bf0879f..3bffe21f0 100644 --- a/pkg/wamp/WampProducer.php +++ b/pkg/wamp/WampProducer.php @@ -51,7 +51,7 @@ public function __construct(WampContext $context) * {@inheritdoc} * * @param WampDestination $destination - * @param WampMessage $message + * @param WampMessage $message */ public function send(Destination $destination, Message $message): void { @@ -86,7 +86,6 @@ public function send(Destination $destination, Message $message): void }); $this->client->on('do-send', function (WampDestination $destination, WampMessage $message) { - $onFinish = function () { $this->client->emit('do-stop'); }; @@ -113,25 +112,6 @@ public function send(Destination $destination, Message $message): void $this->client->getLoop()->run(); } - private function doSendMessageIfPossible() - { - if (null === $this->session) { - return; - } - - if (null === $this->message) { - return; - } - - $message = $this->message; - $destination = $this->destination; - - $this->message = null; - $this->destination = null; - - $this->client->emit('do-send', [$destination, $message]); - } - /** * {@inheritdoc} * @@ -188,4 +168,23 @@ public function getTimeToLive(): ?int { return null; } + + private function doSendMessageIfPossible() + { + if (null === $this->session) { + return; + } + + if (null === $this->message) { + return; + } + + $message = $this->message; + $destination = $this->destination; + + $this->message = null; + $this->destination = null; + + $this->client->emit('do-send', [$destination, $message]); + } } diff --git a/pkg/wamp/WampSubscriptionConsumer.php b/pkg/wamp/WampSubscriptionConsumer.php index 77e60d824..79ab40351 100644 --- a/pkg/wamp/WampSubscriptionConsumer.php +++ b/pkg/wamp/WampSubscriptionConsumer.php @@ -59,14 +59,13 @@ public function consume(int $timeout = 0): void $this->client = $this->context->getNewClient(); $this->client->setAttemptRetry(true); $this->client->on('open', function (ClientSession $session) { - foreach ($this->subscribers as $queue => $subscriber) { $session->subscribe($queue, function ($args) use ($subscriber) { $message = $this->context->getSerializer()->toMessage($args[0]); /** * @var WampConsumer $consumer - * @var callable $callback + * @var callable $callback */ list($consumer, $callback) = $subscriber; From 24f8f90051844e6a02856f60fce37234fa69cab1 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 14:25:26 +0300 Subject: [PATCH 10/13] wamp --- pkg/wamp/WampSubscriptionConsumer.php | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/wamp/WampSubscriptionConsumer.php b/pkg/wamp/WampSubscriptionConsumer.php index 79ab40351..6b96926e1 100644 --- a/pkg/wamp/WampSubscriptionConsumer.php +++ b/pkg/wamp/WampSubscriptionConsumer.php @@ -63,10 +63,8 @@ public function consume(int $timeout = 0): void $session->subscribe($queue, function ($args) use ($subscriber) { $message = $this->context->getSerializer()->toMessage($args[0]); - /** - * @var WampConsumer $consumer - * @var callable $callback - */ + /** @var WampConsumer $consumer */ + /** @var callable $callback */ list($consumer, $callback) = $subscriber; if (false === call_user_func($callback, $message, $consumer)) { From 715bb7f8d889746a9f9d177a52bdb6accd9e9a9e Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 15:02:33 +0300 Subject: [PATCH 11/13] wamp --- docker-compose.yml | 13 +++++++++++++ docker/bin/test.sh | 1 + 2 files changed, 14 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 60f7082d9..a909f2746 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,7 @@ services: - google-pubsub - rabbitmqssl - mongo + - thruway - localstack volumes: - './:/mqdev' @@ -30,6 +31,7 @@ services: - PHPREDIS_DSN=redis+phpredis://redis - GPS_DSN=gps:?projectId=mqdev&emulatorHost=http://google-pubsub:8085 - SQS_DSN=sqs:?key=key&secret=secret®ion=us-east-1&endpoint=http://localstack:4576&version=latest + - WAMP_DSN=wamp://thruway:9090 - REDIS_HOST=redis - REDIS_PORT=6379 - AWS_SQS_KEY=key @@ -104,6 +106,17 @@ services: ports: - "27017:27017" + thruway: + image: formapro/nginx-php-fpm:latest-all-exts + ports: + - '9090:9090' + working_dir: '/app' + volumes: + - './:/app' + entrypoint: + - '/usr/bin/php' + - 'vendor/voryx/thruway/Examples/SimpleWsRouter.php' + localstack: image: 'localstack/localstack:latest' ports: diff --git a/docker/bin/test.sh b/docker/bin/test.sh index da57c62d9..35caf838a 100755 --- a/docker/bin/test.sh +++ b/docker/bin/test.sh @@ -37,6 +37,7 @@ waitForService beanstalkd 11300 50 waitForService gearmand 4730 50 waitForService kafka 9092 50 waitForService mongo 27017 50 +waitForService thruway 9090 50 waitForService localstack 4576 50 php docker/bin/refresh-mysql-database.php From 74c133937240aaa3c1cac29719d15a3d651a5891 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 15:37:42 +0300 Subject: [PATCH 12/13] wamp --- docker-compose.yml | 2 +- docker/thruway/WsRouter.php | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 docker/thruway/WsRouter.php diff --git a/docker-compose.yml b/docker-compose.yml index a909f2746..1335b75c4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -115,7 +115,7 @@ services: - './:/app' entrypoint: - '/usr/bin/php' - - 'vendor/voryx/thruway/Examples/SimpleWsRouter.php' + - 'docker/thruway/WsRouter.php' localstack: image: 'localstack/localstack:latest' diff --git a/docker/thruway/WsRouter.php b/docker/thruway/WsRouter.php new file mode 100644 index 000000000..90cad1100 --- /dev/null +++ b/docker/thruway/WsRouter.php @@ -0,0 +1,14 @@ +addTransportProvider($transportProvider); + +$router->start(); From 3ebdb431be9c28350f73052134ebf0875589af30 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 19:14:52 +0300 Subject: [PATCH 13/13] wamp --- docker/thruway/WsRouter.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/thruway/WsRouter.php b/docker/thruway/WsRouter.php index 90cad1100..36b7eb0dc 100644 --- a/docker/thruway/WsRouter.php +++ b/docker/thruway/WsRouter.php @@ -1,6 +1,6 @@