From b75d2cee663dd29d3188f3bbdd5c72711c86ebdc Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 11:40:18 +0300 Subject: [PATCH 01/33] delay strategy --- .../RabbitMqAmqpBunnyTransportFactory.php | 19 +++++++++- .../Symfony/RabbitMqAmqpTransportFactory.php | 19 +++++++++- .../RabbitMqAmqpLibTransportFactory.php | 21 +++++++++-- .../DelayStrategyTransportFactoryTrait.php | 37 +++++++++++++++++++ 4 files changed, 89 insertions(+), 7 deletions(-) create mode 100644 pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php diff --git a/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php b/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php index 20de33382..9bf748a99 100644 --- a/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php +++ b/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php @@ -2,6 +2,7 @@ namespace Enqueue\AmqpBunny\Symfony; +use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait; use Enqueue\Client\Amqp\RabbitMqDriver; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -10,6 +11,8 @@ class RabbitMqAmqpBunnyTransportFactory extends AmqpBunnyTransportFactory { + use DelayStrategyTransportFactoryTrait; + /** * @param string $name */ @@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() - ->booleanNode('delay_plugin_installed') + ->booleanNode('delay_strategy') ->defaultFalse() - ->info('The option tells whether RabbitMQ broker has delay plugin installed or not') + ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; } + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factoryId = parent::createConnectionFactory($container, $config); + + $this->registerDelayStrategy($container, $config, $factoryId, $this->getName()); + + return $factoryId; + } + /** * {@inheritdoc} */ diff --git a/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php b/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php index 11ef70342..189b5f1c6 100644 --- a/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php +++ b/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php @@ -2,6 +2,7 @@ namespace Enqueue\AmqpExt\Symfony; +use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait; use Enqueue\Client\Amqp\RabbitMqDriver; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -10,6 +11,8 @@ class RabbitMqAmqpTransportFactory extends AmqpTransportFactory { + use DelayStrategyTransportFactoryTrait; + /** * @param string $name */ @@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() - ->booleanNode('delay_plugin_installed') + ->booleanNode('delay_strategy') ->defaultFalse() - ->info('The option tells whether RabbitMQ broker has delay plugin installed or not') + ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; } + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factoryId = parent::createConnectionFactory($container, $config); + + $this->registerDelayStrategy($container, $config, $factoryId, $this->getName()); + + return $factoryId; + } + /** * {@inheritdoc} */ diff --git a/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php b/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php index 20765c6b1..a41eb01ef 100644 --- a/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php +++ b/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php @@ -2,6 +2,7 @@ namespace Enqueue\AmqpLib\Symfony; +use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait; use Enqueue\Client\Amqp\RabbitMqDriver; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -10,6 +11,8 @@ class RabbitMqAmqpLibTransportFactory extends AmqpLibTransportFactory { + use DelayStrategyTransportFactoryTrait; + /** * @param string $name */ @@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() - ->booleanNode('delay_plugin_installed') - ->defaultFalse() - ->info('The option tells whether RabbitMQ broker has delay plugin installed or not') + ->scalarNode('delay_strategy') + ->defaultNull() + ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; } + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factoryId = parent::createConnectionFactory($container, $config); + + $this->registerDelayStrategy($container, $config, $factoryId, $this->getName()); + + return $factoryId; + } + /** * {@inheritdoc} */ diff --git a/pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php b/pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php new file mode 100644 index 000000000..dbcb71f5a --- /dev/null +++ b/pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php @@ -0,0 +1,37 @@ +getDefinition($factoryId); + + if (false == is_a($factory->getClass(), DelayStrategyAware::class, true)) { + throw new \LogicException('Connection factory does not support delays'); + } + + if (strtolower($config['delay_strategy']) === 'dlx') { + $delayId = sprintf('enqueue.client.%s.delay_strategy', $factoryName); + $container->register($delayId, RabbitMqDlxDelayStrategy::class); + + $factory->addMethodCall('setDelayStrategy', [new Reference($delayId)]); + } elseif (strtolower($config['delay_strategy']) === 'delayed_message_plugin') { + $delayId = sprintf('enqueue.client.%s.delay_strategy', $factoryName); + $container->register($delayId, RabbitMqDelayPluginDelayStrategy::class); + + $factory->addMethodCall('setDelayStrategy', [new Reference($delayId)]); + } else { + $factory->addMethodCall('setDelayStrategy', [new Reference($config['delay_strategy'])]); + } + } + } +} From 0405f4c900bbe402b0878932299c463e387f2a49 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 12:56:48 +0300 Subject: [PATCH 02/33] delay strategy --- pkg/enqueue/Client/Amqp/RabbitMqDriver.php | 64 +++------------------- 1 file changed, 8 insertions(+), 56 deletions(-) diff --git a/pkg/enqueue/Client/Amqp/RabbitMqDriver.php b/pkg/enqueue/Client/Amqp/RabbitMqDriver.php index e65095e87..4b1a26995 100644 --- a/pkg/enqueue/Client/Amqp/RabbitMqDriver.php +++ b/pkg/enqueue/Client/Amqp/RabbitMqDriver.php @@ -75,12 +75,13 @@ public function sendToProcessor(Message $message) $transportMessage = $this->createTransportMessage($message); $destination = $this->createQueue($queueName); + $producer = $this->context->createProducer(); if ($message->getDelay()) { - $destination = $this->createDelayedTopic($destination); + $producer->setDeliveryDelay($message->getDelay() * 1000); } - $this->context->createProducer()->send($destination, $transportMessage); + $producer->send($destination, $transportMessage); } /** @@ -117,11 +118,11 @@ public function createTransportMessage(Message $message) } if ($message->getDelay()) { - if (false == $this->config->getTransportOption('delay_plugin_installed', false)) { - throw new LogicException('The message delaying is not supported. In order to use delay feature install RabbitMQ delay plugin.'); + if (false == $this->config->getTransportOption('delay_strategy', false)) { + throw new LogicException('The message delaying is not supported. In order to use delay feature install RabbitMQ delay strategy.'); } - $transportMessage->setProperty('x-delay', (string) ($message->getDelay() * 1000)); + $transportMessage->setProperty('enqueue.delay', $message->getDelay() * 1000); } return $transportMessage; @@ -144,9 +145,9 @@ public function createClientMessage(PsrMessage $message) $clientMessage->setPriority($clientPriority); } - if ($delay = $message->getProperty('x-delay')) { + if ($delay = $message->getProperty('enqueue.delay')) { if (false == is_numeric($delay)) { - throw new \LogicException(sprintf('x-delay header is not numeric. "%s"', $delay)); + throw new \LogicException(sprintf('"enqueue.delay" header is not numeric. "%s"', $delay)); } $clientMessage->setDelay((int) ((int) $delay) / 1000); @@ -154,53 +155,4 @@ public function createClientMessage(PsrMessage $message) return $clientMessage; } - - /** - * {@inheritdoc} - */ - public function setupBroker(LoggerInterface $logger = null) - { - $logger = $logger ?: new NullLogger(); - - parent::setupBroker($logger); - - $log = function ($text, ...$args) use ($logger) { - $logger->debug(sprintf('[RabbitMqDriver] '.$text, ...$args)); - }; - - // setup delay exchanges - if ($this->config->getTransportOption('delay_plugin_installed', false)) { - foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) { - $queue = $this->createQueue($meta->getClientName()); - - $delayTopic = $this->createDelayedTopic($queue); - - $log('Declare delay exchange: %s', $delayTopic->getTopicName()); - $this->context->declareTopic($delayTopic); - - $log('Bind processor queue to delay exchange: %s -> %s', $queue->getQueueName(), $delayTopic->getTopicName()); - $this->context->bind(new AmqpBind($delayTopic, $queue, $queue->getQueueName())); - } - } - } - - /** - * @param AmqpQueue $queue - * - * @return AmqpTopic - */ - private function createDelayedTopic(AmqpQueue $queue) - { - $queueName = $queue->getQueueName(); - - // in order to use delay feature make sure the rabbitmq_delayed_message_exchange plugin is installed. - $delayTopic = $this->context->createTopic($queueName.'.delayed'); - $delayTopic->setType('x-delayed-message'); - $delayTopic->addFlag(AmqpTopic::FLAG_DURABLE); - $delayTopic->setArguments([ - 'x-delayed-type' => 'direct', - ]); - - return $delayTopic; - } } From 2a9550d1cff85f2f5c7f3b7460f4c6d1cb34a0ad Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 13:15:33 +0300 Subject: [PATCH 03/33] delay strategy --- phpunit.xml.dist | 8 ++++++++ .../Symfony/RabbitMqAmqpBunnyTransportFactory.php | 4 ++-- .../Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php | 8 ++++---- pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php | 4 ++-- .../Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php | 8 ++++---- .../Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php | 8 ++++---- 6 files changed, 24 insertions(+), 16 deletions(-) diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 451898eb0..0fab760d0 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -29,6 +29,14 @@ pkg/amqp-bunny/Tests + + pkg/amqp-lib/Tests + + + + pkg/amqp-tools/Tests + + pkg/fs/Tests diff --git a/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php b/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php index 9bf748a99..c7ccb4f65 100644 --- a/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php +++ b/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php @@ -30,8 +30,8 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() - ->booleanNode('delay_strategy') - ->defaultFalse() + ->scalarNode('delay_strategy') + ->defaultNull() ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; diff --git a/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php b/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php index 4dd11871b..24230569e 100644 --- a/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php +++ b/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php @@ -58,7 +58,7 @@ public function testShouldAllowAddConfiguration() 'user' => 'guest', 'pass' => 'guest', 'vhost' => '/', - 'delay_plugin_installed' => false, + 'delay_strategy' => null, 'lazy' => true, 'receive_method' => 'basic_get', 'heartbeat' => 0, @@ -78,7 +78,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertTrue($container->hasDefinition($serviceId)); @@ -91,7 +91,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]], $factory->getArguments()); } @@ -108,7 +108,7 @@ public function testShouldCreateContext() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertEquals('enqueue.transport.rabbitmq_amqp_bunny.context', $serviceId); diff --git a/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php b/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php index 189b5f1c6..6663ac9ec 100644 --- a/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php +++ b/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php @@ -30,8 +30,8 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() - ->booleanNode('delay_strategy') - ->defaultFalse() + ->scalarNode('delay_strategy') + ->defaultNull() ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; diff --git a/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php b/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php index 46f530042..cee668049 100644 --- a/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php +++ b/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php @@ -59,7 +59,7 @@ public function testShouldAllowAddConfiguration() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, 'lazy' => true, 'receive_method' => 'basic_get', ], $config); @@ -78,7 +78,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertTrue($container->hasDefinition($serviceId)); @@ -91,7 +91,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]], $factory->getArguments()); } @@ -108,7 +108,7 @@ public function testShouldCreateContext() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertEquals('enqueue.transport.rabbitmq_amqp.context', $serviceId); diff --git a/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php b/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php index 22b1b4354..58cc6b352 100644 --- a/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php +++ b/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php @@ -58,7 +58,7 @@ public function testShouldAllowAddConfiguration() 'user' => 'guest', 'pass' => 'guest', 'vhost' => '/', - 'delay_plugin_installed' => false, + 'delay_strategy' => null, 'lazy' => true, 'receive_method' => 'basic_get', 'connection_timeout' => 3.0, @@ -85,7 +85,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertTrue($container->hasDefinition($serviceId)); @@ -98,7 +98,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]], $factory->getArguments()); } @@ -115,7 +115,7 @@ public function testShouldCreateContext() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertEquals('enqueue.transport.rabbitmq_amqp_lib.context', $serviceId); From 6d70adff61a6117fdf3ab0468ecb22f7005c6070 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 13:16:49 +0300 Subject: [PATCH 04/33] delay strategy --- pkg/enqueue/Client/Amqp/RabbitMqDriver.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/enqueue/Client/Amqp/RabbitMqDriver.php b/pkg/enqueue/Client/Amqp/RabbitMqDriver.php index 4b1a26995..eecdb67af 100644 --- a/pkg/enqueue/Client/Amqp/RabbitMqDriver.php +++ b/pkg/enqueue/Client/Amqp/RabbitMqDriver.php @@ -122,7 +122,7 @@ public function createTransportMessage(Message $message) throw new LogicException('The message delaying is not supported. In order to use delay feature install RabbitMQ delay strategy.'); } - $transportMessage->setProperty('enqueue.delay', $message->getDelay() * 1000); + $transportMessage->setProperty('enqueue-delay', $message->getDelay() * 1000); } return $transportMessage; @@ -145,9 +145,9 @@ public function createClientMessage(PsrMessage $message) $clientMessage->setPriority($clientPriority); } - if ($delay = $message->getProperty('enqueue.delay')) { + if ($delay = $message->getProperty('enqueue-delay')) { if (false == is_numeric($delay)) { - throw new \LogicException(sprintf('"enqueue.delay" header is not numeric. "%s"', $delay)); + throw new \LogicException(sprintf('"enqueue-delay" header is not numeric. "%s"', $delay)); } $clientMessage->setDelay((int) ((int) $delay) / 1000); From be3b5ddfc33bb9c4d39e12509c4c5c2a129951f4 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 13:50:09 +0300 Subject: [PATCH 05/33] delay strategy --- .../RabbitMqAmqpBunnyTransportFactory.php | 2 +- .../RabbitMqAmqpBunnyTransportFactoryTest.php | 2 +- .../Symfony/RabbitMqAmqpTransportFactory.php | 2 +- .../RabbitMqAmqpTransportFactoryTest.php | 2 +- .../RabbitMqAmqpLibTransportFactory.php | 2 +- pkg/amqp-lib/Tests/AmqpConsumerTest.php | 4 ++ pkg/amqp-lib/Tests/AmqpContextTest.php | 5 +- pkg/amqp-lib/Tests/AmqpProducerTest.php | 23 +++++--- .../RabbitMqAmqpLibTransportFactoryTest.php | 2 +- .../Tests/Client/Amqp/RabbitMqDriverTest.php | 57 ++++++------------- 10 files changed, 46 insertions(+), 55 deletions(-) diff --git a/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php b/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php index c7ccb4f65..95341671f 100644 --- a/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php +++ b/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php @@ -31,7 +31,7 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() ->scalarNode('delay_strategy') - ->defaultNull() + ->defaultValue('dlx') ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; diff --git a/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php b/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php index 24230569e..555ad777e 100644 --- a/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php +++ b/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php @@ -58,7 +58,7 @@ public function testShouldAllowAddConfiguration() 'user' => 'guest', 'pass' => 'guest', 'vhost' => '/', - 'delay_strategy' => null, + 'delay_strategy' => 'dlx', 'lazy' => true, 'receive_method' => 'basic_get', 'heartbeat' => 0, diff --git a/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php b/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php index 6663ac9ec..8ab200e14 100644 --- a/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php +++ b/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php @@ -31,7 +31,7 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() ->scalarNode('delay_strategy') - ->defaultNull() + ->defaultValue('dlx') ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; diff --git a/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php b/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php index cee668049..31853492a 100644 --- a/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php +++ b/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php @@ -59,7 +59,7 @@ public function testShouldAllowAddConfiguration() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_strategy' => null, + 'delay_strategy' => 'dlx', 'lazy' => true, 'receive_method' => 'basic_get', ], $config); diff --git a/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php b/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php index a41eb01ef..43a88a2da 100644 --- a/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php +++ b/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php @@ -31,7 +31,7 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() ->scalarNode('delay_strategy') - ->defaultNull() + ->defaultValue('dlx') ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; diff --git a/pkg/amqp-lib/Tests/AmqpConsumerTest.php b/pkg/amqp-lib/Tests/AmqpConsumerTest.php index f4462e2ad..21585734c 100644 --- a/pkg/amqp-lib/Tests/AmqpConsumerTest.php +++ b/pkg/amqp-lib/Tests/AmqpConsumerTest.php @@ -102,6 +102,7 @@ public function testShouldReturnMessageOnReceiveNoWait() $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); $amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag'; $amqpMessage->delivery_info['redelivered'] = true; + $amqpMessage->delivery_info['routing_key'] = 'routing-key'; $channel = $this->createChannelMock(); $channel @@ -120,6 +121,7 @@ public function testShouldReturnMessageOnReceiveNoWait() $this->assertInstanceOf(AmqpMessage::class, $message); $this->assertSame('body', $message->getBody()); $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $this->assertSame('routing-key', $message->getRoutingKey()); $this->assertTrue($message->isRedelivered()); } @@ -127,6 +129,7 @@ public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet() { $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); $amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag'; + $amqpMessage->delivery_info['routing_key'] = 'routing-key'; $amqpMessage->delivery_info['redelivered'] = true; $channel = $this->createChannelMock(); @@ -146,6 +149,7 @@ public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet() $this->assertInstanceOf(AmqpMessage::class, $message); $this->assertSame('body', $message->getBody()); $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $this->assertSame('routing-key', $message->getRoutingKey()); $this->assertTrue($message->isRedelivered()); } diff --git a/pkg/amqp-lib/Tests/AmqpContextTest.php b/pkg/amqp-lib/Tests/AmqpContextTest.php index 80dee492a..be52235e7 100644 --- a/pkg/amqp-lib/Tests/AmqpContextTest.php +++ b/pkg/amqp-lib/Tests/AmqpContextTest.php @@ -8,6 +8,7 @@ use Interop\Amqp\Impl\AmqpTopic; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Wire\AMQPTable; use PHPUnit\Framework\TestCase; class AmqpContextTest extends TestCase @@ -26,7 +27,7 @@ public function testShouldDeclareTopic() $this->isTrue(), $this->isTrue(), $this->isTrue(), - $this->identicalTo(['key' => 'value']), + $this->isInstanceOf(AMQPTable::class), $this->isNull() ) ; @@ -94,7 +95,7 @@ public function testShouldDeclareQueue() $this->isTrue(), $this->isTrue(), $this->isTrue(), - $this->identicalTo(['key' => 'value']), + $this->isInstanceOf(AMQPTable::class), $this->isNull() ) ; diff --git a/pkg/amqp-lib/Tests/AmqpProducerTest.php b/pkg/amqp-lib/Tests/AmqpProducerTest.php index 1d389bf3c..8ccf419ac 100644 --- a/pkg/amqp-lib/Tests/AmqpProducerTest.php +++ b/pkg/amqp-lib/Tests/AmqpProducerTest.php @@ -2,6 +2,7 @@ namespace Enqueue\AmqpLib\Tests; +use Enqueue\AmqpLib\AmqpContext; use Enqueue\AmqpLib\AmqpProducer; use Enqueue\Test\ClassExtensionTrait; use Interop\Amqp\Impl\AmqpMessage; @@ -23,7 +24,7 @@ class AmqpProducerTest extends TestCase public function testCouldBeConstructedWithRequiredArguments() { - new AmqpProducer($this->createAmqpChannelMock()); + new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock()); } public function testShouldImplementPsrProducerInterface() @@ -33,7 +34,7 @@ public function testShouldImplementPsrProducerInterface() public function testShouldThrowExceptionWhenDestinationTypeIsInvalid() { - $producer = new AmqpProducer($this->createAmqpChannelMock()); + $producer = new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock()); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Interop\Amqp\AmqpQueue but got'); @@ -43,7 +44,7 @@ public function testShouldThrowExceptionWhenDestinationTypeIsInvalid() public function testShouldThrowExceptionWhenMessageTypeIsInvalid() { - $producer = new AmqpProducer($this->createAmqpChannelMock()); + $producer = new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock()); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but it is'); @@ -70,7 +71,7 @@ public function testShouldPublishMessageToTopic() $message = new AmqpMessage('body'); $message->setRoutingKey('routing-key'); - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send($topic, $message); $this->assertEquals('body', $amqpMessage->getBody()); @@ -92,7 +93,7 @@ public function testShouldPublishMessageToQueue() $queue = new AmqpQueue('queue'); - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send($queue, new AmqpMessage('body')); $this->assertEquals('body', $amqpMessage->getBody()); @@ -111,7 +112,7 @@ public function testShouldSetMessageHeaders() })) ; - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['content_type' => 'text/plain'])); $this->assertEquals(['content_type' => 'text/plain'], $amqpMessage->get_properties()); @@ -130,7 +131,7 @@ public function testShouldSetMessageProperties() })) ; - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send(new AmqpTopic('name'), new AmqpMessage('body', ['key' => 'value'])); $properties = $amqpMessage->get_properties(); @@ -163,4 +164,12 @@ private function createAmqpChannelMock() { return $this->createMock(AMQPChannel::class); } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext + */ + private function createContextMock() + { + return $this->createMock(AmqpContext::class); + } } diff --git a/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php b/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php index 58cc6b352..b86a57bdf 100644 --- a/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php +++ b/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php @@ -58,7 +58,7 @@ public function testShouldAllowAddConfiguration() 'user' => 'guest', 'pass' => 'guest', 'vhost' => '/', - 'delay_strategy' => null, + 'delay_strategy' => 'dlx', 'lazy' => true, 'receive_method' => 'basic_get', 'connection_timeout' => 3.0, diff --git a/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php b/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php index 698fc5c3e..d63596c75 100644 --- a/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php +++ b/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php @@ -97,7 +97,7 @@ public function testShouldConvertTransportMessageToClientMessage() $transportMessage->setBody('body'); $transportMessage->setHeaders(['hkey' => 'hval']); $transportMessage->setProperties(['key' => 'val']); - $transportMessage->setProperty('x-delay', '5678000'); + $transportMessage->setProperty('enqueue-delay', '5678000'); $transportMessage->setHeader('content_type', 'ContentType'); $transportMessage->setHeader('expiration', '12345000'); $transportMessage->setHeader('priority', 3); @@ -108,7 +108,7 @@ public function testShouldConvertTransportMessageToClientMessage() $driver = new RabbitMqDriver( $this->createAmqpContextMock(), - new Config('', '', '', '', '', '', ['delay_plugin_installed' => true]), + new Config('', '', '', '', '', '', ['delay_strategy' => 'dlx']), $this->createDummyQueueMetaRegistry() ); @@ -128,7 +128,7 @@ public function testShouldConvertTransportMessageToClientMessage() ], $clientMessage->getHeaders()); $this->assertSame([ 'key' => 'val', - 'x-delay' => '5678000', + 'enqueue-delay' => '5678000', ], $clientMessage->getProperties()); $this->assertSame('MessageId', $clientMessage->getMessageId()); $this->assertSame(12345, $clientMessage->getExpire()); @@ -143,7 +143,7 @@ public function testShouldConvertTransportMessageToClientMessage() public function testShouldThrowExceptionIfXDelayIsNotNumeric() { $transportMessage = new AmqpMessage(); - $transportMessage->setProperty('x-delay', 'is-not-numeric'); + $transportMessage->setProperty('enqueue-delay', 'is-not-numeric'); $driver = new RabbitMqDriver( $this->createAmqpContextMock(), @@ -152,7 +152,7 @@ public function testShouldThrowExceptionIfXDelayIsNotNumeric() ); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('x-delay header is not numeric. "is-not-numeric"'); + $this->expectExceptionMessage('"enqueue-delay" header is not numeric. "is-not-numeric"'); $driver->createClientMessage($transportMessage); } @@ -239,7 +239,7 @@ public function testShouldConvertClientMessageToTransportMessage() $driver = new RabbitMqDriver( $context, - new Config('', '', '', '', '', '', ['delay_plugin_installed' => true]), + new Config('', '', '', '', '', '', ['delay_strategy' => 'dlx']), $this->createDummyQueueMetaRegistry() ); @@ -260,7 +260,7 @@ public function testShouldConvertClientMessageToTransportMessage() ], $transportMessage->getHeaders()); $this->assertSame([ 'key' => 'val', - 'x-delay' => '432000', + 'enqueue-delay' => 432000, ], $transportMessage->getProperties()); $this->assertSame('MessageId', $transportMessage->getMessageId()); $this->assertSame(1000, $transportMessage->getTimestamp()); @@ -282,12 +282,12 @@ public function testThrowIfDelayNotSupportedOnConvertClientMessageToTransportMes $driver = new RabbitMqDriver( $context, - new Config('', '', '', '', '', '', ['delay_plugin_installed' => false]), + new Config('', '', '', '', '', '', ['delay_strategy' => null]), $this->createDummyQueueMetaRegistry() ); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The message delaying is not supported. In order to use delay feature install RabbitMQ delay plugin.'); + $this->expectExceptionMessage('The message delaying is not supported. In order to use delay feature install RabbitMQ delay strategy.'); $driver->createTransportMessage($clientMessage); } @@ -386,17 +386,21 @@ public function testShouldSendMessageToProcessor() $driver->sendToProcessor($message); } - public function testShouldSendMessageToDelayExchangeIfDelaySet() + public function testShouldSendMessageToProcessorWithDeliveryDelay() { $queue = new AmqpQueue(''); - $delayTopic = new AmqpTopic(''); $transportMessage = new AmqpMessage(); $producer = $this->createAmqpProducerMock(); $producer ->expects($this->once()) ->method('send') - ->with($this->identicalTo($delayTopic), $this->identicalTo($transportMessage)) + ->with($this->identicalTo($queue), $this->identicalTo($transportMessage)) + ; + $producer + ->expects($this->once()) + ->method('setDeliveryDelay') + ->with($this->identicalTo(10000)) ; $context = $this->createAmqpContextMock(); $context @@ -404,11 +408,6 @@ public function testShouldSendMessageToDelayExchangeIfDelaySet() ->method('createQueue') ->willReturn($queue) ; - $context - ->expects($this->once()) - ->method('createTopic') - ->willReturn($delayTopic) - ; $context ->expects($this->once()) ->method('createProducer') @@ -422,7 +421,7 @@ public function testShouldSendMessageToDelayExchangeIfDelaySet() $driver = new RabbitMqDriver( $context, - new Config('', '', '', '', '', '', ['delay_plugin_installed' => true]), + new Config('', '', '', '', '', '', ['delay_strategy' => 'dlx']), $this->createDummyQueueMetaRegistry() ); @@ -521,7 +520,6 @@ public function testShouldSetupBroker() $routerQueue = new AmqpQueue(''); $processorQueue = new AmqpQueue(''); - $delayTopic = new AmqpTopic(''); $context = $this->createAmqpContextMock(); // setup router @@ -561,27 +559,6 @@ public function testShouldSetupBroker() ->method('declareQueue') ->with($this->identicalTo($processorQueue)) ; - $context - ->expects($this->at(7)) - ->method('createQueue') - ->willReturn($processorQueue) - ; - $context - ->expects($this->at(8)) - ->method('createTopic') - ->willReturn($delayTopic) - ; - $context - ->expects($this->at(9)) - ->method('declareTopic') - ->with($this->identicalTo($delayTopic)) - ; - - $context - ->expects($this->at(10)) - ->method('bind') - ->with($this->isInstanceOf(AmqpBind::class)) - ; $config = Config::create('', '', '', '', '', '', ['delay_plugin_installed' => true]); From 2ac9251544d721943e99e1dc0be1dff89e24db6a Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 14:04:00 +0300 Subject: [PATCH 06/33] delay strategy --- pkg/enqueue/Client/Amqp/RabbitMqDriver.php | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/enqueue/Client/Amqp/RabbitMqDriver.php b/pkg/enqueue/Client/Amqp/RabbitMqDriver.php index eecdb67af..518b90056 100644 --- a/pkg/enqueue/Client/Amqp/RabbitMqDriver.php +++ b/pkg/enqueue/Client/Amqp/RabbitMqDriver.php @@ -10,11 +10,7 @@ use Interop\Amqp\AmqpContext; use Interop\Amqp\AmqpMessage; use Interop\Amqp\AmqpQueue; -use Interop\Amqp\AmqpTopic; -use Interop\Amqp\Impl\AmqpBind; use Interop\Queue\PsrMessage; -use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; class RabbitMqDriver extends AmqpDriver { From c4153cb92d44f38d6896288cad8bb46a5e9f9cf3 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 14:18:09 +0300 Subject: [PATCH 07/33] delay strategy --- ...pSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 5947a7cd1..52d5b6e01 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -35,7 +35,6 @@ protected function createQueue(PsrContext $context, $queueName) $queue = $context->createQueue($queueName); $context->declareQueue($queue); - $context->purgeQueue($queue); $context->bind(new AmqpBind($context->createTopic($queueName), $queue)); From b38f0bc4b0b1e8b4e586357f3ec45e1058ab7543 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 14:51:08 +0300 Subject: [PATCH 08/33] delay strategy --- phpunit.xml.dist | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 0fab760d0..2a8cd342a 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -30,7 +30,7 @@ - pkg/amqp-lib/Tests + pkg/amqp-lib/Tests From dbcfce6a04bf6425389924f05c8b1e9debb2c698 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 14:51:25 +0300 Subject: [PATCH 09/33] delay strategy --- phpunit.xml.dist | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 2a8cd342a..0fab760d0 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -30,7 +30,7 @@ - pkg/amqp-lib/Tests + pkg/amqp-lib/Tests From 7ddc44e48d7122cff53383df74c834c7e52b2259 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 15:02:39 +0300 Subject: [PATCH 10/33] delay strategy --- bin/test | 2 +- pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/test b/bin/test index 6641bc743..2d149946b 100755 --- a/bin/test +++ b/bin/test @@ -31,4 +31,4 @@ php pkg/job-queue/Tests/Functional/app/console doctrine:database:create php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force #php pkg/enqueue-bundle/Tests/Functional/app/console.php config:dump-reference enqueue -bin/phpunit "$@" +bin/phpunit --debug "$@" diff --git a/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php b/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php index d63596c75..3b67ef52e 100644 --- a/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php +++ b/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php @@ -505,7 +505,7 @@ public function testShouldSetupBrokerWhenDelayPluginNotInstalled() ->willReturn($processorQueue) ; - $config = Config::create('', '', '', '', '', '', ['delay_plugin_installed' => false]); + $config = Config::create('', '', '', '', '', '', ['delay_strategy' => null]); $meta = new QueueMetaRegistry($config, ['default' => []]); @@ -560,7 +560,7 @@ public function testShouldSetupBroker() ->with($this->identicalTo($processorQueue)) ; - $config = Config::create('', '', '', '', '', '', ['delay_plugin_installed' => true]); + $config = Config::create('', '', '', '', '', '', ['delay_strategy' => 'dlx']); $meta = new QueueMetaRegistry($config, ['default' => []]); From 3a03b738b8dd4738e14b76f87fa2da6fc067554c Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 16:06:43 +0300 Subject: [PATCH 11/33] delay strategy --- ...dAndReceiveDelayedMessageFromQueueTest.php | 28 ++++++++++++++++--- .../Spec/SqsSendToAndReceiveFromQueueTest.php | 28 ++++++++++++++++--- .../Spec/SqsSendToAndReceiveFromTopicTest.php | 28 ++++++++++++++++--- 3 files changed, 72 insertions(+), 12 deletions(-) diff --git a/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php index 9fd3e96b8..7472958fc 100644 --- a/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php @@ -4,6 +4,7 @@ use Enqueue\Sqs\SqsConnectionFactory; use Enqueue\Sqs\SqsContext; +use Enqueue\Sqs\SqsDestination; use Enqueue\Test\RetryTrait; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -16,6 +17,16 @@ class SqsSendAndReceiveDelayedMessageFromQueueTest extends SendAndReceiveDelayed { use RetryTrait; + /** + * @var SqsContext + */ + private $context; + + /** + * @var SqsDestination + */ + private $queue; + /** * {@inheritdoc} */ @@ -27,7 +38,7 @@ protected function createContext() 'region' => getenv('AWS__SQS__REGION'), ]); - return $factory->createContext(); + return $this->context = $factory->createContext(); } /** @@ -39,9 +50,18 @@ protected function createQueue(PsrContext $context, $queueName) { $queueName = $queueName.time(); - $queue = $context->createQueue($queueName); - $context->declareQueue($queue); + $this->queue = $context->createQueue($queueName); + $context->declareQueue($this->queue); + + return $this->queue; + } + + protected function tearDown() + { + parent::tearDown(); - return $queue; + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php index 3e73cd489..3147e7445 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php @@ -4,6 +4,7 @@ use Enqueue\Sqs\SqsConnectionFactory; use Enqueue\Sqs\SqsContext; +use Enqueue\Sqs\SqsDestination; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec; @@ -12,6 +13,16 @@ */ class SqsSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec { + /** + * @var SqsContext + */ + private $context; + + /** + * @var SqsDestination + */ + private $queue; + /** * {@inheritdoc} */ @@ -23,7 +34,7 @@ protected function createContext() 'region' => getenv('AWS__SQS__REGION'), ]); - return $factory->createContext(); + return $this->context = $factory->createContext(); } /** @@ -35,9 +46,18 @@ protected function createQueue(PsrContext $context, $queueName) { $queueName = $queueName.time(); - $queue = $context->createQueue($queueName); - $context->declareQueue($queue); + $this->queue = $context->createQueue($queueName); + $context->declareQueue($this->queue); + + return $this->queue; + } + + protected function tearDown() + { + parent::tearDown(); - return $queue; + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php index 5c4595e88..6d49608f8 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php @@ -4,6 +4,7 @@ use Enqueue\Sqs\SqsConnectionFactory; use Enqueue\Sqs\SqsContext; +use Enqueue\Sqs\SqsDestination; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendToAndReceiveFromTopicSpec; @@ -12,6 +13,16 @@ */ class SqsSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec { + /** + * @var SqsContext + */ + private $context; + + /** + * @var SqsDestination + */ + private $queue; + /** * {@inheritdoc} */ @@ -23,7 +34,7 @@ protected function createContext() 'region' => getenv('AWS__SQS__REGION'), ]); - return $factory->createContext(); + return $this->context = $factory->createContext(); } /** @@ -35,9 +46,18 @@ protected function createTopic(PsrContext $context, $topicName) { $topicName = $topicName.time(); - $topic = $context->createTopic($topicName); - $context->declareQueue($topic); + $this->queue = $context->createTopic($topicName); + $context->declareQueue($this->queue); + + return $this->queue; + } + + protected function tearDown() + { + parent::tearDown(); - return $topic; + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } } } From a05806f5b0e80b7717eabfc04d4de8ca3e4bfedc Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 9 Aug 2017 16:19:14 +0300 Subject: [PATCH 12/33] delay strategy --- ...ndAndReceiveDelayedMessageFromQueueTest.php | 18 +++++++++--------- .../Spec/SqsSendToAndReceiveFromQueueTest.php | 18 +++++++++--------- .../Spec/SqsSendToAndReceiveFromTopicTest.php | 18 +++++++++--------- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php index 7472958fc..8c1391915 100644 --- a/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php @@ -27,6 +27,15 @@ class SqsSendAndReceiveDelayedMessageFromQueueTest extends SendAndReceiveDelayed */ private $queue; + protected function tearDown() + { + parent::tearDown(); + + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } + } + /** * {@inheritdoc} */ @@ -55,13 +64,4 @@ protected function createQueue(PsrContext $context, $queueName) return $this->queue; } - - protected function tearDown() - { - parent::tearDown(); - - if ($this->context && $this->queue) { - $this->context->deleteQueue($this->queue); - } - } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php index 3147e7445..9bfb753f4 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php @@ -23,6 +23,15 @@ class SqsSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec */ private $queue; + protected function tearDown() + { + parent::tearDown(); + + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } + } + /** * {@inheritdoc} */ @@ -51,13 +60,4 @@ protected function createQueue(PsrContext $context, $queueName) return $this->queue; } - - protected function tearDown() - { - parent::tearDown(); - - if ($this->context && $this->queue) { - $this->context->deleteQueue($this->queue); - } - } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php index 6d49608f8..cb611d6df 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php @@ -23,6 +23,15 @@ class SqsSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec */ private $queue; + protected function tearDown() + { + parent::tearDown(); + + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } + } + /** * {@inheritdoc} */ @@ -51,13 +60,4 @@ protected function createTopic(PsrContext $context, $topicName) return $this->queue; } - - protected function tearDown() - { - parent::tearDown(); - - if ($this->context && $this->queue) { - $this->context->deleteQueue($this->queue); - } - } } From 4853455e338c90f8052b2e1f4c5a1874901478e6 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 9 Aug 2017 17:12:39 +0300 Subject: [PATCH 13/33] [sqs] fix hanged tests. --- pkg/test/RetryTrait.php | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/test/RetryTrait.php b/pkg/test/RetryTrait.php index cc8c377e4..de17565bd 100644 --- a/pkg/test/RetryTrait.php +++ b/pkg/test/RetryTrait.php @@ -9,6 +9,14 @@ public function runBare() $e = null; $numberOfRetires = $this->getNumberOfRetries(); + if (false == is_numeric($numberOfRetires)) { + throw new \LogicException(sprintf('The $numberOfRetires must be a number but got "%s"', var_export($numberOfRetires, true))); + } + $numberOfRetires = (int) $numberOfRetires; + if ($numberOfRetires <= 0) { + throw new \LogicException(sprintf('The $numberOfRetires must be a positive number greater than 0 but got "%s".', $numberOfRetires)); + } + for ($i = 0; $i < $numberOfRetires; ++$i) { try { parent::runBare(); @@ -37,8 +45,8 @@ private function getNumberOfRetries() { $annotations = $this->getAnnotations(); - if (isset($annotations['method']['retry'])) { - return $annotations['method']['retry']; + if (isset($annotations['method']['retry'][0])) { + return $annotations['method']['retry'][0]; } if (isset($annotations['class']['retry'][0])) { From 4e3de1447e5fa631f0777833e3904c69dd8841ee Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 09:45:55 +0300 Subject: [PATCH 14/33] [sqs] fix hanged tests. --- bin/test | 2 +- pkg/sqs/SqsConsumer.php | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/bin/test b/bin/test index 2d149946b..6641bc743 100755 --- a/bin/test +++ b/bin/test @@ -31,4 +31,4 @@ php pkg/job-queue/Tests/Functional/app/console doctrine:database:create php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force #php pkg/enqueue-bundle/Tests/Functional/app/console.php config:dump-reference enqueue -bin/phpunit --debug "$@" +bin/phpunit "$@" diff --git a/pkg/sqs/SqsConsumer.php b/pkg/sqs/SqsConsumer.php index 700b5c330..4ec4abdf5 100644 --- a/pkg/sqs/SqsConsumer.php +++ b/pkg/sqs/SqsConsumer.php @@ -98,7 +98,21 @@ public function getQueue() */ public function receive($timeout = 0) { - $timeout /= 1000; + $maxLongPollingTime = 20; // 20 is max allowed long polling value + + if ($timeout === 0) { + while (true) { + if ($message = $this->receiveMessage($maxLongPollingTime)) { + return $message; + } + } + } + + $timeout = (int) ceil($timeout / 1000); + + if ($timeout > $maxLongPollingTime) { + throw new \LogicException(sprintf('Max allowed SQS receive message timeout is: "%s"', $maxLongPollingTime)); + } return $this->receiveMessage($timeout); } From e178d9e238712f5c57cbfcdc25a6a39ab85b6fe7 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 10:08:36 +0300 Subject: [PATCH 15/33] fix tests --- ...ToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 52d5b6e01..11158a0b1 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -14,6 +14,8 @@ */ class AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest extends SendToTopicAndReceiveFromQueueSpec { + private $topic; + /** * {@inheritdoc} */ @@ -36,7 +38,7 @@ protected function createQueue(PsrContext $context, $queueName) $queue = $context->createQueue($queueName); $context->declareQueue($queue); - $context->bind(new AmqpBind($context->createTopic($queueName), $queue)); + $context->bind(new AmqpBind($this->topic, $queue)); return $queue; } @@ -55,6 +57,6 @@ protected function createTopic(PsrContext $context, $topicName) $topic->addFlag(AmqpTopic::FLAG_DURABLE); $context->declareTopic($topic); - return $topic; + return $this->topic = $topic; } } From ec620dd10d4662d74c7b645aaece42bf94820fde Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 10:26:25 +0300 Subject: [PATCH 16/33] fix tests --- ...iveFromQueueWithBasicConsumeMethodTest.php | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 11158a0b1..6fb06f02e 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -7,6 +7,7 @@ use Interop\Amqp\AmqpTopic; use Interop\Amqp\Impl\AmqpBind; use Interop\Queue\PsrContext; +use Interop\Queue\PsrMessage; use Interop\Queue\Spec\SendToTopicAndReceiveFromQueueSpec; /** @@ -26,6 +27,31 @@ protected function createContext() return $factory->createContext(); } + public function test() + { + $context = $this->createContext(); + $topic = $this->createTopic($context, 'send_to_topic_and_receive_from_queue_spec'); + $queue = $this->createQueue($context, 'send_to_topic_and_receive_from_queue_spec'); + + $consumer = $context->createConsumer($queue); + + // guard + $this->assertNull($consumer->receiveNoWait()); + + $expectedBody = __CLASS__.time(); + + $context->createProducer()->send($topic, $context->createMessage($expectedBody)); + + $message = $consumer->receive(10000); // 2 sec + + var_dump($message); + + $this->assertInstanceOf(PsrMessage::class, $message); + $consumer->acknowledge($message); + + $this->assertSame($expectedBody, $message->getBody()); + } + /** * {@inheritdoc} * From 142a69ca0eb33f8a015b0f32fd94d71f829b521c Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 10:44:11 +0300 Subject: [PATCH 17/33] fix tests --- ...eceiveFromQueueWithBasicConsumeMethodTest.php | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 6fb06f02e..ff9ad460b 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -30,13 +30,19 @@ protected function createContext() public function test() { $context = $this->createContext(); - $topic = $this->createTopic($context, 'send_to_topic_and_receive_from_queue_spec'); - $queue = $this->createQueue($context, 'send_to_topic_and_receive_from_queue_spec'); - $consumer = $context->createConsumer($queue); + $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume2'); + $topic->setType(AmqpTopic::TYPE_DIRECT); + $topic->addFlag(AmqpTopic::FLAG_DURABLE); + $context->declareTopic($topic); - // guard - $this->assertNull($consumer->receiveNoWait()); + $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume2'); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + $context->bind(new AmqpBind($topic, $queue)); + + $consumer = $context->createConsumer($queue); $expectedBody = __CLASS__.time(); From fa8da150ded1f0de8ecf70114594e8835aecbcb2 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 10:54:26 +0300 Subject: [PATCH 18/33] fix tests --- ...ToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index ff9ad460b..65a24c88d 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -31,12 +31,12 @@ public function test() { $context = $this->createContext(); - $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume2'); - $topic->setType(AmqpTopic::TYPE_DIRECT); + $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume3'); + $topic->setType(AmqpTopic::TYPE_FANOUT); $topic->addFlag(AmqpTopic::FLAG_DURABLE); $context->declareTopic($topic); - $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume2'); + $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume3'); $context->declareQueue($queue); $context->purgeQueue($queue); From 0034d7902f355ebc28d0684ea5215fc5a5c989cf Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 11:02:44 +0300 Subject: [PATCH 19/33] fix tests --- ...SendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 65a24c88d..e9057b590 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -48,7 +48,7 @@ public function test() $context->createProducer()->send($topic, $context->createMessage($expectedBody)); - $message = $consumer->receive(10000); // 2 sec + $message = $consumer->receive(2000); // 2 sec var_dump($message); From 759a908503c4a86e154a018c5d4793f6c5954254 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 11:14:38 +0300 Subject: [PATCH 20/33] fix tests --- ...iveFromQueueWithBasicConsumeMethodTest.php | 43 +++++-------------- 1 file changed, 11 insertions(+), 32 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index e9057b590..5fa078a18 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -7,7 +7,6 @@ use Interop\Amqp\AmqpTopic; use Interop\Amqp\Impl\AmqpBind; use Interop\Queue\PsrContext; -use Interop\Queue\PsrMessage; use Interop\Queue\Spec\SendToTopicAndReceiveFromQueueSpec; /** @@ -27,37 +26,6 @@ protected function createContext() return $factory->createContext(); } - public function test() - { - $context = $this->createContext(); - - $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume3'); - $topic->setType(AmqpTopic::TYPE_FANOUT); - $topic->addFlag(AmqpTopic::FLAG_DURABLE); - $context->declareTopic($topic); - - $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume3'); - $context->declareQueue($queue); - $context->purgeQueue($queue); - - $context->bind(new AmqpBind($topic, $queue)); - - $consumer = $context->createConsumer($queue); - - $expectedBody = __CLASS__.time(); - - $context->createProducer()->send($topic, $context->createMessage($expectedBody)); - - $message = $consumer->receive(2000); // 2 sec - - var_dump($message); - - $this->assertInstanceOf(PsrMessage::class, $message); - $consumer->acknowledge($message); - - $this->assertSame($expectedBody, $message->getBody()); - } - /** * {@inheritdoc} * @@ -68,7 +36,13 @@ protected function createQueue(PsrContext $context, $queueName) $queueName .= '_basic_consume'; $queue = $context->createQueue($queueName); + + try { + $context->deleteQueue($queue); + } catch (\Exception $e) {} + $context->declareQueue($queue); + $context->purgeQueue($queue); $context->bind(new AmqpBind($this->topic, $queue)); @@ -87,6 +61,11 @@ protected function createTopic(PsrContext $context, $topicName) $topic = $context->createTopic($topicName); $topic->setType(AmqpTopic::TYPE_FANOUT); $topic->addFlag(AmqpTopic::FLAG_DURABLE); + + try { + $context->deleteQueue($topic); + } catch (\Exception $e) {} + $context->declareTopic($topic); return $this->topic = $topic; From 405c12d3310b338a2f079752f57b3d89a97a3196 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 11:24:55 +0300 Subject: [PATCH 21/33] fix tests --- ...ToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 5fa078a18..0920a032e 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -35,7 +35,8 @@ protected function createQueue(PsrContext $context, $queueName) { $queueName .= '_basic_consume'; - $queue = $context->createQueue($queueName); +// $queue = $context->createQueue($queueName); + $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume4'); try { $context->deleteQueue($queue); @@ -58,7 +59,8 @@ protected function createTopic(PsrContext $context, $topicName) { $topicName .= '_basic_consume'; - $topic = $context->createTopic($topicName); +// $topic = $context->createTopic($topicName); + $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume4'); $topic->setType(AmqpTopic::TYPE_FANOUT); $topic->addFlag(AmqpTopic::FLAG_DURABLE); From a5f3844c3004e9fd734efaf6e9265352db884180 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 11:37:11 +0300 Subject: [PATCH 22/33] fix tests --- ...iveFromQueueWithBasicConsumeMethodTest.php | 54 +++++++------------ 1 file changed, 19 insertions(+), 35 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 0920a032e..162b296bc 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -3,10 +3,9 @@ namespace Enqueue\AmqpLib\Tests\Spec; use Enqueue\AmqpLib\AmqpConnectionFactory; -use Enqueue\AmqpLib\AmqpContext; use Interop\Amqp\AmqpTopic; use Interop\Amqp\Impl\AmqpBind; -use Interop\Queue\PsrContext; +use Interop\Queue\PsrMessage; use Interop\Queue\Spec\SendToTopicAndReceiveFromQueueSpec; /** @@ -26,50 +25,35 @@ protected function createContext() return $factory->createContext(); } - /** - * {@inheritdoc} - * - * @param AmqpContext $context - */ - protected function createQueue(PsrContext $context, $queueName) + public function test() { - $queueName .= '_basic_consume'; - -// $queue = $context->createQueue($queueName); - $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume4'); + $context = $this->createContext(); - try { - $context->deleteQueue($queue); - } catch (\Exception $e) {} + $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume'); + $topic->setType(AmqpTopic::TYPE_FANOUT); + $topic->addFlag(AmqpTopic::FLAG_DURABLE); + $context->declareTopic($topic); + $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume'); $context->declareQueue($queue); $context->purgeQueue($queue); - $context->bind(new AmqpBind($this->topic, $queue)); + $context->bind(new AmqpBind($topic, $queue)); - return $queue; - } + $consumer = $context->createConsumer($queue); - /** - * {@inheritdoc} - * - * @param AmqpContext $context - */ - protected function createTopic(PsrContext $context, $topicName) - { - $topicName .= '_basic_consume'; + // guard + $this->assertNull($consumer->receiveNoWait()); -// $topic = $context->createTopic($topicName); - $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume4'); - $topic->setType(AmqpTopic::TYPE_FANOUT); - $topic->addFlag(AmqpTopic::FLAG_DURABLE); + $expectedBody = __CLASS__.time(); - try { - $context->deleteQueue($topic); - } catch (\Exception $e) {} + $context->createProducer()->send($topic, $context->createMessage($expectedBody)); - $context->declareTopic($topic); + $message = $consumer->receive(2000); // 2 sec + + $this->assertInstanceOf(PsrMessage::class, $message); + $consumer->acknowledge($message); - return $this->topic = $topic; + $this->assertSame($expectedBody, $message->getBody()); } } From 1d34254b751f2932cb733da7268126957ac4524e Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 11:44:57 +0300 Subject: [PATCH 23/33] fix tests --- ...SendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 162b296bc..b9509e791 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -43,7 +43,7 @@ public function test() $consumer = $context->createConsumer($queue); // guard - $this->assertNull($consumer->receiveNoWait()); +// $this->assertNull($consumer->receiveNoWait()); $expectedBody = __CLASS__.time(); From fe6fc1cdddd2d998dc71f3e16912cb2f2465e22c Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 11:58:58 +0300 Subject: [PATCH 24/33] fix tests --- ...ndToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index b9509e791..0d57490da 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -29,12 +29,12 @@ public function test() { $context = $this->createContext(); - $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume'); + $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume5'); $topic->setType(AmqpTopic::TYPE_FANOUT); $topic->addFlag(AmqpTopic::FLAG_DURABLE); $context->declareTopic($topic); - $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume'); + $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume5'); $context->declareQueue($queue); $context->purgeQueue($queue); From a3ff6f855ee27db34c4c7fb5a3a6810fc3323291 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 12:11:06 +0300 Subject: [PATCH 25/33] fix tests --- ...SendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 0d57490da..fff750354 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -42,7 +42,7 @@ public function test() $consumer = $context->createConsumer($queue); - // guard + // guard // $this->assertNull($consumer->receiveNoWait()); $expectedBody = __CLASS__.time(); From 064399e3dc1b8ef7d5bb194514bb306b21812047 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 12:33:55 +0300 Subject: [PATCH 26/33] fix tests --- ...ndToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index fff750354..fa03d1720 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -42,8 +42,8 @@ public function test() $consumer = $context->createConsumer($queue); - // guard -// $this->assertNull($consumer->receiveNoWait()); + // guard + $this->assertNull($consumer->receiveNoWait()); $expectedBody = __CLASS__.time(); From 2e447cc2f52adea10b8aed12260ed0e5690a4554 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 12:41:48 +0300 Subject: [PATCH 27/33] fix tests --- ...SendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index fa03d1720..299517df5 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -42,7 +42,7 @@ public function test() $consumer = $context->createConsumer($queue); - // guard + // guard $this->assertNull($consumer->receiveNoWait()); $expectedBody = __CLASS__.time(); From 5b23f8641ac8ed6c95582b0298a892c1d190275b Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 12:50:24 +0300 Subject: [PATCH 28/33] fix tests --- ...ndToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 299517df5..57252043c 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -29,12 +29,12 @@ public function test() { $context = $this->createContext(); - $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume5'); + $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume'); $topic->setType(AmqpTopic::TYPE_FANOUT); $topic->addFlag(AmqpTopic::FLAG_DURABLE); $context->declareTopic($topic); - $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume5'); + $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume'); $context->declareQueue($queue); $context->purgeQueue($queue); From 91a68e7b894bd9371b46f254a596a7963a6a4f78 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 12:56:53 +0300 Subject: [PATCH 29/33] fix tests --- ...icAndReceiveFromQueueWithBasicConsumeMethodTest.php | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 57252043c..9a71a6028 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -32,9 +32,19 @@ public function test() $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume'); $topic->setType(AmqpTopic::TYPE_FANOUT); $topic->addFlag(AmqpTopic::FLAG_DURABLE); + + try { + $context->deleteTopic($topic); + } catch (\Exception $e) {} + $context->declareTopic($topic); $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume'); + + try { + $context->deleteQueue($queue); + } catch (\Exception $e) {} + $context->declareQueue($queue); $context->purgeQueue($queue); From 383a934689f6d59f130deab5c942565616620ec4 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 13:06:24 +0300 Subject: [PATCH 30/33] fix tests --- ...iveFromQueueWithBasicConsumeMethodTest.php | 94 ++++++++++++++----- 1 file changed, 68 insertions(+), 26 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 9a71a6028..a7c12ec4d 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -3,9 +3,10 @@ namespace Enqueue\AmqpLib\Tests\Spec; use Enqueue\AmqpLib\AmqpConnectionFactory; +use Interop\Amqp\AmqpContext; use Interop\Amqp\AmqpTopic; use Interop\Amqp\Impl\AmqpBind; -use Interop\Queue\PsrMessage; +use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendToTopicAndReceiveFromQueueSpec; /** @@ -25,20 +26,55 @@ protected function createContext() return $factory->createContext(); } - public function test() - { - $context = $this->createContext(); - - $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume'); - $topic->setType(AmqpTopic::TYPE_FANOUT); - $topic->addFlag(AmqpTopic::FLAG_DURABLE); - - try { - $context->deleteTopic($topic); - } catch (\Exception $e) {} - - $context->declareTopic($topic); +// public function test() +// { +// $context = $this->createContext(); +// +// $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume'); +// $topic->setType(AmqpTopic::TYPE_FANOUT); +// $topic->addFlag(AmqpTopic::FLAG_DURABLE); +// +// try { +// $context->deleteTopic($topic); +// } catch (\Exception $e) {} +// +// $context->declareTopic($topic); +// +// $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume'); +// +// try { +// $context->deleteQueue($queue); +// } catch (\Exception $e) {} +// +// $context->declareQueue($queue); +// $context->purgeQueue($queue); +// +// $context->bind(new AmqpBind($topic, $queue)); +// +// $consumer = $context->createConsumer($queue); +// +// // guard +// $this->assertNull($consumer->receiveNoWait()); +// +// $expectedBody = __CLASS__.time(); +// +// $context->createProducer()->send($topic, $context->createMessage($expectedBody)); +// +// $message = $consumer->receive(2000); // 2 sec +// +// $this->assertInstanceOf(PsrMessage::class, $message); +// $consumer->acknowledge($message); +// +// $this->assertSame($expectedBody, $message->getBody()); +// } + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume'); try { @@ -48,22 +84,28 @@ public function test() $context->declareQueue($queue); $context->purgeQueue($queue); - $context->bind(new AmqpBind($topic, $queue)); + $context->bind(new AmqpBind($this->topic, $queue)); - $consumer = $context->createConsumer($queue); - - // guard - $this->assertNull($consumer->receiveNoWait()); - - $expectedBody = __CLASS__.time(); + return $queue; + } - $context->createProducer()->send($topic, $context->createMessage($expectedBody)); + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume'); + $topic->setType(AmqpTopic::TYPE_FANOUT); + $topic->addFlag(AmqpTopic::FLAG_DURABLE); - $message = $consumer->receive(2000); // 2 sec + try { + $context->deleteTopic($topic); + } catch (\Exception $e) {} - $this->assertInstanceOf(PsrMessage::class, $message); - $consumer->acknowledge($message); + $context->declareTopic($topic); - $this->assertSame($expectedBody, $message->getBody()); + return $this->topic = $topic; } } From b3139cd11a7093538bb548a87cd9c88c0815707a Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 13:11:58 +0300 Subject: [PATCH 31/33] fix tests --- ...iveFromQueueWithBasicConsumeMethodTest.php | 42 ------------------- 1 file changed, 42 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index a7c12ec4d..ebd4a2ec7 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -26,48 +26,6 @@ protected function createContext() return $factory->createContext(); } -// public function test() -// { -// $context = $this->createContext(); -// -// $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume'); -// $topic->setType(AmqpTopic::TYPE_FANOUT); -// $topic->addFlag(AmqpTopic::FLAG_DURABLE); -// -// try { -// $context->deleteTopic($topic); -// } catch (\Exception $e) {} -// -// $context->declareTopic($topic); -// -// $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume'); -// -// try { -// $context->deleteQueue($queue); -// } catch (\Exception $e) {} -// -// $context->declareQueue($queue); -// $context->purgeQueue($queue); -// -// $context->bind(new AmqpBind($topic, $queue)); -// -// $consumer = $context->createConsumer($queue); -// -// // guard -// $this->assertNull($consumer->receiveNoWait()); -// -// $expectedBody = __CLASS__.time(); -// -// $context->createProducer()->send($topic, $context->createMessage($expectedBody)); -// -// $message = $consumer->receive(2000); // 2 sec -// -// $this->assertInstanceOf(PsrMessage::class, $message); -// $consumer->acknowledge($message); -// -// $this->assertSame($expectedBody, $message->getBody()); -// } - /** * {@inheritdoc} * From 07eeb44ef193266ffa8ea851674d36e422fc363c Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 13:45:32 +0300 Subject: [PATCH 32/33] tests --- ...DelayStrategyTransportFactoryTraitTest.php | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 pkg/amqp-tools/Tests/DelayStrategyTransportFactoryTraitTest.php diff --git a/pkg/amqp-tools/Tests/DelayStrategyTransportFactoryTraitTest.php b/pkg/amqp-tools/Tests/DelayStrategyTransportFactoryTraitTest.php new file mode 100644 index 000000000..879059497 --- /dev/null +++ b/pkg/amqp-tools/Tests/DelayStrategyTransportFactoryTraitTest.php @@ -0,0 +1,84 @@ +register('factoryId', DelayStrategyTransportFactoryImpl::class); + + $trait = new DelayStrategyTransportFactoryTraitImpl(); + $trait->registerDelayStrategy($container, ['delay_strategy' => 'dlx'], 'factoryId', 'name'); + + $factory = $container->getDefinition('factoryId'); + + $calls = $factory->getMethodCalls(); + + $this->assertSame('setDelayStrategy', $calls[0][0]); + $this->assertInstanceOf(Reference::class, $calls[0][1][0]); + $this->assertSame('enqueue.client.name.delay_strategy', (string) $calls[0][1][0]); + + $strategy = $container->getDefinition('enqueue.client.name.delay_strategy'); + + $this->assertSame(RabbitMqDlxDelayStrategy::class, $strategy->getClass()); + } + + public function testShouldRegisterDelayMessagePluginStrategy() + { + $container = new ContainerBuilder(); + $container->register('factoryId', DelayStrategyTransportFactoryImpl::class); + + $trait = new DelayStrategyTransportFactoryTraitImpl(); + $trait->registerDelayStrategy($container, ['delay_strategy' => 'delayed_message_plugin'], 'factoryId', 'name'); + + $factory = $container->getDefinition('factoryId'); + + $calls = $factory->getMethodCalls(); + + $this->assertSame('setDelayStrategy', $calls[0][0]); + $this->assertInstanceOf(Reference::class, $calls[0][1][0]); + $this->assertSame('enqueue.client.name.delay_strategy', (string) $calls[0][1][0]); + + $strategy = $container->getDefinition('enqueue.client.name.delay_strategy'); + + $this->assertSame(RabbitMqDelayPluginDelayStrategy::class, $strategy->getClass()); + } + + public function testShouldRegisterDelayStrategyService() + { + $container = new ContainerBuilder(); + $container->register('factoryId', DelayStrategyTransportFactoryImpl::class); + + $trait = new DelayStrategyTransportFactoryTraitImpl(); + $trait->registerDelayStrategy($container, ['delay_strategy' => 'service_name'], 'factoryId', 'name'); + + $factory = $container->getDefinition('factoryId'); + + $calls = $factory->getMethodCalls(); + + $this->assertSame('setDelayStrategy', $calls[0][0]); + $this->assertInstanceOf(Reference::class, $calls[0][1][0]); + $this->assertSame('service_name', (string) $calls[0][1][0]); + } +} + +class DelayStrategyTransportFactoryTraitImpl +{ + use DelayStrategyTransportFactoryTrait; +} + +class DelayStrategyTransportFactoryImpl implements DelayStrategyAware +{ + use DelayStrategyAwareTrait; +} From 237c10e176b91d5530a877d49a81464397e823f0 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 10 Aug 2017 13:46:48 +0300 Subject: [PATCH 33/33] tests --- ...ToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index ebd4a2ec7..0c6eb8cd2 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -37,7 +37,8 @@ protected function createQueue(PsrContext $context, $queueName) try { $context->deleteQueue($queue); - } catch (\Exception $e) {} + } catch (\Exception $e) { + } $context->declareQueue($queue); $context->purgeQueue($queue); @@ -60,7 +61,8 @@ protected function createTopic(PsrContext $context, $topicName) try { $context->deleteTopic($topic); - } catch (\Exception $e) {} + } catch (\Exception $e) { + } $context->declareTopic($topic);