From 2db7fe8c61246a8a543c86a7b2d43b04e0b280a8 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 23 Oct 2018 13:33:41 +0300 Subject: [PATCH] wamp --- docs/transport/wamp.md | 30 +++++++++++++++++++++++++++++ phpunit.xml.dist | 4 ++++ pkg/enqueue/Resources.php | 2 +- pkg/enqueue/Tests/ResourcesTest.php | 19 ++++++++++++++++++ pkg/wamp/WampConnectionFactory.php | 2 +- 5 files changed, 55 insertions(+), 2 deletions(-) diff --git a/docs/transport/wamp.md b/docs/transport/wamp.md index b94add54a..dcb995d00 100644 --- a/docs/transport/wamp.md +++ b/docs/transport/wamp.md @@ -8,6 +8,7 @@ It uses internally Thruway PHP library [voryx/thruway](https://github.com/voryx/ * [Start the WAMP router](#start-the-wamp-router) * [Create context](#create-context) * [Consume message](#consume-message) +* [Subscription consumer](#subscription-consumer) * [Send message to topic](#send-message-to-topic) ## Installation @@ -58,6 +59,35 @@ while (true) { } ``` +## Subscription consumer + +```php +createConsumer($fooQueue); +$barConsumer = $context->createConsumer($barQueue); + +$subscriptionConsumer = $context->createSubscriptionConsumer(); +$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) { + // process message + + return true; +}); +$subscriptionConsumer->subscribe($barConsumer, function(Message $message, Consumer $consumer) { + // process message + + return true; +}); + +$subscriptionConsumer->consume(2000); // 2 sec +``` + ## Send message to topic ```php diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 10afc56e7..f1c8f205f 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -104,6 +104,10 @@ pkg/dsn/Tests + + + pkg/wamp/Tests + diff --git a/pkg/enqueue/Resources.php b/pkg/enqueue/Resources.php index 3232ba013..508dc2694 100644 --- a/pkg/enqueue/Resources.php +++ b/pkg/enqueue/Resources.php @@ -165,7 +165,7 @@ public static function getKnownConnections(): array 'package' => 'enqueue/mongodb', ]; $map[WampConnectionFactory::class] = [ - 'schemes' => ['wamp'], + 'schemes' => ['wamp', 'ws'], 'supportedSchemeExtensions' => [], 'package' => 'enqueue/wamp', ]; diff --git a/pkg/enqueue/Tests/ResourcesTest.php b/pkg/enqueue/Tests/ResourcesTest.php index 52bd13f9f..ed36236a7 100644 --- a/pkg/enqueue/Tests/ResourcesTest.php +++ b/pkg/enqueue/Tests/ResourcesTest.php @@ -4,6 +4,7 @@ use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Resources; +use Enqueue\Wamp\WampConnectionFactory; use Interop\Queue\ConnectionFactory; use PHPUnit\Framework\TestCase; @@ -127,4 +128,22 @@ public function testShouldAllowGetPreviouslyRegisteredConnection() $this->assertArrayHasKey('package', $connectionInfo); $this->assertSame('foo/bar', $connectionInfo['package']); } + + public function testShouldHaveRegisteredWampConfiguration() + { + $availableConnections = Resources::getKnownConnections(); + + $this->assertInternalType('array', $availableConnections); + $this->assertArrayHasKey(WampConnectionFactory::class, $availableConnections); + + $connectionInfo = $availableConnections[WampConnectionFactory::class]; + $this->assertArrayHasKey('schemes', $connectionInfo); + $this->assertSame(['wamp', 'ws'], $connectionInfo['schemes']); + + $this->assertArrayHasKey('supportedSchemeExtensions', $connectionInfo); + $this->assertSame([], $connectionInfo['supportedSchemeExtensions']); + + $this->assertArrayHasKey('package', $connectionInfo); + $this->assertSame('enqueue/wamp', $connectionInfo['package']); + } } diff --git a/pkg/wamp/WampConnectionFactory.php b/pkg/wamp/WampConnectionFactory.php index e810ab052..c56635170 100644 --- a/pkg/wamp/WampConnectionFactory.php +++ b/pkg/wamp/WampConnectionFactory.php @@ -93,7 +93,7 @@ private function parseDsn(string $dsn): array { $dsn = new Dsn($dsn); - if ('wamp' !== $dsn->getSchemeProtocol()) { + if (false === in_array($dsn->getSchemeProtocol(), ['wamp', 'ws'], true)) { throw new \LogicException(sprintf( 'The given scheme protocol "%s" is not supported. It must be "wamp"', $dsn->getSchemeProtocol()