diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 451898eb0..0fab760d0 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -29,6 +29,14 @@ pkg/amqp-bunny/Tests + + pkg/amqp-lib/Tests + + + + pkg/amqp-tools/Tests + + pkg/fs/Tests diff --git a/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php b/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php index 20de33382..95341671f 100644 --- a/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php +++ b/pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php @@ -2,6 +2,7 @@ namespace Enqueue\AmqpBunny\Symfony; +use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait; use Enqueue\Client\Amqp\RabbitMqDriver; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -10,6 +11,8 @@ class RabbitMqAmqpBunnyTransportFactory extends AmqpBunnyTransportFactory { + use DelayStrategyTransportFactoryTrait; + /** * @param string $name */ @@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() - ->booleanNode('delay_plugin_installed') - ->defaultFalse() - ->info('The option tells whether RabbitMQ broker has delay plugin installed or not') + ->scalarNode('delay_strategy') + ->defaultValue('dlx') + ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; } + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factoryId = parent::createConnectionFactory($container, $config); + + $this->registerDelayStrategy($container, $config, $factoryId, $this->getName()); + + return $factoryId; + } + /** * {@inheritdoc} */ diff --git a/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php b/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php index 4dd11871b..555ad777e 100644 --- a/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php +++ b/pkg/amqp-bunny/Tests/Symfony/RabbitMqAmqpBunnyTransportFactoryTest.php @@ -58,7 +58,7 @@ public function testShouldAllowAddConfiguration() 'user' => 'guest', 'pass' => 'guest', 'vhost' => '/', - 'delay_plugin_installed' => false, + 'delay_strategy' => 'dlx', 'lazy' => true, 'receive_method' => 'basic_get', 'heartbeat' => 0, @@ -78,7 +78,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertTrue($container->hasDefinition($serviceId)); @@ -91,7 +91,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]], $factory->getArguments()); } @@ -108,7 +108,7 @@ public function testShouldCreateContext() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertEquals('enqueue.transport.rabbitmq_amqp_bunny.context', $serviceId); diff --git a/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php b/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php index 11ef70342..8ab200e14 100644 --- a/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php +++ b/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php @@ -2,6 +2,7 @@ namespace Enqueue\AmqpExt\Symfony; +use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait; use Enqueue\Client\Amqp\RabbitMqDriver; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -10,6 +11,8 @@ class RabbitMqAmqpTransportFactory extends AmqpTransportFactory { + use DelayStrategyTransportFactoryTrait; + /** * @param string $name */ @@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() - ->booleanNode('delay_plugin_installed') - ->defaultFalse() - ->info('The option tells whether RabbitMQ broker has delay plugin installed or not') + ->scalarNode('delay_strategy') + ->defaultValue('dlx') + ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; } + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factoryId = parent::createConnectionFactory($container, $config); + + $this->registerDelayStrategy($container, $config, $factoryId, $this->getName()); + + return $factoryId; + } + /** * {@inheritdoc} */ diff --git a/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php b/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php index 46f530042..31853492a 100644 --- a/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php +++ b/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php @@ -59,7 +59,7 @@ public function testShouldAllowAddConfiguration() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => 'dlx', 'lazy' => true, 'receive_method' => 'basic_get', ], $config); @@ -78,7 +78,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertTrue($container->hasDefinition($serviceId)); @@ -91,7 +91,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]], $factory->getArguments()); } @@ -108,7 +108,7 @@ public function testShouldCreateContext() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertEquals('enqueue.transport.rabbitmq_amqp.context', $serviceId); diff --git a/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php b/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php index 20765c6b1..43a88a2da 100644 --- a/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php +++ b/pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php @@ -2,6 +2,7 @@ namespace Enqueue\AmqpLib\Symfony; +use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait; use Enqueue\Client\Amqp\RabbitMqDriver; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -10,6 +11,8 @@ class RabbitMqAmqpLibTransportFactory extends AmqpLibTransportFactory { + use DelayStrategyTransportFactoryTrait; + /** * @param string $name */ @@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder) $builder ->children() - ->booleanNode('delay_plugin_installed') - ->defaultFalse() - ->info('The option tells whether RabbitMQ broker has delay plugin installed or not') + ->scalarNode('delay_strategy') + ->defaultValue('dlx') + ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') ->end() ; } + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factoryId = parent::createConnectionFactory($container, $config); + + $this->registerDelayStrategy($container, $config, $factoryId, $this->getName()); + + return $factoryId; + } + /** * {@inheritdoc} */ diff --git a/pkg/amqp-lib/Tests/AmqpConsumerTest.php b/pkg/amqp-lib/Tests/AmqpConsumerTest.php index f4462e2ad..21585734c 100644 --- a/pkg/amqp-lib/Tests/AmqpConsumerTest.php +++ b/pkg/amqp-lib/Tests/AmqpConsumerTest.php @@ -102,6 +102,7 @@ public function testShouldReturnMessageOnReceiveNoWait() $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); $amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag'; $amqpMessage->delivery_info['redelivered'] = true; + $amqpMessage->delivery_info['routing_key'] = 'routing-key'; $channel = $this->createChannelMock(); $channel @@ -120,6 +121,7 @@ public function testShouldReturnMessageOnReceiveNoWait() $this->assertInstanceOf(AmqpMessage::class, $message); $this->assertSame('body', $message->getBody()); $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $this->assertSame('routing-key', $message->getRoutingKey()); $this->assertTrue($message->isRedelivered()); } @@ -127,6 +129,7 @@ public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet() { $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); $amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag'; + $amqpMessage->delivery_info['routing_key'] = 'routing-key'; $amqpMessage->delivery_info['redelivered'] = true; $channel = $this->createChannelMock(); @@ -146,6 +149,7 @@ public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet() $this->assertInstanceOf(AmqpMessage::class, $message); $this->assertSame('body', $message->getBody()); $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $this->assertSame('routing-key', $message->getRoutingKey()); $this->assertTrue($message->isRedelivered()); } diff --git a/pkg/amqp-lib/Tests/AmqpContextTest.php b/pkg/amqp-lib/Tests/AmqpContextTest.php index 80dee492a..be52235e7 100644 --- a/pkg/amqp-lib/Tests/AmqpContextTest.php +++ b/pkg/amqp-lib/Tests/AmqpContextTest.php @@ -8,6 +8,7 @@ use Interop\Amqp\Impl\AmqpTopic; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Wire\AMQPTable; use PHPUnit\Framework\TestCase; class AmqpContextTest extends TestCase @@ -26,7 +27,7 @@ public function testShouldDeclareTopic() $this->isTrue(), $this->isTrue(), $this->isTrue(), - $this->identicalTo(['key' => 'value']), + $this->isInstanceOf(AMQPTable::class), $this->isNull() ) ; @@ -94,7 +95,7 @@ public function testShouldDeclareQueue() $this->isTrue(), $this->isTrue(), $this->isTrue(), - $this->identicalTo(['key' => 'value']), + $this->isInstanceOf(AMQPTable::class), $this->isNull() ) ; diff --git a/pkg/amqp-lib/Tests/AmqpProducerTest.php b/pkg/amqp-lib/Tests/AmqpProducerTest.php index 1d389bf3c..8ccf419ac 100644 --- a/pkg/amqp-lib/Tests/AmqpProducerTest.php +++ b/pkg/amqp-lib/Tests/AmqpProducerTest.php @@ -2,6 +2,7 @@ namespace Enqueue\AmqpLib\Tests; +use Enqueue\AmqpLib\AmqpContext; use Enqueue\AmqpLib\AmqpProducer; use Enqueue\Test\ClassExtensionTrait; use Interop\Amqp\Impl\AmqpMessage; @@ -23,7 +24,7 @@ class AmqpProducerTest extends TestCase public function testCouldBeConstructedWithRequiredArguments() { - new AmqpProducer($this->createAmqpChannelMock()); + new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock()); } public function testShouldImplementPsrProducerInterface() @@ -33,7 +34,7 @@ public function testShouldImplementPsrProducerInterface() public function testShouldThrowExceptionWhenDestinationTypeIsInvalid() { - $producer = new AmqpProducer($this->createAmqpChannelMock()); + $producer = new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock()); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Interop\Amqp\AmqpQueue but got'); @@ -43,7 +44,7 @@ public function testShouldThrowExceptionWhenDestinationTypeIsInvalid() public function testShouldThrowExceptionWhenMessageTypeIsInvalid() { - $producer = new AmqpProducer($this->createAmqpChannelMock()); + $producer = new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock()); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but it is'); @@ -70,7 +71,7 @@ public function testShouldPublishMessageToTopic() $message = new AmqpMessage('body'); $message->setRoutingKey('routing-key'); - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send($topic, $message); $this->assertEquals('body', $amqpMessage->getBody()); @@ -92,7 +93,7 @@ public function testShouldPublishMessageToQueue() $queue = new AmqpQueue('queue'); - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send($queue, new AmqpMessage('body')); $this->assertEquals('body', $amqpMessage->getBody()); @@ -111,7 +112,7 @@ public function testShouldSetMessageHeaders() })) ; - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['content_type' => 'text/plain'])); $this->assertEquals(['content_type' => 'text/plain'], $amqpMessage->get_properties()); @@ -130,7 +131,7 @@ public function testShouldSetMessageProperties() })) ; - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send(new AmqpTopic('name'), new AmqpMessage('body', ['key' => 'value'])); $properties = $amqpMessage->get_properties(); @@ -163,4 +164,12 @@ private function createAmqpChannelMock() { return $this->createMock(AMQPChannel::class); } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext + */ + private function createContextMock() + { + return $this->createMock(AmqpContext::class); + } } diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php index 5947a7cd1..0c6eb8cd2 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -3,7 +3,7 @@ namespace Enqueue\AmqpLib\Tests\Spec; use Enqueue\AmqpLib\AmqpConnectionFactory; -use Enqueue\AmqpLib\AmqpContext; +use Interop\Amqp\AmqpContext; use Interop\Amqp\AmqpTopic; use Interop\Amqp\Impl\AmqpBind; use Interop\Queue\PsrContext; @@ -14,6 +14,8 @@ */ class AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest extends SendToTopicAndReceiveFromQueueSpec { + private $topic; + /** * {@inheritdoc} */ @@ -31,13 +33,17 @@ protected function createContext() */ protected function createQueue(PsrContext $context, $queueName) { - $queueName .= '_basic_consume'; + $queue = $context->createQueue('send_to_topic_and_receive_from_queue_spec_basic_consume'); + + try { + $context->deleteQueue($queue); + } catch (\Exception $e) { + } - $queue = $context->createQueue($queueName); $context->declareQueue($queue); $context->purgeQueue($queue); - $context->bind(new AmqpBind($context->createTopic($queueName), $queue)); + $context->bind(new AmqpBind($this->topic, $queue)); return $queue; } @@ -49,13 +55,17 @@ protected function createQueue(PsrContext $context, $queueName) */ protected function createTopic(PsrContext $context, $topicName) { - $topicName .= '_basic_consume'; - - $topic = $context->createTopic($topicName); + $topic = $context->createTopic('send_to_topic_and_receive_from_queue_spec_basic_consume'); $topic->setType(AmqpTopic::TYPE_FANOUT); $topic->addFlag(AmqpTopic::FLAG_DURABLE); + + try { + $context->deleteTopic($topic); + } catch (\Exception $e) { + } + $context->declareTopic($topic); - return $topic; + return $this->topic = $topic; } } diff --git a/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php b/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php index 22b1b4354..b86a57bdf 100644 --- a/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php +++ b/pkg/amqp-lib/Tests/Symfony/RabbitMqAmqpLibTransportFactoryTest.php @@ -58,7 +58,7 @@ public function testShouldAllowAddConfiguration() 'user' => 'guest', 'pass' => 'guest', 'vhost' => '/', - 'delay_plugin_installed' => false, + 'delay_strategy' => 'dlx', 'lazy' => true, 'receive_method' => 'basic_get', 'connection_timeout' => 3.0, @@ -85,7 +85,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertTrue($container->hasDefinition($serviceId)); @@ -98,7 +98,7 @@ public function testShouldCreateConnectionFactory() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]], $factory->getArguments()); } @@ -115,7 +115,7 @@ public function testShouldCreateContext() 'pass' => 'guest', 'vhost' => '/', 'persisted' => false, - 'delay_plugin_installed' => false, + 'delay_strategy' => null, ]); $this->assertEquals('enqueue.transport.rabbitmq_amqp_lib.context', $serviceId); diff --git a/pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php b/pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php new file mode 100644 index 000000000..dbcb71f5a --- /dev/null +++ b/pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php @@ -0,0 +1,37 @@ +getDefinition($factoryId); + + if (false == is_a($factory->getClass(), DelayStrategyAware::class, true)) { + throw new \LogicException('Connection factory does not support delays'); + } + + if (strtolower($config['delay_strategy']) === 'dlx') { + $delayId = sprintf('enqueue.client.%s.delay_strategy', $factoryName); + $container->register($delayId, RabbitMqDlxDelayStrategy::class); + + $factory->addMethodCall('setDelayStrategy', [new Reference($delayId)]); + } elseif (strtolower($config['delay_strategy']) === 'delayed_message_plugin') { + $delayId = sprintf('enqueue.client.%s.delay_strategy', $factoryName); + $container->register($delayId, RabbitMqDelayPluginDelayStrategy::class); + + $factory->addMethodCall('setDelayStrategy', [new Reference($delayId)]); + } else { + $factory->addMethodCall('setDelayStrategy', [new Reference($config['delay_strategy'])]); + } + } + } +} diff --git a/pkg/amqp-tools/Tests/DelayStrategyTransportFactoryTraitTest.php b/pkg/amqp-tools/Tests/DelayStrategyTransportFactoryTraitTest.php new file mode 100644 index 000000000..879059497 --- /dev/null +++ b/pkg/amqp-tools/Tests/DelayStrategyTransportFactoryTraitTest.php @@ -0,0 +1,84 @@ +register('factoryId', DelayStrategyTransportFactoryImpl::class); + + $trait = new DelayStrategyTransportFactoryTraitImpl(); + $trait->registerDelayStrategy($container, ['delay_strategy' => 'dlx'], 'factoryId', 'name'); + + $factory = $container->getDefinition('factoryId'); + + $calls = $factory->getMethodCalls(); + + $this->assertSame('setDelayStrategy', $calls[0][0]); + $this->assertInstanceOf(Reference::class, $calls[0][1][0]); + $this->assertSame('enqueue.client.name.delay_strategy', (string) $calls[0][1][0]); + + $strategy = $container->getDefinition('enqueue.client.name.delay_strategy'); + + $this->assertSame(RabbitMqDlxDelayStrategy::class, $strategy->getClass()); + } + + public function testShouldRegisterDelayMessagePluginStrategy() + { + $container = new ContainerBuilder(); + $container->register('factoryId', DelayStrategyTransportFactoryImpl::class); + + $trait = new DelayStrategyTransportFactoryTraitImpl(); + $trait->registerDelayStrategy($container, ['delay_strategy' => 'delayed_message_plugin'], 'factoryId', 'name'); + + $factory = $container->getDefinition('factoryId'); + + $calls = $factory->getMethodCalls(); + + $this->assertSame('setDelayStrategy', $calls[0][0]); + $this->assertInstanceOf(Reference::class, $calls[0][1][0]); + $this->assertSame('enqueue.client.name.delay_strategy', (string) $calls[0][1][0]); + + $strategy = $container->getDefinition('enqueue.client.name.delay_strategy'); + + $this->assertSame(RabbitMqDelayPluginDelayStrategy::class, $strategy->getClass()); + } + + public function testShouldRegisterDelayStrategyService() + { + $container = new ContainerBuilder(); + $container->register('factoryId', DelayStrategyTransportFactoryImpl::class); + + $trait = new DelayStrategyTransportFactoryTraitImpl(); + $trait->registerDelayStrategy($container, ['delay_strategy' => 'service_name'], 'factoryId', 'name'); + + $factory = $container->getDefinition('factoryId'); + + $calls = $factory->getMethodCalls(); + + $this->assertSame('setDelayStrategy', $calls[0][0]); + $this->assertInstanceOf(Reference::class, $calls[0][1][0]); + $this->assertSame('service_name', (string) $calls[0][1][0]); + } +} + +class DelayStrategyTransportFactoryTraitImpl +{ + use DelayStrategyTransportFactoryTrait; +} + +class DelayStrategyTransportFactoryImpl implements DelayStrategyAware +{ + use DelayStrategyAwareTrait; +} diff --git a/pkg/enqueue/Client/Amqp/RabbitMqDriver.php b/pkg/enqueue/Client/Amqp/RabbitMqDriver.php index e65095e87..518b90056 100644 --- a/pkg/enqueue/Client/Amqp/RabbitMqDriver.php +++ b/pkg/enqueue/Client/Amqp/RabbitMqDriver.php @@ -10,11 +10,7 @@ use Interop\Amqp\AmqpContext; use Interop\Amqp\AmqpMessage; use Interop\Amqp\AmqpQueue; -use Interop\Amqp\AmqpTopic; -use Interop\Amqp\Impl\AmqpBind; use Interop\Queue\PsrMessage; -use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; class RabbitMqDriver extends AmqpDriver { @@ -75,12 +71,13 @@ public function sendToProcessor(Message $message) $transportMessage = $this->createTransportMessage($message); $destination = $this->createQueue($queueName); + $producer = $this->context->createProducer(); if ($message->getDelay()) { - $destination = $this->createDelayedTopic($destination); + $producer->setDeliveryDelay($message->getDelay() * 1000); } - $this->context->createProducer()->send($destination, $transportMessage); + $producer->send($destination, $transportMessage); } /** @@ -117,11 +114,11 @@ public function createTransportMessage(Message $message) } if ($message->getDelay()) { - if (false == $this->config->getTransportOption('delay_plugin_installed', false)) { - throw new LogicException('The message delaying is not supported. In order to use delay feature install RabbitMQ delay plugin.'); + if (false == $this->config->getTransportOption('delay_strategy', false)) { + throw new LogicException('The message delaying is not supported. In order to use delay feature install RabbitMQ delay strategy.'); } - $transportMessage->setProperty('x-delay', (string) ($message->getDelay() * 1000)); + $transportMessage->setProperty('enqueue-delay', $message->getDelay() * 1000); } return $transportMessage; @@ -144,9 +141,9 @@ public function createClientMessage(PsrMessage $message) $clientMessage->setPriority($clientPriority); } - if ($delay = $message->getProperty('x-delay')) { + if ($delay = $message->getProperty('enqueue-delay')) { if (false == is_numeric($delay)) { - throw new \LogicException(sprintf('x-delay header is not numeric. "%s"', $delay)); + throw new \LogicException(sprintf('"enqueue-delay" header is not numeric. "%s"', $delay)); } $clientMessage->setDelay((int) ((int) $delay) / 1000); @@ -154,53 +151,4 @@ public function createClientMessage(PsrMessage $message) return $clientMessage; } - - /** - * {@inheritdoc} - */ - public function setupBroker(LoggerInterface $logger = null) - { - $logger = $logger ?: new NullLogger(); - - parent::setupBroker($logger); - - $log = function ($text, ...$args) use ($logger) { - $logger->debug(sprintf('[RabbitMqDriver] '.$text, ...$args)); - }; - - // setup delay exchanges - if ($this->config->getTransportOption('delay_plugin_installed', false)) { - foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) { - $queue = $this->createQueue($meta->getClientName()); - - $delayTopic = $this->createDelayedTopic($queue); - - $log('Declare delay exchange: %s', $delayTopic->getTopicName()); - $this->context->declareTopic($delayTopic); - - $log('Bind processor queue to delay exchange: %s -> %s', $queue->getQueueName(), $delayTopic->getTopicName()); - $this->context->bind(new AmqpBind($delayTopic, $queue, $queue->getQueueName())); - } - } - } - - /** - * @param AmqpQueue $queue - * - * @return AmqpTopic - */ - private function createDelayedTopic(AmqpQueue $queue) - { - $queueName = $queue->getQueueName(); - - // in order to use delay feature make sure the rabbitmq_delayed_message_exchange plugin is installed. - $delayTopic = $this->context->createTopic($queueName.'.delayed'); - $delayTopic->setType('x-delayed-message'); - $delayTopic->addFlag(AmqpTopic::FLAG_DURABLE); - $delayTopic->setArguments([ - 'x-delayed-type' => 'direct', - ]); - - return $delayTopic; - } } diff --git a/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php b/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php index 698fc5c3e..3b67ef52e 100644 --- a/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php +++ b/pkg/enqueue/Tests/Client/Amqp/RabbitMqDriverTest.php @@ -97,7 +97,7 @@ public function testShouldConvertTransportMessageToClientMessage() $transportMessage->setBody('body'); $transportMessage->setHeaders(['hkey' => 'hval']); $transportMessage->setProperties(['key' => 'val']); - $transportMessage->setProperty('x-delay', '5678000'); + $transportMessage->setProperty('enqueue-delay', '5678000'); $transportMessage->setHeader('content_type', 'ContentType'); $transportMessage->setHeader('expiration', '12345000'); $transportMessage->setHeader('priority', 3); @@ -108,7 +108,7 @@ public function testShouldConvertTransportMessageToClientMessage() $driver = new RabbitMqDriver( $this->createAmqpContextMock(), - new Config('', '', '', '', '', '', ['delay_plugin_installed' => true]), + new Config('', '', '', '', '', '', ['delay_strategy' => 'dlx']), $this->createDummyQueueMetaRegistry() ); @@ -128,7 +128,7 @@ public function testShouldConvertTransportMessageToClientMessage() ], $clientMessage->getHeaders()); $this->assertSame([ 'key' => 'val', - 'x-delay' => '5678000', + 'enqueue-delay' => '5678000', ], $clientMessage->getProperties()); $this->assertSame('MessageId', $clientMessage->getMessageId()); $this->assertSame(12345, $clientMessage->getExpire()); @@ -143,7 +143,7 @@ public function testShouldConvertTransportMessageToClientMessage() public function testShouldThrowExceptionIfXDelayIsNotNumeric() { $transportMessage = new AmqpMessage(); - $transportMessage->setProperty('x-delay', 'is-not-numeric'); + $transportMessage->setProperty('enqueue-delay', 'is-not-numeric'); $driver = new RabbitMqDriver( $this->createAmqpContextMock(), @@ -152,7 +152,7 @@ public function testShouldThrowExceptionIfXDelayIsNotNumeric() ); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('x-delay header is not numeric. "is-not-numeric"'); + $this->expectExceptionMessage('"enqueue-delay" header is not numeric. "is-not-numeric"'); $driver->createClientMessage($transportMessage); } @@ -239,7 +239,7 @@ public function testShouldConvertClientMessageToTransportMessage() $driver = new RabbitMqDriver( $context, - new Config('', '', '', '', '', '', ['delay_plugin_installed' => true]), + new Config('', '', '', '', '', '', ['delay_strategy' => 'dlx']), $this->createDummyQueueMetaRegistry() ); @@ -260,7 +260,7 @@ public function testShouldConvertClientMessageToTransportMessage() ], $transportMessage->getHeaders()); $this->assertSame([ 'key' => 'val', - 'x-delay' => '432000', + 'enqueue-delay' => 432000, ], $transportMessage->getProperties()); $this->assertSame('MessageId', $transportMessage->getMessageId()); $this->assertSame(1000, $transportMessage->getTimestamp()); @@ -282,12 +282,12 @@ public function testThrowIfDelayNotSupportedOnConvertClientMessageToTransportMes $driver = new RabbitMqDriver( $context, - new Config('', '', '', '', '', '', ['delay_plugin_installed' => false]), + new Config('', '', '', '', '', '', ['delay_strategy' => null]), $this->createDummyQueueMetaRegistry() ); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The message delaying is not supported. In order to use delay feature install RabbitMQ delay plugin.'); + $this->expectExceptionMessage('The message delaying is not supported. In order to use delay feature install RabbitMQ delay strategy.'); $driver->createTransportMessage($clientMessage); } @@ -386,17 +386,21 @@ public function testShouldSendMessageToProcessor() $driver->sendToProcessor($message); } - public function testShouldSendMessageToDelayExchangeIfDelaySet() + public function testShouldSendMessageToProcessorWithDeliveryDelay() { $queue = new AmqpQueue(''); - $delayTopic = new AmqpTopic(''); $transportMessage = new AmqpMessage(); $producer = $this->createAmqpProducerMock(); $producer ->expects($this->once()) ->method('send') - ->with($this->identicalTo($delayTopic), $this->identicalTo($transportMessage)) + ->with($this->identicalTo($queue), $this->identicalTo($transportMessage)) + ; + $producer + ->expects($this->once()) + ->method('setDeliveryDelay') + ->with($this->identicalTo(10000)) ; $context = $this->createAmqpContextMock(); $context @@ -404,11 +408,6 @@ public function testShouldSendMessageToDelayExchangeIfDelaySet() ->method('createQueue') ->willReturn($queue) ; - $context - ->expects($this->once()) - ->method('createTopic') - ->willReturn($delayTopic) - ; $context ->expects($this->once()) ->method('createProducer') @@ -422,7 +421,7 @@ public function testShouldSendMessageToDelayExchangeIfDelaySet() $driver = new RabbitMqDriver( $context, - new Config('', '', '', '', '', '', ['delay_plugin_installed' => true]), + new Config('', '', '', '', '', '', ['delay_strategy' => 'dlx']), $this->createDummyQueueMetaRegistry() ); @@ -506,7 +505,7 @@ public function testShouldSetupBrokerWhenDelayPluginNotInstalled() ->willReturn($processorQueue) ; - $config = Config::create('', '', '', '', '', '', ['delay_plugin_installed' => false]); + $config = Config::create('', '', '', '', '', '', ['delay_strategy' => null]); $meta = new QueueMetaRegistry($config, ['default' => []]); @@ -521,7 +520,6 @@ public function testShouldSetupBroker() $routerQueue = new AmqpQueue(''); $processorQueue = new AmqpQueue(''); - $delayTopic = new AmqpTopic(''); $context = $this->createAmqpContextMock(); // setup router @@ -561,29 +559,8 @@ public function testShouldSetupBroker() ->method('declareQueue') ->with($this->identicalTo($processorQueue)) ; - $context - ->expects($this->at(7)) - ->method('createQueue') - ->willReturn($processorQueue) - ; - $context - ->expects($this->at(8)) - ->method('createTopic') - ->willReturn($delayTopic) - ; - $context - ->expects($this->at(9)) - ->method('declareTopic') - ->with($this->identicalTo($delayTopic)) - ; - - $context - ->expects($this->at(10)) - ->method('bind') - ->with($this->isInstanceOf(AmqpBind::class)) - ; - $config = Config::create('', '', '', '', '', '', ['delay_plugin_installed' => true]); + $config = Config::create('', '', '', '', '', '', ['delay_strategy' => 'dlx']); $meta = new QueueMetaRegistry($config, ['default' => []]); diff --git a/pkg/sqs/SqsConsumer.php b/pkg/sqs/SqsConsumer.php index 700b5c330..4ec4abdf5 100644 --- a/pkg/sqs/SqsConsumer.php +++ b/pkg/sqs/SqsConsumer.php @@ -98,7 +98,21 @@ public function getQueue() */ public function receive($timeout = 0) { - $timeout /= 1000; + $maxLongPollingTime = 20; // 20 is max allowed long polling value + + if ($timeout === 0) { + while (true) { + if ($message = $this->receiveMessage($maxLongPollingTime)) { + return $message; + } + } + } + + $timeout = (int) ceil($timeout / 1000); + + if ($timeout > $maxLongPollingTime) { + throw new \LogicException(sprintf('Max allowed SQS receive message timeout is: "%s"', $maxLongPollingTime)); + } return $this->receiveMessage($timeout); } diff --git a/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php index 9fd3e96b8..8c1391915 100644 --- a/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php @@ -4,6 +4,7 @@ use Enqueue\Sqs\SqsConnectionFactory; use Enqueue\Sqs\SqsContext; +use Enqueue\Sqs\SqsDestination; use Enqueue\Test\RetryTrait; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -16,6 +17,25 @@ class SqsSendAndReceiveDelayedMessageFromQueueTest extends SendAndReceiveDelayed { use RetryTrait; + /** + * @var SqsContext + */ + private $context; + + /** + * @var SqsDestination + */ + private $queue; + + protected function tearDown() + { + parent::tearDown(); + + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } + } + /** * {@inheritdoc} */ @@ -27,7 +47,7 @@ protected function createContext() 'region' => getenv('AWS__SQS__REGION'), ]); - return $factory->createContext(); + return $this->context = $factory->createContext(); } /** @@ -39,9 +59,9 @@ protected function createQueue(PsrContext $context, $queueName) { $queueName = $queueName.time(); - $queue = $context->createQueue($queueName); - $context->declareQueue($queue); + $this->queue = $context->createQueue($queueName); + $context->declareQueue($this->queue); - return $queue; + return $this->queue; } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php index 3e73cd489..9bfb753f4 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php @@ -4,6 +4,7 @@ use Enqueue\Sqs\SqsConnectionFactory; use Enqueue\Sqs\SqsContext; +use Enqueue\Sqs\SqsDestination; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec; @@ -12,6 +13,25 @@ */ class SqsSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec { + /** + * @var SqsContext + */ + private $context; + + /** + * @var SqsDestination + */ + private $queue; + + protected function tearDown() + { + parent::tearDown(); + + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } + } + /** * {@inheritdoc} */ @@ -23,7 +43,7 @@ protected function createContext() 'region' => getenv('AWS__SQS__REGION'), ]); - return $factory->createContext(); + return $this->context = $factory->createContext(); } /** @@ -35,9 +55,9 @@ protected function createQueue(PsrContext $context, $queueName) { $queueName = $queueName.time(); - $queue = $context->createQueue($queueName); - $context->declareQueue($queue); + $this->queue = $context->createQueue($queueName); + $context->declareQueue($this->queue); - return $queue; + return $this->queue; } } diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php index 5c4595e88..cb611d6df 100644 --- a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php @@ -4,6 +4,7 @@ use Enqueue\Sqs\SqsConnectionFactory; use Enqueue\Sqs\SqsContext; +use Enqueue\Sqs\SqsDestination; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendToAndReceiveFromTopicSpec; @@ -12,6 +13,25 @@ */ class SqsSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec { + /** + * @var SqsContext + */ + private $context; + + /** + * @var SqsDestination + */ + private $queue; + + protected function tearDown() + { + parent::tearDown(); + + if ($this->context && $this->queue) { + $this->context->deleteQueue($this->queue); + } + } + /** * {@inheritdoc} */ @@ -23,7 +43,7 @@ protected function createContext() 'region' => getenv('AWS__SQS__REGION'), ]); - return $factory->createContext(); + return $this->context = $factory->createContext(); } /** @@ -35,9 +55,9 @@ protected function createTopic(PsrContext $context, $topicName) { $topicName = $topicName.time(); - $topic = $context->createTopic($topicName); - $context->declareQueue($topic); + $this->queue = $context->createTopic($topicName); + $context->declareQueue($this->queue); - return $topic; + return $this->queue; } } diff --git a/pkg/test/RetryTrait.php b/pkg/test/RetryTrait.php index cc8c377e4..de17565bd 100644 --- a/pkg/test/RetryTrait.php +++ b/pkg/test/RetryTrait.php @@ -9,6 +9,14 @@ public function runBare() $e = null; $numberOfRetires = $this->getNumberOfRetries(); + if (false == is_numeric($numberOfRetires)) { + throw new \LogicException(sprintf('The $numberOfRetires must be a number but got "%s"', var_export($numberOfRetires, true))); + } + $numberOfRetires = (int) $numberOfRetires; + if ($numberOfRetires <= 0) { + throw new \LogicException(sprintf('The $numberOfRetires must be a positive number greater than 0 but got "%s".', $numberOfRetires)); + } + for ($i = 0; $i < $numberOfRetires; ++$i) { try { parent::runBare(); @@ -37,8 +45,8 @@ private function getNumberOfRetries() { $annotations = $this->getAnnotations(); - if (isset($annotations['method']['retry'])) { - return $annotations['method']['retry']; + if (isset($annotations['method']['retry'][0])) { + return $annotations['method']['retry'][0]; } if (isset($annotations['class']['retry'][0])) {