Skip to content

[consumption] Add onResult extension point. #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/enqueue/Consumption/ChainExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ public function onPreReceived(Context $context)
}
}

/**
* @param Context $context
*/
public function onResult(Context $context)
{
foreach ($this->extensions as $extension) {
$extension->onResult($context);
}
}

/**
* @param Context $context
*/
Expand Down
7 changes: 7 additions & 0 deletions pkg/enqueue/Consumption/EmptyExtensionTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ public function onPreReceived(Context $context)
{
}

/**
* @param Context $context
*/
public function onResult(Context $context)
{
}

/**
* @param Context $context
*/
Expand Down
9 changes: 9 additions & 0 deletions pkg/enqueue/Consumption/ExtensionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ public function onBeforeReceive(Context $context);
*/
public function onPreReceived(Context $context);

/**
* Executed when a message is processed by a processor or a result was set in onPreReceived method.
* BUT before the message status was sent to the broker
* The consumption could be interrupted at this step but it exits after the message is processed.
*
* @param Context $context
*/
public function onResult(Context $context);

/**
* Executed when a message is processed by a processor.
* The context contains a status, which could not be changed.
Expand Down
2 changes: 2 additions & 0 deletions pkg/enqueue/Consumption/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
$context->setResult($result);
}

$extension->onResult($context);

switch ($context->getResult()) {
case Result::ACK:
$consumer->acknowledge($message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Enqueue\Consumption\ExtensionInterface;
use Enqueue\Test\ClassExtensionTrait;

class ExtensionsTest extends \PHPUnit_Framework_TestCase
class ChainExtensionTest extends \PHPUnit_Framework_TestCase
{
use ClassExtensionTrait;

Expand Down Expand Up @@ -87,6 +87,28 @@ public function testShouldProxyOnPreReceiveToAllInternalExtensions()
$extensions->onPreReceived($context);
}

public function testShouldProxyOnResultToAllInternalExtensions()
{
$context = $this->createContextMock();

$fooExtension = $this->createExtension();
$fooExtension
->expects($this->once())
->method('onResult')
->with($this->identicalTo($context))
;
$barExtension = $this->createExtension();
$barExtension
->expects($this->once())
->method('onResult')
->with($this->identicalTo($context))
;

$extensions = new ChainExtension([$fooExtension, $barExtension]);

$extensions->onResult($context);
}

public function testShouldProxyOnPostReceiveToAllInternalExtensions()
{
$context = $this->createContextMock();
Expand Down
19 changes: 19 additions & 0 deletions pkg/enqueue/Tests/Consumption/EmptyExtensionTraitTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php
namespace Enqueue\Tests\Consumption;

use Enqueue\Consumption\EmptyExtensionTrait;
use Enqueue\Consumption\ExtensionInterface;
use PHPUnit\Framework\TestCase;

class EmptyExtensionTraitTest extends TestCase
{
public function testTraitMustImplementOrExtensionMethods()
{
new EmptyExtension();
}
}

class EmptyExtension implements ExtensionInterface
{
use EmptyExtensionTrait;
}
116 changes: 114 additions & 2 deletions pkg/enqueue/Tests/Consumption/QueueConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ public function testShouldCallOnBeforeReceiveExtensionMethod()
$queueConsumer->consume();
}

public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods()
public function testShouldCallOnPreReceivedExtensionMethodWithExpectedContext()
{
$expectedMessage = $this->createMessageMock();
$messageConsumerStub = $this->createMessageConsumerStub($expectedMessage);
Expand Down Expand Up @@ -497,6 +497,62 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods()
$this->assertFalse($context->isExecutionInterrupted());
})
;

$chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]);
$queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0);
$queueConsumer->bind(new NullQueue('aQueueName'), $processorMock);

$queueConsumer->consume();
}

public function testShouldCallOnResultExtensionMethodWithExpectedContext()
{
$expectedMessage = $this->createMessageMock();
$messageConsumerStub = $this->createMessageConsumerStub($expectedMessage);

$contextStub = $this->createPsrContextStub($messageConsumerStub);

$processorMock = $this->createProcessorStub();

$extension = $this->createExtension();
$extension
->expects($this->once())
->method('onResult')
->with($this->isInstanceOf(Context::class))
->willReturnCallback(function (Context $context) use (
$contextStub,
$messageConsumerStub,
$processorMock,
$expectedMessage
) {
$this->assertSame($contextStub, $context->getPsrContext());
$this->assertSame($messageConsumerStub, $context->getPsrConsumer());
$this->assertSame($processorMock, $context->getPsrProcessor());
$this->assertSame($expectedMessage, $context->getPsrMessage());
$this->assertInstanceOf(NullLogger::class, $context->getLogger());
$this->assertNull($context->getException());
$this->assertSame(Result::ACK, $context->getResult());
$this->assertFalse($context->isExecutionInterrupted());
})
;

$chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]);
$queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0);
$queueConsumer->bind(new NullQueue('aQueueName'), $processorMock);

$queueConsumer->consume();
}

public function testShouldCallOnPostReceivedExtensionMethodWithExpectedContext()
{
$expectedMessage = $this->createMessageMock();
$messageConsumerStub = $this->createMessageConsumerStub($expectedMessage);

$contextStub = $this->createPsrContextStub($messageConsumerStub);

$processorMock = $this->createProcessorStub();

$extension = $this->createExtension();
$extension
->expects($this->once())
->method('onPostReceived')
Expand Down Expand Up @@ -722,6 +778,57 @@ public function testShouldAllowInterruptConsumingOnPreReceiveButProcessCurrentMe
$queueConsumer->consume();
}

public function testShouldAllowInterruptConsumingOnResult()
{
$expectedMessage = $this->createMessageMock();
$messageConsumerStub = $this->createMessageConsumerStub($expectedMessage);

$contextStub = $this->createPsrContextStub($messageConsumerStub);

$processorMock = $this->createProcessorMock();
$processorMock
->expects($this->once())
->method('process')
->willReturn(Result::ACK)
;

$extension = $this->createExtension();
$extension
->expects($this->once())
->method('onResult')
->with($this->isInstanceOf(Context::class))
->willReturnCallback(function (Context $context) {
$context->setExecutionInterrupted(true);
})
;
$extension
->expects($this->atLeastOnce())
->method('onInterrupted')
->with($this->isInstanceOf(Context::class))
->willReturnCallback(function (Context $context) use (
$contextStub,
$messageConsumerStub,
$processorMock,
$expectedMessage
) {
$this->assertSame($contextStub, $context->getPsrContext());
$this->assertSame($messageConsumerStub, $context->getPsrConsumer());
$this->assertSame($processorMock, $context->getPsrProcessor());
$this->assertSame($expectedMessage, $context->getPsrMessage());
$this->assertInstanceOf(NullLogger::class, $context->getLogger());
$this->assertNull($context->getException());
$this->assertSame(Result::ACK, $context->getResult());
$this->assertTrue($context->isExecutionInterrupted());
})
;

$chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]);
$queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0);
$queueConsumer->bind(new NullQueue('aQueueName'), $processorMock);

$queueConsumer->consume();
}

public function testShouldAllowInterruptConsumingOnPostReceive()
{
$expectedMessage = $this->createMessageMock();
Expand Down Expand Up @@ -850,6 +957,11 @@ public function testShouldCallExtensionPassedOnRuntime()
->method('onPreReceived')
->with($this->isInstanceOf(Context::class))
;
$runtimeExtension
->expects($this->once())
->method('onResult')
->with($this->isInstanceOf(Context::class))
;
$runtimeExtension
->expects($this->once())
->method('onPostReceived')
Expand Down Expand Up @@ -936,7 +1048,7 @@ public function testShouldCallEachQueueOneByOne()
})
;
$extension
->expects($this->at(4))
->expects($this->at(5))
->method('onBeforeReceive')
->with($this->isInstanceOf(Context::class))
->willReturnCallback(function (Context $context) use ($anotherProcessorMock, $queue2) {
Expand Down