Skip to content

Add an extension to stop consumption on closed entity manager #932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ enqueue:
doctrine_ping_connection_extension: false
doctrine_clear_identity_map_extension: false
doctrine_odm_clear_identity_map_extension: false
doctrine_closed_entity_manager_extension: false
reset_services_extension: false
signal_extension: true
reply_extension: true
Expand Down
4 changes: 4 additions & 0 deletions docs/consumption/extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ It clears Doctrine's identity map after a message is processed. It reduce memory

It test a database connection and if it is lost it does reconnect. Fixes "MySQL has gone away" errors.

## [DoctrineClosedEntityManagerExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Consumption/Extension/DoctrineClosedEntityManagerExtension.php)

The extension interrupts consumption if an entity manager has been closed.

## [ResetServicesExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Consumption/Extension/ResetServicesExtension.php)

It resets all services with tag "kernel.reset".
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

namespace Enqueue\Bundle\Consumption\Extension;

use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\ORM\EntityManagerInterface;
use Enqueue\Consumption\Context\PostConsume;
use Enqueue\Consumption\Context\PostMessageReceived;
use Enqueue\Consumption\Context\PreConsume;
use Enqueue\Consumption\PostConsumeExtensionInterface;
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
use Enqueue\Consumption\PreConsumeExtensionInterface;
use Psr\Log\LoggerInterface;

class DoctrineClosedEntityManagerExtension implements PreConsumeExtensionInterface, PostMessageReceivedExtensionInterface, PostConsumeExtensionInterface
{
/**
* @var ManagerRegistry
*/
protected $registry;

public function __construct(ManagerRegistry $registry)
{
$this->registry = $registry;
}

public function onPreConsume(PreConsume $context): void
{
if ($this->shouldBeStopped($context->getLogger())) {
$context->interruptExecution();
}
}

public function onPostConsume(PostConsume $context): void
{
if ($this->shouldBeStopped($context->getLogger())) {
$context->interruptExecution();
}
}

public function onPostMessageReceived(PostMessageReceived $context): void
{
if ($this->shouldBeStopped($context->getLogger())) {
$context->interruptExecution();
}
}

private function shouldBeStopped(LoggerInterface $logger): bool
{
foreach ($this->registry->getManagers() as $name => $manager) {
if (!$manager instanceof EntityManagerInterface || $manager->isOpen()) {
continue;
}

$logger->debug(sprintf(
'[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "%s" has been closed',
$name
));

return true;
}

return false;
}
}
1 change: 1 addition & 0 deletions pkg/enqueue-bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public function getConfigTreeBuilder(): TreeBuilder
->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()
->booleanNode('doctrine_odm_clear_identity_map_extension')->defaultFalse()->end()
->booleanNode('doctrine_closed_entity_manager_extension')->defaultFalse()->end()
->booleanNode('reset_services_extension')->defaultFalse()->end()
->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end()
->booleanNode('reply_extension')->defaultTrue()->end()
Expand Down
24 changes: 24 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Enqueue\AsyncCommand\DependencyInjection\AsyncCommandExtension;
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventDispatcherExtension;
use Enqueue\Bundle\Consumption\Extension\DoctrineClearIdentityMapExtension;
use Enqueue\Bundle\Consumption\Extension\DoctrineClosedEntityManagerExtension;
use Enqueue\Bundle\Consumption\Extension\DoctrinePingConnectionExtension;
use Enqueue\Bundle\Consumption\Extension\ResetServicesExtension;
use Enqueue\Bundle\Profiler\MessageQueueCollector;
Expand Down Expand Up @@ -138,6 +139,7 @@ public function load(array $configs, ContainerBuilder $container): void
$this->loadDoctrinePingConnectionExtension($config, $container);
$this->loadDoctrineClearIdentityMapExtension($config, $container);
$this->loadDoctrineOdmClearIdentityMapExtension($config, $container);
$this->loadDoctrineClosedEntityManagerExtension($config, $container);
$this->loadResetServicesExtension($config, $container);
$this->loadSignalExtension($config, $container);
$this->loadReplyExtension($config, $container);
Expand Down Expand Up @@ -273,6 +275,28 @@ private function loadDoctrineOdmClearIdentityMapExtension(array $config, Contain
}
}

private function loadDoctrineClosedEntityManagerExtension(array $config, ContainerBuilder $container)
{
$configNames = [];
foreach ($config as $name => $modules) {
if ($modules['extensions']['doctrine_closed_entity_manager_extension']) {
$configNames[] = $name;
}
}

if ([] === $configNames) {
return;
}

$extension = $container->register('enqueue.consumption.doctrine_closed_entity_manager_extension', DoctrineClosedEntityManagerExtension::class)
->addArgument(new Reference('doctrine'));

foreach ($configNames as $name) {
$extension->addTag('enqueue.consumption_extension', ['client' => $name]);
$extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]);
}
}

private function loadResetServicesExtension(array $config, ContainerBuilder $container)
{
$configNames = [];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
<?php

namespace Enqueue\Bundle\Tests\Unit\Consumption\Extension;

use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\ORM\EntityManagerInterface;
use Enqueue\Bundle\Consumption\Extension\DoctrineClosedEntityManagerExtension;
use Enqueue\Consumption\Context\PostConsume;
use Enqueue\Consumption\Context\PostMessageReceived;
use Enqueue\Consumption\Context\PreConsume;
use Interop\Queue\Consumer;
use Interop\Queue\Context as InteropContext;
use Interop\Queue\Message;
use Interop\Queue\SubscriptionConsumer;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;

class DoctrineClosedEntityManagerExtensionTest extends TestCase
{
public function testOnPreConsumeShouldNotInterruptExecution()
{
$manager = $this->createManagerMock(true);

$registry = $this->createRegistryMock([
'manager' => $manager,
]);

$message = new PreConsume(
$this->createMock(InteropContext::class),
$this->createMock(SubscriptionConsumer::class),
$this->createMock(LoggerInterface::class),
1,
2,
3
);

self::assertFalse($message->isExecutionInterrupted());

$extension = new DoctrineClosedEntityManagerExtension($registry);
$extension->onPreConsume($message);

self::assertFalse($message->isExecutionInterrupted());
}

public function testOnPreConsumeShouldInterruptExecutionIfAManagerIsClosed()
{
$manager1 = $this->createManagerMock(true);
$manager2 = $this->createManagerMock(false);

$registry = $this->createRegistryMock([
'manager1' => $manager1,
'manager2' => $manager2,
]);

$message = new PreConsume(
$this->createMock(InteropContext::class),
$this->createMock(SubscriptionConsumer::class),
$this->createMock(LoggerInterface::class),
1,
2,
3
);
$message->getLogger()
->expects($this->once())
->method('debug')
->with('[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "manager2" has been closed')
;

self::assertFalse($message->isExecutionInterrupted());

$extension = new DoctrineClosedEntityManagerExtension($registry);
$extension->onPreConsume($message);

self::assertTrue($message->isExecutionInterrupted());
}

public function testOnPostConsumeShouldNotInterruptExecution()
{
$manager = $this->createManagerMock(true);

$registry = $this->createRegistryMock([
'manager' => $manager,
]);

$message = new PostConsume(
$this->createMock(InteropContext::class),
$this->createMock(SubscriptionConsumer::class),
1,
1,
1,
$this->createMock(LoggerInterface::class)
);

self::assertFalse($message->isExecutionInterrupted());

$extension = new DoctrineClosedEntityManagerExtension($registry);
$extension->onPostConsume($message);

self::assertFalse($message->isExecutionInterrupted());
}

public function testOnPostConsumeShouldInterruptExecutionIfAManagerIsClosed()
{
$manager1 = $this->createManagerMock(true);
$manager2 = $this->createManagerMock(false);

$registry = $this->createRegistryMock([
'manager1' => $manager1,
'manager2' => $manager2,
]);

$message = new PostConsume(
$this->createMock(InteropContext::class),
$this->createMock(SubscriptionConsumer::class),
1,
1,
1,
$this->createMock(LoggerInterface::class)
);
$message->getLogger()
->expects($this->once())
->method('debug')
->with('[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "manager2" has been closed')
;

self::assertFalse($message->isExecutionInterrupted());

$extension = new DoctrineClosedEntityManagerExtension($registry);
$extension->onPostConsume($message);

self::assertTrue($message->isExecutionInterrupted());
}

public function testOnPostReceivedShouldNotInterruptExecution()
{
$manager = $this->createManagerMock(true);

$registry = $this->createRegistryMock([
'manager' => $manager,
]);

$message = new PostMessageReceived(
$this->createMock(InteropContext::class),
$this->createMock(Consumer::class),
$this->createMock(Message::class),
'aResult',
1,
$this->createMock(LoggerInterface::class)
);

self::assertFalse($message->isExecutionInterrupted());

$extension = new DoctrineClosedEntityManagerExtension($registry);
$extension->onPostMessageReceived($message);

self::assertFalse($message->isExecutionInterrupted());
}

public function testOnPostReceivedShouldInterruptExecutionIfAManagerIsClosed()
{
$manager1 = $this->createManagerMock(true);
$manager2 = $this->createManagerMock(false);

$registry = $this->createRegistryMock([
'manager1' => $manager1,
'manager2' => $manager2,
]);

$message = new PostMessageReceived(
$this->createMock(InteropContext::class),
$this->createMock(Consumer::class),
$this->createMock(Message::class),
'aResult',
1,
$this->createMock(LoggerInterface::class)
);
$message->getLogger()
->expects($this->once())
->method('debug')
->with('[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "manager2" has been closed')
;

self::assertFalse($message->isExecutionInterrupted());

$extension = new DoctrineClosedEntityManagerExtension($registry);
$extension->onPostMessageReceived($message);

self::assertTrue($message->isExecutionInterrupted());
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|ManagerRegistry
*/
protected function createRegistryMock(array $managers): ManagerRegistry
{
$mock = $this->createMock(ManagerRegistry::class);

$mock
->expects($this->once())
->method('getManagers')
->willReturn($managers)
;

return $mock;
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|EntityManagerInterface
*/
protected function createManagerMock(bool $open): EntityManagerInterface
{
$mock = $this->createMock(EntityManagerInterface::class);

$mock
->expects($this->once())
->method('isOpen')
->willReturn($open)
;

return $mock;
}
}
Loading