diff --git a/bin/subtree-split b/bin/subtree-split index 222574f13..f7d63eb34 100755 --- a/bin/subtree-split +++ b/bin/subtree-split @@ -66,6 +66,7 @@ remote async-event-dispatcher git@github.com:php-enqueue/async-event-dispatcher. remote async-command git@github.com:php-enqueue/async-command.git remote mongodb git@github.com:php-enqueue/mongodb.git remote dsn git@github.com:php-enqueue/dsn.git +remote wamp git@github.com:php-enqueue/wamp.git split 'pkg/enqueue' enqueue split 'pkg/simple-client' simple-client @@ -90,3 +91,4 @@ split 'pkg/async-event-dispatcher' async-event-dispatcher split 'pkg/async-command' async-command split 'pkg/mongodb' mongodb split 'pkg/dsn' dsn +split 'pkg/wamp' wamp diff --git a/composer.json b/composer.json index 0099b6cac..51ddb18ea 100644 --- a/composer.json +++ b/composer.json @@ -30,7 +30,9 @@ "php-http/guzzle6-adapter": "^1.1", "php-http/client-common": "^1.7@dev", "richardfullmer/rabbitmq-management-api": "^2.0", - "predis/predis": "^1.1" + "predis/predis": "^1.1", + "thruway/pawl-transport": "^0.5.0", + "voryx/thruway": "^0.5.3" }, "require-dev": { "phpunit/phpunit": "^5.5", @@ -75,6 +77,7 @@ "Enqueue\\Stomp\\": "pkg/stomp/", "Enqueue\\Test\\": "pkg/test/", "Enqueue\\Dsn\\": "pkg/dsn/", + "Enqueue\\Wamp\\": "pkg/wamp/", "Enqueue\\": "pkg/enqueue/" }, "exclude-from-classmap": [ diff --git a/docker-compose.yml b/docker-compose.yml index 60f7082d9..1335b75c4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,7 @@ services: - google-pubsub - rabbitmqssl - mongo + - thruway - localstack volumes: - './:/mqdev' @@ -30,6 +31,7 @@ services: - PHPREDIS_DSN=redis+phpredis://redis - GPS_DSN=gps:?projectId=mqdev&emulatorHost=http://google-pubsub:8085 - SQS_DSN=sqs:?key=key&secret=secret®ion=us-east-1&endpoint=http://localstack:4576&version=latest + - WAMP_DSN=wamp://thruway:9090 - REDIS_HOST=redis - REDIS_PORT=6379 - AWS_SQS_KEY=key @@ -104,6 +106,17 @@ services: ports: - "27017:27017" + thruway: + image: formapro/nginx-php-fpm:latest-all-exts + ports: + - '9090:9090' + working_dir: '/app' + volumes: + - './:/app' + entrypoint: + - '/usr/bin/php' + - 'docker/thruway/WsRouter.php' + localstack: image: 'localstack/localstack:latest' ports: diff --git a/docker/bin/test.sh b/docker/bin/test.sh index da57c62d9..35caf838a 100755 --- a/docker/bin/test.sh +++ b/docker/bin/test.sh @@ -37,6 +37,7 @@ waitForService beanstalkd 11300 50 waitForService gearmand 4730 50 waitForService kafka 9092 50 waitForService mongo 27017 50 +waitForService thruway 9090 50 waitForService localstack 4576 50 php docker/bin/refresh-mysql-database.php diff --git a/docker/thruway/WsRouter.php b/docker/thruway/WsRouter.php new file mode 100644 index 000000000..36b7eb0dc --- /dev/null +++ b/docker/thruway/WsRouter.php @@ -0,0 +1,14 @@ +addTransportProvider($transportProvider); + +$router->start(); diff --git a/docs/index.md b/docs/index.md index 32580b53a..fe57d473f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -10,6 +10,7 @@ - [Kafka](transport/kafka.md) - [Stomp](transport/stomp.md) - [Redis](transport/redis.md) + - [Wamp](transport/wamp.md) - [Doctrine DBAL](transport/dbal.md) - [Filesystem](transport/filesystem.md) - [Null](transport/null.md) diff --git a/docs/transport/wamp.md b/docs/transport/wamp.md new file mode 100644 index 000000000..dcb995d00 --- /dev/null +++ b/docs/transport/wamp.md @@ -0,0 +1,103 @@ +# Web Application Messaging Protocol (WAMP) Transport + +A transport for [Web Application Messaging Protocol](https://wamp-proto.org/). +WAMP is an open standard WebSocket subprotocol. +It uses internally Thruway PHP library [voryx/thruway](https://github.com/voryx/Thruway) + +* [Installation](#installation) +* [Start the WAMP router](#start-the-wamp-router) +* [Create context](#create-context) +* [Consume message](#consume-message) +* [Subscription consumer](#subscription-consumer) +* [Send message to topic](#send-message-to-topic) + +## Installation + +```bash +$ composer require enqueue/wamp +``` + +## Start the WAMP router + +```bash +$ php vendor/voryx/thruway/Examples/SimpleWsRouter.php +``` + +Thruway is now running on 127.0.0.1 port 9090 + + +## Create context + +```php +createContext(); + +// if you have enqueue/enqueue library installed you can use a factory to build context from DSN +$context = (new \Enqueue\ConnectionFactoryFactory())->create('wamp:')->createContext(); +``` + +## Consume message: + +Start message consumer before send message to the topic + +```php +createTopic('foo'); + +$consumer = $context->createConsumer($fooQueue); + +while (true) { + if ($message = $consumer->receive()) { + // process a message + } +} +``` + +## Subscription consumer + +```php +createConsumer($fooQueue); +$barConsumer = $context->createConsumer($barQueue); + +$subscriptionConsumer = $context->createSubscriptionConsumer(); +$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) { + // process message + + return true; +}); +$subscriptionConsumer->subscribe($barConsumer, function(Message $message, Consumer $consumer) { + // process message + + return true; +}); + +$subscriptionConsumer->consume(2000); // 2 sec +``` + +## Send message to topic + +```php +createTopic('foo'); +$message = $context->createMessage('Hello world!'); + +$context->createProducer()->send($fooTopic, $message); +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 10afc56e7..f1c8f205f 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -104,6 +104,10 @@ pkg/dsn/Tests + + + pkg/wamp/Tests + diff --git a/pkg/enqueue/Resources.php b/pkg/enqueue/Resources.php index 21172c636..508dc2694 100644 --- a/pkg/enqueue/Resources.php +++ b/pkg/enqueue/Resources.php @@ -16,6 +16,7 @@ use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Sqs\SqsConnectionFactory; use Enqueue\Stomp\StompConnectionFactory; +use Enqueue\Wamp\WampConnectionFactory; use Interop\Queue\ConnectionFactory; final class Resources @@ -163,6 +164,11 @@ public static function getKnownConnections(): array 'supportedSchemeExtensions' => [], 'package' => 'enqueue/mongodb', ]; + $map[WampConnectionFactory::class] = [ + 'schemes' => ['wamp', 'ws'], + 'supportedSchemeExtensions' => [], + 'package' => 'enqueue/wamp', + ]; self::$knownConnections = $map; } diff --git a/pkg/enqueue/Tests/ResourcesTest.php b/pkg/enqueue/Tests/ResourcesTest.php index 52bd13f9f..ed36236a7 100644 --- a/pkg/enqueue/Tests/ResourcesTest.php +++ b/pkg/enqueue/Tests/ResourcesTest.php @@ -4,6 +4,7 @@ use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Resources; +use Enqueue\Wamp\WampConnectionFactory; use Interop\Queue\ConnectionFactory; use PHPUnit\Framework\TestCase; @@ -127,4 +128,22 @@ public function testShouldAllowGetPreviouslyRegisteredConnection() $this->assertArrayHasKey('package', $connectionInfo); $this->assertSame('foo/bar', $connectionInfo['package']); } + + public function testShouldHaveRegisteredWampConfiguration() + { + $availableConnections = Resources::getKnownConnections(); + + $this->assertInternalType('array', $availableConnections); + $this->assertArrayHasKey(WampConnectionFactory::class, $availableConnections); + + $connectionInfo = $availableConnections[WampConnectionFactory::class]; + $this->assertArrayHasKey('schemes', $connectionInfo); + $this->assertSame(['wamp', 'ws'], $connectionInfo['schemes']); + + $this->assertArrayHasKey('supportedSchemeExtensions', $connectionInfo); + $this->assertSame([], $connectionInfo['supportedSchemeExtensions']); + + $this->assertArrayHasKey('package', $connectionInfo); + $this->assertSame('enqueue/wamp', $connectionInfo['package']); + } } diff --git a/pkg/test/WampExtension.php b/pkg/test/WampExtension.php new file mode 100644 index 000000000..eab5d42b7 --- /dev/null +++ b/pkg/test/WampExtension.php @@ -0,0 +1,18 @@ +createContext(); + } +} diff --git a/pkg/wamp/.gitignore b/pkg/wamp/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/wamp/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/wamp/.travis.yml b/pkg/wamp/.travis.yml new file mode 100644 index 000000000..b7ba11943 --- /dev/null +++ b/pkg/wamp/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 10 + +language: php + +php: + - '7.1' + - '7.2' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/wamp/JsonSerializer.php b/pkg/wamp/JsonSerializer.php new file mode 100644 index 000000000..b6027ba81 --- /dev/null +++ b/pkg/wamp/JsonSerializer.php @@ -0,0 +1,41 @@ + $message->getBody(), + 'properties' => $message->getProperties(), + 'headers' => $message->getHeaders(), + ]); + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return $json; + } + + public function toMessage(string $string): WampMessage + { + $data = json_decode($string, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new WampMessage($data['body'], $data['properties'], $data['headers']); + } +} diff --git a/pkg/wamp/LICENSE b/pkg/wamp/LICENSE new file mode 100644 index 000000000..7afbaa1ff --- /dev/null +++ b/pkg/wamp/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) +Copyright (c) 2018 Forma-Pro + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/pkg/wamp/README.md b/pkg/wamp/README.md new file mode 100644 index 000000000..6838a366c --- /dev/null +++ b/pkg/wamp/README.md @@ -0,0 +1,27 @@ +# Web Application Messaging Protocol (WAMP) Transport + +[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) +[![Build Status](https://travis-ci.org/php-enqueue/wamp.png?branch=master)](https://travis-ci.org/php-enqueue/wamp) +[![Total Downloads](https://poser.pugx.org/enqueue/wamp/d/total.png)](https://packagist.org/packages/enqueue/wamp) +[![Latest Stable Version](https://poser.pugx.org/enqueue/wamp/version.png)](https://packagist.org/packages/enqueue/wamp) + +This is an implementation of [queue interop](https://github.com/queue-interop/queue-interop). It uses [Thruway](https://github.com/voryx/Thruway) internally. + +## Resources + +* [Site](https://enqueue.forma-pro.com/) +* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Questions](https://gitter.im/php-enqueue/Lobby) +* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) + +## Developed by Forma-Pro + +Forma-Pro is a full stack development company which interests also spread to open source development. +Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. +Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability. + +If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com + +## License + +It is released under the [MIT License](LICENSE). \ No newline at end of file diff --git a/pkg/wamp/Serializer.php b/pkg/wamp/Serializer.php new file mode 100644 index 000000000..414fcf414 --- /dev/null +++ b/pkg/wamp/Serializer.php @@ -0,0 +1,12 @@ +serializer = $serializer; + } + + /** + * @return Serializer + */ + public function getSerializer() + { + return $this->serializer; + } +} diff --git a/pkg/wamp/Tests/Functional/WampConsumerTest.php b/pkg/wamp/Tests/Functional/WampConsumerTest.php new file mode 100644 index 000000000..e9d3fca42 --- /dev/null +++ b/pkg/wamp/Tests/Functional/WampConsumerTest.php @@ -0,0 +1,65 @@ +buildWampContext(); + $topic = $context->createTopic('topic'); + $consumer = $context->createConsumer($topic); + $producer = $context->createProducer(); + $message = $context->createMessage('the body'); + + // init client + $consumer->receive(1); + + $consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) { + $producer->send($topic, $message); + }); + + $receivedMessage = $consumer->receive(100); + + $this->assertInstanceOf(WampMessage::class, $receivedMessage); + $this->assertSame('the body', $receivedMessage->getBody()); + } + + public function testShouldSendAndReceiveNoWaitMessage() + { + $context = $this->buildWampContext(); + $topic = $context->createTopic('topic'); + $consumer = $context->createConsumer($topic); + $producer = $context->createProducer(); + $message = $context->createMessage('the body'); + + // init client + $consumer->receive(1); + + $consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) { + $producer->send($topic, $message); + }); + + $receivedMessage = $consumer->receiveNoWait(); + + $this->assertInstanceOf(WampMessage::class, $receivedMessage); + $this->assertSame('the body', $receivedMessage->getBody()); + } +} diff --git a/pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php b/pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php new file mode 100644 index 000000000..c28b3e0a5 --- /dev/null +++ b/pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php @@ -0,0 +1,51 @@ +buildWampContext(); + $topic = $context->createTopic('topic'); + $consumer = $context->createSubscriptionConsumer(); + $producer = $context->createProducer(); + $message = $context->createMessage('the body'); + + $receivedMessage = null; + $consumer->subscribe($context->createConsumer($topic), function ($message) use (&$receivedMessage) { + $receivedMessage = $message; + + return false; + }); + + // init client + $consumer->consume(1); + + $consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) { + $producer->send($topic, $message); + }); + + $consumer->consume(100); + + $this->assertInstanceOf(WampMessage::class, $receivedMessage); + $this->assertSame('the body', $receivedMessage->getBody()); + } +} diff --git a/pkg/wamp/Tests/Spec/JsonSerializerTest.php b/pkg/wamp/Tests/Spec/JsonSerializerTest.php new file mode 100644 index 000000000..4bc49c599 --- /dev/null +++ b/pkg/wamp/Tests/Spec/JsonSerializerTest.php @@ -0,0 +1,76 @@ +assertClassImplements(Serializer::class, JsonSerializer::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new JsonSerializer(); + } + + public function testShouldConvertMessageToJsonString() + { + $serializer = new JsonSerializer(); + + $message = new WampMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); + + $json = $serializer->toString($message); + + $this->assertSame('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}', $json); + } + + public function testThrowIfFailedToEncodeMessageToJson() + { + $serializer = new JsonSerializer(); + + $resource = fopen(__FILE__, 'rb'); + + //guard + $this->assertInternalType('resource', $resource); + + $message = new WampMessage('theBody', ['aProp' => $resource]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given.'); + $serializer->toString($message); + } + + public function testShouldConvertJsonStringToMessage() + { + $serializer = new JsonSerializer(); + + $message = $serializer->toMessage('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}'); + + $this->assertInstanceOf(WampMessage::class, $message); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['aProp' => 'aPropVal'], $message->getProperties()); + $this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders()); + } + + public function testThrowIfFailedToDecodeJsonToMessage() + { + $serializer = new JsonSerializer(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given.'); + $serializer->toMessage('{]'); + } +} diff --git a/pkg/wamp/Tests/Spec/WampConnectionFactoryTest.php b/pkg/wamp/Tests/Spec/WampConnectionFactoryTest.php new file mode 100644 index 000000000..3c3ccf841 --- /dev/null +++ b/pkg/wamp/Tests/Spec/WampConnectionFactoryTest.php @@ -0,0 +1,20 @@ +buildWampContext(); + } +} diff --git a/pkg/wamp/Tests/Spec/WampMessageTest.php b/pkg/wamp/Tests/Spec/WampMessageTest.php new file mode 100644 index 000000000..5482fadde --- /dev/null +++ b/pkg/wamp/Tests/Spec/WampMessageTest.php @@ -0,0 +1,20 @@ +buildWampContext()->createProducer(); + } +} diff --git a/pkg/wamp/Tests/Spec/WampQueueTest.php b/pkg/wamp/Tests/Spec/WampQueueTest.php new file mode 100644 index 000000000..e8cb87267 --- /dev/null +++ b/pkg/wamp/Tests/Spec/WampQueueTest.php @@ -0,0 +1,20 @@ + true, + * 'dsn' => 'wamp://127.0.0.1:9090', + * 'host' => '127.0.0.1', + * 'port' => '9090', + * 'max_retries' => 15, + * 'initial_retry_delay' => 1.5, + * 'max_retry_delay' => 300, + * 'retry_delay_growth' => 1.5, + * ] + * + * or + * + * wamp://127.0.0.1:9090?max_retries=10 + * + * @param array|string|null $config + */ + public function __construct($config = 'wamp:') + { + if (empty($config)) { + $config = $this->parseDsn('wamp:'); + } elseif (is_string($config)) { + $config = $this->parseDsn($config); + } elseif (is_array($config)) { + $config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']); + } else { + throw new \LogicException('The config must be either an array of options, a DSN string or null'); + } + + $config = array_replace([ + 'lazy' => true, + 'host' => '127.0.0.1', + 'port' => '9090', + 'max_retries' => 15, + 'initial_retry_delay' => 1.5, + 'max_retry_delay' => 300, + 'retry_delay_growth' => 1.5, + ], $config); + + $this->config = $config; + } + + public function createContext(): Context + { + if ($this->config['lazy']) { + return new WampContext(function () { + return $this->establishConnection(); + }); + } + + return new WampContext($this->establishConnection()); + } + + private function establishConnection(): Client + { + $uri = sprintf('ws://%s:%s', $this->config['host'], $this->config['port']); + + $client = new Client('realm1'); + $client->addTransportProvider(new PawlTransportProvider($uri)); + $client->setReconnectOptions([ + 'max_retries' => $this->config['max_retries'], + 'initial_retry_delay' => $this->config['initial_retry_delay'], + 'max_retry_delay' => $this->config['max_retry_delay'], + 'retry_delay_growth' => $this->config['retry_delay_growth'], + ]); + + return $client; + } + + private function parseDsn(string $dsn): array + { + $dsn = new Dsn($dsn); + + if (false === in_array($dsn->getSchemeProtocol(), ['wamp', 'ws'], true)) { + throw new \LogicException(sprintf( + 'The given scheme protocol "%s" is not supported. It must be "wamp"', + $dsn->getSchemeProtocol() + )); + } + + return array_filter(array_replace($dsn->getQuery(), [ + 'host' => $dsn->getHost(), + 'port' => $dsn->getPort(), + 'max_retries' => $dsn->getInt('max_retries'), + 'initial_retry_delay' => $dsn->getFloat('initial_retry_delay'), + 'max_retry_delay' => $dsn->getInt('max_retry_delay'), + 'retry_delay_growth' => $dsn->getFloat('retry_delay_growth'), + ]), function ($value) { return null !== $value; }); + } +} diff --git a/pkg/wamp/WampConsumer.php b/pkg/wamp/WampConsumer.php new file mode 100644 index 000000000..8dc859739 --- /dev/null +++ b/pkg/wamp/WampConsumer.php @@ -0,0 +1,139 @@ +context = $context; + $this->queue = $destination; + } + + public function getQueue(): Queue + { + return $this->queue; + } + + public function getClient(): ?Client + { + return $this->client; + } + + public function receive(int $timeout = 0): ?Message + { + $init = false; + $this->timer = null; + $this->message = null; + + if (null === $this->client) { + $init = true; + + $this->client = $this->context->getNewClient(); + $this->client->setAttemptRetry(true); + $this->client->on('open', function (ClientSession $session) { + $session->subscribe($this->queue->getQueueName(), function ($args) { + $this->message = $this->context->getSerializer()->toMessage($args[0]); + + $this->client->emit('do-stop'); + }); + }); + + $this->client->on('do-stop', function () { + if ($this->timer) { + $this->client->getLoop()->cancelTimer($this->timer); + } + + $this->client->getLoop()->stop(); + }); + } + + if ($timeout > 0) { + $timeout = $timeout / 1000; + $timeout = $timeout >= 0.1 ? $timeout : 0.1; + + $this->timer = $this->client->getLoop()->addTimer($timeout, function () { + $this->client->emit('do-stop'); + }); + } + + if ($init) { + $this->client->start(false); + } + + $this->client->getLoop()->run(); + + $message = $this->message; + + $this->timer = null; + $this->message = null; + + return $message; + } + + public function receiveNoWait(): ?Message + { + return $this->receive(100); + } + + /** + * {@inheritdoc} + * + * @param WampMessage $message + */ + public function acknowledge(Message $message): void + { + // do nothing. wamp transport always works in auto ack mode + } + + /** + * {@inheritdoc} + * + * @param WampMessage $message + */ + public function reject(Message $message, bool $requeue = false): void + { + InvalidMessageException::assertMessageInstanceOf($message, WampMessage::class); + + // do nothing on reject. wamp transport always works in auto ack mode + + if ($requeue) { + $this->context->createProducer()->send($this->queue, $message); + } + } +} diff --git a/pkg/wamp/WampContext.php b/pkg/wamp/WampContext.php new file mode 100644 index 000000000..9bbed5087 --- /dev/null +++ b/pkg/wamp/WampContext.php @@ -0,0 +1,111 @@ +clientFactory = $clientFactory; + + $this->setSerializer(new JsonSerializer()); + } + + public function createMessage(string $body = '', array $properties = [], array $headers = []): Message + { + return new WampMessage($body, $properties, $headers); + } + + public function createTopic(string $topicName): Topic + { + return new WampDestination($topicName); + } + + public function createQueue(string $queueName): Queue + { + return new WampDestination($queueName); + } + + public function createTemporaryQueue(): Queue + { + throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt(); + } + + public function createProducer(): Producer + { + return new WampProducer($this); + } + + public function createConsumer(Destination $destination): Consumer + { + InvalidDestinationException::assertDestinationInstanceOf($destination, WampDestination::class); + + return new WampConsumer($this, $destination); + } + + public function createSubscriptionConsumer(): SubscriptionConsumer + { + return new WampSubscriptionConsumer($this); + } + + public function purgeQueue(Queue $queue): void + { + throw PurgeQueueNotSupportedException::providerDoestNotSupportIt(); + } + + public function close(): void + { + foreach ($this->clients as $client) { + if (null === $client->getSession()) { + return; + } + + $client->setAttemptRetry(false); + $client->getSession()->close(); + } + } + + public function getNewClient(): Client + { + $client = call_user_func($this->clientFactory); + + if (false == $client instanceof Client) { + throw new \LogicException(sprintf( + 'The factory must return instance of "%s". But it returns %s', + Client::class, + is_object($client) ? get_class($client) : gettype($client) + )); + } + + $this->clients[] = $client; + + return $client; + } +} diff --git a/pkg/wamp/WampDestination.php b/pkg/wamp/WampDestination.php new file mode 100644 index 000000000..a99bc1fa0 --- /dev/null +++ b/pkg/wamp/WampDestination.php @@ -0,0 +1,31 @@ +name = $name; + } + + public function getQueueName(): string + { + return $this->name; + } + + public function getTopicName(): string + { + return $this->name; + } +} diff --git a/pkg/wamp/WampMessage.php b/pkg/wamp/WampMessage.php new file mode 100644 index 000000000..9ad41f5a4 --- /dev/null +++ b/pkg/wamp/WampMessage.php @@ -0,0 +1,21 @@ +body = $body; + $this->properties = $properties; + $this->headers = $headers; + $this->redelivered = false; + } +} diff --git a/pkg/wamp/WampProducer.php b/pkg/wamp/WampProducer.php new file mode 100644 index 000000000..3bffe21f0 --- /dev/null +++ b/pkg/wamp/WampProducer.php @@ -0,0 +1,190 @@ +context = $context; + } + + /** + * {@inheritdoc} + * + * @param WampDestination $destination + * @param WampMessage $message + */ + public function send(Destination $destination, Message $message): void + { + InvalidDestinationException::assertDestinationInstanceOf($destination, WampDestination::class); + InvalidMessageException::assertMessageInstanceOf($message, WampMessage::class); + + $init = false; + $this->message = $message; + $this->destination = $destination; + + if (null === $this->client) { + $init = true; + + $this->client = $this->context->getNewClient(); + $this->client->setAttemptRetry(true); + $this->client->on('open', function (ClientSession $session) { + $this->session = $session; + + $this->doSendMessageIfPossible(); + }); + + $this->client->on('close', function () { + if ($this->session === $this->client->getSession()) { + $this->session = null; + } + }); + + $this->client->on('error', function () { + if ($this->session === $this->client->getSession()) { + $this->session = null; + } + }); + + $this->client->on('do-send', function (WampDestination $destination, WampMessage $message) { + $onFinish = function () { + $this->client->emit('do-stop'); + }; + + $payload = $this->context->getSerializer()->toString($message); + + $this->session->publish($destination->getTopicName(), [$payload], [], ['acknowledge' => true]) + ->then($onFinish, $onFinish); + }); + + $this->client->on('do-stop', function () { + $this->client->getLoop()->stop(); + }); + } + + $this->client->getLoop()->futureTick(function () { + $this->doSendMessageIfPossible(); + }); + + if ($init) { + $this->client->start(false); + } + + $this->client->getLoop()->run(); + } + + /** + * {@inheritdoc} + * + * @return WampProducer + */ + public function setDeliveryDelay(int $deliveryDelay = null): Producer + { + if (null === $deliveryDelay) { + return $this; + } + + throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + } + + public function getDeliveryDelay(): ?int + { + return null; + } + + /** + * {@inheritdoc} + * + * @return WampProducer + */ + public function setPriority(int $priority = null): Producer + { + if (null === $priority) { + return $this; + } + + throw PriorityNotSupportedException::providerDoestNotSupportIt(); + } + + public function getPriority(): ?int + { + return null; + } + + /** + * {@inheritdoc} + * + * @return WampProducer + */ + public function setTimeToLive(int $timeToLive = null): Producer + { + if (null === $timeToLive) { + return $this; + } + + throw TimeToLiveNotSupportedException::providerDoestNotSupportIt(); + } + + public function getTimeToLive(): ?int + { + return null; + } + + private function doSendMessageIfPossible() + { + if (null === $this->session) { + return; + } + + if (null === $this->message) { + return; + } + + $message = $this->message; + $destination = $this->destination; + + $this->message = null; + $this->destination = null; + + $this->client->emit('do-send', [$destination, $message]); + } +} diff --git a/pkg/wamp/WampSubscriptionConsumer.php b/pkg/wamp/WampSubscriptionConsumer.php new file mode 100644 index 000000000..6b96926e1 --- /dev/null +++ b/pkg/wamp/WampSubscriptionConsumer.php @@ -0,0 +1,165 @@ +context = $context; + $this->subscribers = []; + } + + public function getClient(): ?Client + { + return $this->client; + } + + public function consume(int $timeout = 0): void + { + if (empty($this->subscribers)) { + throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); + } + + $init = false; + + if (null === $this->client) { + $init = true; + + $this->client = $this->context->getNewClient(); + $this->client->setAttemptRetry(true); + $this->client->on('open', function (ClientSession $session) { + foreach ($this->subscribers as $queue => $subscriber) { + $session->subscribe($queue, function ($args) use ($subscriber) { + $message = $this->context->getSerializer()->toMessage($args[0]); + + /** @var WampConsumer $consumer */ + /** @var callable $callback */ + list($consumer, $callback) = $subscriber; + + if (false === call_user_func($callback, $message, $consumer)) { + $this->client->emit('do-stop'); + } + }); + } + }); + + $this->client->on('do-stop', function () { + if ($this->timer) { + $this->client->getLoop()->cancelTimer($this->timer); + } + + $this->client->getLoop()->stop(); + }); + } + + if ($timeout > 0) { + $timeout = $timeout / 1000; + $timeout = $timeout >= 0.1 ? $timeout : 0.1; + + $this->timer = $this->client->getLoop()->addTimer($timeout, function () { + $this->client->emit('do-stop'); + }); + } + + if ($init) { + $this->client->start(false); + } + + $this->client->getLoop()->run(); + } + + /** + * {@inheritdoc} + * + * @param WampConsumer $consumer + */ + public function subscribe(Consumer $consumer, callable $callback): void + { + if (false == $consumer instanceof WampConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', WampConsumer::class, get_class($consumer))); + } + + if ($this->client) { + throw new \LogicException('Could not subscribe after consume was called'); + } + + $queueName = $consumer->getQueue()->getQueueName(); + if (array_key_exists($queueName, $this->subscribers)) { + if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) { + return; + } + + throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName)); + } + + $this->subscribers[$queueName] = [$consumer, $callback]; + } + + /** + * {@inheritdoc} + * + * @param WampConsumer $consumer + */ + public function unsubscribe(Consumer $consumer): void + { + if (false == $consumer instanceof WampConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', WampConsumer::class, get_class($consumer))); + } + + if ($this->client) { + throw new \LogicException('Could not unsubscribe after consume was called'); + } + + $queueName = $consumer->getQueue()->getQueueName(); + + if (false == array_key_exists($queueName, $this->subscribers)) { + return; + } + + if ($this->subscribers[$queueName][0] !== $consumer) { + return; + } + + unset($this->subscribers[$queueName]); + } + + public function unsubscribeAll(): void + { + if ($this->client) { + throw new \LogicException('Could not unsubscribe after consume was called'); + } + + $this->subscribers = []; + } +} diff --git a/pkg/wamp/composer.json b/pkg/wamp/composer.json new file mode 100644 index 000000000..edb1bcc86 --- /dev/null +++ b/pkg/wamp/composer.json @@ -0,0 +1,40 @@ +{ + "name": "enqueue/wamp", + "type": "library", + "description": "The Web Application Messaging Protocol Transport", + "keywords": ["messaging", "queue", "wamp", "thruway"], + "homepage": "https://enqueue.forma-pro.com/", + "license": "MIT", + "require": { + "php": "^7.1.3", + "queue-interop/queue-interop": "0.7.x-dev", + "enqueue/dsn": "0.9.x-dev", + "thruway/pawl-transport": "^0.5.0", + "voryx/thruway": "^0.5.3" + }, + "require-dev": { + "phpunit/phpunit": "~5.4.0", + "enqueue/test": "0.9.x-dev", + "enqueue/null": "0.9.x-dev", + "queue-interop/queue-spec": "0.6.x-dev" + }, + "support": { + "email": "opensource@forma-pro.com", + "issues": "https://github.com/php-enqueue/enqueue-dev/issues", + "forum": "https://gitter.im/php-enqueue/Lobby", + "source": "https://github.com/php-enqueue/enqueue-dev", + "docs": "https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md" + }, + "autoload": { + "psr-4": { "Enqueue\\Wamp\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.9.x-dev" + } + } +} diff --git a/pkg/wamp/phpunit.xml.dist b/pkg/wamp/phpunit.xml.dist new file mode 100644 index 000000000..717a3c6db --- /dev/null +++ b/pkg/wamp/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Tests + + + +