From da4dfcdc6fd97e0f5a722ee32019643aeeac67cc Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 6 Apr 2017 18:38:15 +0300 Subject: [PATCH 1/2] [consumption] Add onResult extension point. --- pkg/enqueue/Consumption/EmptyExtensionTrait.php | 7 +++++++ pkg/enqueue/Consumption/ExtensionInterface.php | 9 +++++++++ pkg/enqueue/Consumption/QueueConsumer.php | 2 ++ 3 files changed, 18 insertions(+) diff --git a/pkg/enqueue/Consumption/EmptyExtensionTrait.php b/pkg/enqueue/Consumption/EmptyExtensionTrait.php index c98ed8da3..0f6b849c4 100644 --- a/pkg/enqueue/Consumption/EmptyExtensionTrait.php +++ b/pkg/enqueue/Consumption/EmptyExtensionTrait.php @@ -25,6 +25,13 @@ public function onPreReceived(Context $context) { } + /** + * @param Context $context + */ + public function onResult(Context $context) + { + } + /** * @param Context $context */ diff --git a/pkg/enqueue/Consumption/ExtensionInterface.php b/pkg/enqueue/Consumption/ExtensionInterface.php index 5fbfe1819..2a5d7bb72 100644 --- a/pkg/enqueue/Consumption/ExtensionInterface.php +++ b/pkg/enqueue/Consumption/ExtensionInterface.php @@ -31,6 +31,15 @@ public function onBeforeReceive(Context $context); */ public function onPreReceived(Context $context); + /** + * Executed when a message is processed by a processor or a result was set in onPreReceived method. + * BUT before the message status was sent to the broker + * The consumption could be interrupted at this step but it exits after the message is processed. + * + * @param Context $context + */ + public function onResult(Context $context); + /** * Executed when a message is processed by a processor. * The context contains a status, which could not be changed. diff --git a/pkg/enqueue/Consumption/QueueConsumer.php b/pkg/enqueue/Consumption/QueueConsumer.php index e0739ff92..51820ab2e 100644 --- a/pkg/enqueue/Consumption/QueueConsumer.php +++ b/pkg/enqueue/Consumption/QueueConsumer.php @@ -204,6 +204,8 @@ protected function doConsume(ExtensionInterface $extension, Context $context) $context->setResult($result); } + $extension->onResult($context); + switch ($context->getResult()) { case Result::ACK: $consumer->acknowledge($message); From 0bc22c2dfce6a3a2e0d0755a939723ec6b14d36a Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 7 Apr 2017 13:07:53 +0300 Subject: [PATCH 2/2] [consumption] Add test for Extension::onResult method. --- pkg/enqueue/Consumption/ChainExtension.php | 10 ++ ...ensionsTest.php => ChainExtensionTest.php} | 24 +++- .../Consumption/EmptyExtensionTraitTest.php | 19 +++ .../Tests/Consumption/QueueConsumerTest.php | 116 +++++++++++++++++- 4 files changed, 166 insertions(+), 3 deletions(-) rename pkg/enqueue/Tests/Consumption/{ExtensionsTest.php => ChainExtensionTest.php} (87%) create mode 100644 pkg/enqueue/Tests/Consumption/EmptyExtensionTraitTest.php diff --git a/pkg/enqueue/Consumption/ChainExtension.php b/pkg/enqueue/Consumption/ChainExtension.php index 17f5db659..d7a24adc7 100644 --- a/pkg/enqueue/Consumption/ChainExtension.php +++ b/pkg/enqueue/Consumption/ChainExtension.php @@ -49,6 +49,16 @@ public function onPreReceived(Context $context) } } + /** + * @param Context $context + */ + public function onResult(Context $context) + { + foreach ($this->extensions as $extension) { + $extension->onResult($context); + } + } + /** * @param Context $context */ diff --git a/pkg/enqueue/Tests/Consumption/ExtensionsTest.php b/pkg/enqueue/Tests/Consumption/ChainExtensionTest.php similarity index 87% rename from pkg/enqueue/Tests/Consumption/ExtensionsTest.php rename to pkg/enqueue/Tests/Consumption/ChainExtensionTest.php index 2f35bc76f..eaca06858 100644 --- a/pkg/enqueue/Tests/Consumption/ExtensionsTest.php +++ b/pkg/enqueue/Tests/Consumption/ChainExtensionTest.php @@ -7,7 +7,7 @@ use Enqueue\Consumption\ExtensionInterface; use Enqueue\Test\ClassExtensionTrait; -class ExtensionsTest extends \PHPUnit_Framework_TestCase +class ChainExtensionTest extends \PHPUnit_Framework_TestCase { use ClassExtensionTrait; @@ -87,6 +87,28 @@ public function testShouldProxyOnPreReceiveToAllInternalExtensions() $extensions->onPreReceived($context); } + public function testShouldProxyOnResultToAllInternalExtensions() + { + $context = $this->createContextMock(); + + $fooExtension = $this->createExtension(); + $fooExtension + ->expects($this->once()) + ->method('onResult') + ->with($this->identicalTo($context)) + ; + $barExtension = $this->createExtension(); + $barExtension + ->expects($this->once()) + ->method('onResult') + ->with($this->identicalTo($context)) + ; + + $extensions = new ChainExtension([$fooExtension, $barExtension]); + + $extensions->onResult($context); + } + public function testShouldProxyOnPostReceiveToAllInternalExtensions() { $context = $this->createContextMock(); diff --git a/pkg/enqueue/Tests/Consumption/EmptyExtensionTraitTest.php b/pkg/enqueue/Tests/Consumption/EmptyExtensionTraitTest.php new file mode 100644 index 000000000..5a7f3478c --- /dev/null +++ b/pkg/enqueue/Tests/Consumption/EmptyExtensionTraitTest.php @@ -0,0 +1,19 @@ +consume(); } - public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods() + public function testShouldCallOnPreReceivedExtensionMethodWithExpectedContext() { $expectedMessage = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($expectedMessage); @@ -497,6 +497,62 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods() $this->assertFalse($context->isExecutionInterrupted()); }) ; + + $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); + $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); + + $queueConsumer->consume(); + } + + public function testShouldCallOnResultExtensionMethodWithExpectedContext() + { + $expectedMessage = $this->createMessageMock(); + $messageConsumerStub = $this->createMessageConsumerStub($expectedMessage); + + $contextStub = $this->createPsrContextStub($messageConsumerStub); + + $processorMock = $this->createProcessorStub(); + + $extension = $this->createExtension(); + $extension + ->expects($this->once()) + ->method('onResult') + ->with($this->isInstanceOf(Context::class)) + ->willReturnCallback(function (Context $context) use ( + $contextStub, + $messageConsumerStub, + $processorMock, + $expectedMessage + ) { + $this->assertSame($contextStub, $context->getPsrContext()); + $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); + $this->assertSame($processorMock, $context->getPsrProcessor()); + $this->assertSame($expectedMessage, $context->getPsrMessage()); + $this->assertInstanceOf(NullLogger::class, $context->getLogger()); + $this->assertNull($context->getException()); + $this->assertSame(Result::ACK, $context->getResult()); + $this->assertFalse($context->isExecutionInterrupted()); + }) + ; + + $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); + $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); + + $queueConsumer->consume(); + } + + public function testShouldCallOnPostReceivedExtensionMethodWithExpectedContext() + { + $expectedMessage = $this->createMessageMock(); + $messageConsumerStub = $this->createMessageConsumerStub($expectedMessage); + + $contextStub = $this->createPsrContextStub($messageConsumerStub); + + $processorMock = $this->createProcessorStub(); + + $extension = $this->createExtension(); $extension ->expects($this->once()) ->method('onPostReceived') @@ -722,6 +778,57 @@ public function testShouldAllowInterruptConsumingOnPreReceiveButProcessCurrentMe $queueConsumer->consume(); } + public function testShouldAllowInterruptConsumingOnResult() + { + $expectedMessage = $this->createMessageMock(); + $messageConsumerStub = $this->createMessageConsumerStub($expectedMessage); + + $contextStub = $this->createPsrContextStub($messageConsumerStub); + + $processorMock = $this->createProcessorMock(); + $processorMock + ->expects($this->once()) + ->method('process') + ->willReturn(Result::ACK) + ; + + $extension = $this->createExtension(); + $extension + ->expects($this->once()) + ->method('onResult') + ->with($this->isInstanceOf(Context::class)) + ->willReturnCallback(function (Context $context) { + $context->setExecutionInterrupted(true); + }) + ; + $extension + ->expects($this->atLeastOnce()) + ->method('onInterrupted') + ->with($this->isInstanceOf(Context::class)) + ->willReturnCallback(function (Context $context) use ( + $contextStub, + $messageConsumerStub, + $processorMock, + $expectedMessage + ) { + $this->assertSame($contextStub, $context->getPsrContext()); + $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); + $this->assertSame($processorMock, $context->getPsrProcessor()); + $this->assertSame($expectedMessage, $context->getPsrMessage()); + $this->assertInstanceOf(NullLogger::class, $context->getLogger()); + $this->assertNull($context->getException()); + $this->assertSame(Result::ACK, $context->getResult()); + $this->assertTrue($context->isExecutionInterrupted()); + }) + ; + + $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); + $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); + + $queueConsumer->consume(); + } + public function testShouldAllowInterruptConsumingOnPostReceive() { $expectedMessage = $this->createMessageMock(); @@ -850,6 +957,11 @@ public function testShouldCallExtensionPassedOnRuntime() ->method('onPreReceived') ->with($this->isInstanceOf(Context::class)) ; + $runtimeExtension + ->expects($this->once()) + ->method('onResult') + ->with($this->isInstanceOf(Context::class)) + ; $runtimeExtension ->expects($this->once()) ->method('onPostReceived') @@ -936,7 +1048,7 @@ public function testShouldCallEachQueueOneByOne() }) ; $extension - ->expects($this->at(4)) + ->expects($this->at(5)) ->method('onBeforeReceive') ->with($this->isInstanceOf(Context::class)) ->willReturnCallback(function (Context $context) use ($anotherProcessorMock, $queue2) {