diff --git a/docs/amqp_transport.md b/docs/amqp_transport.md index d777edd4e..420b22e7e 100644 --- a/docs/amqp_transport.md +++ b/docs/amqp_transport.md @@ -10,6 +10,7 @@ Build on top of [php amqp extension](https://github.com/pdezwart/php-amqp). * [Send message to topic](#send-message-to-topic) * [Send message to queue](#send-message-to-queue) * [Consume message](#consume-message) +* [Purge queue messages](#purge-queue-messages) ## Create context @@ -115,4 +116,16 @@ $consumer->acknowledge($message); // $consumer->reject($message); ``` +## Purge queue messages: + +```php +createQueue('aQueue'); + +$psrContext->purge($queue); +``` + [back to index](index.md) \ No newline at end of file diff --git a/pkg/amqp-ext/AmqpContext.php b/pkg/amqp-ext/AmqpContext.php index 9997988b5..f7bd5acf8 100644 --- a/pkg/amqp-ext/AmqpContext.php +++ b/pkg/amqp-ext/AmqpContext.php @@ -5,6 +5,7 @@ use Enqueue\Psr\Context; use Enqueue\Psr\Destination; use Enqueue\Psr\InvalidDestinationException; +use Enqueue\Psr\Queue; use Enqueue\Psr\Topic; class AmqpContext implements Context @@ -20,7 +21,7 @@ class AmqpContext implements Context private $extChannelFactory; /** - * Callable must return instance of \AMQPChannel once called + * Callable must return instance of \AMQPChannel once called. * * @param \AMQPChannel|callable $extChannel */ @@ -224,4 +225,18 @@ public function getExtChannel() return $this->extChannel; } + + /** + * Purge all messages from the given queue. + * + * @param Queue $queue + */ + public function purge(Queue $queue) + { + InvalidDestinationException::assertDestinationInstanceOf($queue, AmqpQueue::class); + + $amqpQueue = new \AMQPQueue($this->getExtChannel()); + $amqpQueue->setName($queue->getQueueName()); + $amqpQueue->purge(); + } } diff --git a/pkg/amqp-ext/Tests/AmqpContextTest.php b/pkg/amqp-ext/Tests/AmqpContextTest.php index 55b2b367c..a0e0bd32a 100644 --- a/pkg/amqp-ext/Tests/AmqpContextTest.php +++ b/pkg/amqp-ext/Tests/AmqpContextTest.php @@ -30,7 +30,7 @@ public function testCouldBeConstructedWithExtChannelAsFirstArgument() public function testCouldBeConstructedWithExtChannelCallbackFactoryAsFirstArgument() { - new AmqpContext(function() { + new AmqpContext(function () { return $this->createExtChannelMock(); }); } @@ -289,6 +289,15 @@ public function testShouldThrowIfTargetNotAmqpQueueOnBindCall() $context->bind(new AmqpTopic('aName'), new NullQueue('aName')); } + public function testShouldThrowIfGivenQueueNotAmqpQueueOnPurge() + { + $context = new AmqpContext($this->createExtChannelMock()); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\AmqpExt\AmqpQueue but got Enqueue\Transport\Null\NullQueue.'); + $context->purge(new NullQueue('aName')); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|\AMQPChannel */ diff --git a/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php b/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php index be0ef4fef..21ca06837 100644 --- a/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php +++ b/pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php @@ -170,4 +170,22 @@ public function testConsumerReceiveMessageFromTopicDirectly() $this->assertEquals(__METHOD__, $message->getBody()); } + + public function testPurgeMessagesFromQueue() + { + $queue = $this->amqpContext->createQueue('amqp_ext.test'); + $this->amqpContext->declareQueue($queue); + + $consumer = $this->amqpContext->createConsumer($queue); + + $message = $this->amqpContext->createMessage(__METHOD__); + + $producer = $this->amqpContext->createProducer(); + $producer->send($queue, $message); + $producer->send($queue, $message); + + $this->amqpContext->purge($queue); + + $this->assertNull($consumer->receive(1)); + } }