From 05951433953b858140902bdfd11cc2a0552b64dc Mon Sep 17 00:00:00 2001 From: Lctrs Date: Thu, 8 Aug 2019 16:07:03 +0200 Subject: [PATCH] Add an extension to stop consumption on closed entity manager --- docs/bundle/config_reference.md | 1 + docs/consumption/extensions.md | 4 + .../DoctrineClosedEntityManagerExtension.php | 65 +++++ .../DependencyInjection/Configuration.php | 1 + .../DependencyInjection/EnqueueExtension.php | 24 ++ ...ctrineClosedEntityManagerExtensionTest.php | 222 ++++++++++++++++++ .../DependencyInjection/ConfigurationTest.php | 43 ++++ .../EnqueueExtensionTest.php | 36 +++ 8 files changed, 396 insertions(+) create mode 100644 pkg/enqueue-bundle/Consumption/Extension/DoctrineClosedEntityManagerExtension.php create mode 100644 pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClosedEntityManagerExtensionTest.php diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md index a549ceda1..ce115336e 100644 --- a/docs/bundle/config_reference.md +++ b/docs/bundle/config_reference.md @@ -70,6 +70,7 @@ enqueue: doctrine_ping_connection_extension: false doctrine_clear_identity_map_extension: false doctrine_odm_clear_identity_map_extension: false + doctrine_closed_entity_manager_extension: false reset_services_extension: false signal_extension: true reply_extension: true diff --git a/docs/consumption/extensions.md b/docs/consumption/extensions.md index 7cddb2e3d..8afd61e8b 100644 --- a/docs/consumption/extensions.md +++ b/docs/consumption/extensions.md @@ -22,6 +22,10 @@ It clears Doctrine's identity map after a message is processed. It reduce memory It test a database connection and if it is lost it does reconnect. Fixes "MySQL has gone away" errors. +## [DoctrineClosedEntityManagerExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Consumption/Extension/DoctrineClosedEntityManagerExtension.php) + +The extension interrupts consumption if an entity manager has been closed. + ## [ResetServicesExtension](https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/enqueue-bundle/Consumption/Extension/ResetServicesExtension.php) It resets all services with tag "kernel.reset". diff --git a/pkg/enqueue-bundle/Consumption/Extension/DoctrineClosedEntityManagerExtension.php b/pkg/enqueue-bundle/Consumption/Extension/DoctrineClosedEntityManagerExtension.php new file mode 100644 index 000000000..d2765231a --- /dev/null +++ b/pkg/enqueue-bundle/Consumption/Extension/DoctrineClosedEntityManagerExtension.php @@ -0,0 +1,65 @@ +registry = $registry; + } + + public function onPreConsume(PreConsume $context): void + { + if ($this->shouldBeStopped($context->getLogger())) { + $context->interruptExecution(); + } + } + + public function onPostConsume(PostConsume $context): void + { + if ($this->shouldBeStopped($context->getLogger())) { + $context->interruptExecution(); + } + } + + public function onPostMessageReceived(PostMessageReceived $context): void + { + if ($this->shouldBeStopped($context->getLogger())) { + $context->interruptExecution(); + } + } + + private function shouldBeStopped(LoggerInterface $logger): bool + { + foreach ($this->registry->getManagers() as $name => $manager) { + if (!$manager instanceof EntityManagerInterface || $manager->isOpen()) { + continue; + } + + $logger->debug(sprintf( + '[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "%s" has been closed', + $name + )); + + return true; + } + + return false; + } +} diff --git a/pkg/enqueue-bundle/DependencyInjection/Configuration.php b/pkg/enqueue-bundle/DependencyInjection/Configuration.php index 3b26f5ad4..3fe3ec76d 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Configuration.php +++ b/pkg/enqueue-bundle/DependencyInjection/Configuration.php @@ -48,6 +48,7 @@ public function getConfigTreeBuilder(): TreeBuilder ->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end() ->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end() ->booleanNode('doctrine_odm_clear_identity_map_extension')->defaultFalse()->end() + ->booleanNode('doctrine_closed_entity_manager_extension')->defaultFalse()->end() ->booleanNode('reset_services_extension')->defaultFalse()->end() ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() ->booleanNode('reply_extension')->defaultTrue()->end() diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 620e71925..fd6ccae86 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -5,6 +5,7 @@ use Enqueue\AsyncCommand\DependencyInjection\AsyncCommandExtension; use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventDispatcherExtension; use Enqueue\Bundle\Consumption\Extension\DoctrineClearIdentityMapExtension; +use Enqueue\Bundle\Consumption\Extension\DoctrineClosedEntityManagerExtension; use Enqueue\Bundle\Consumption\Extension\DoctrinePingConnectionExtension; use Enqueue\Bundle\Consumption\Extension\ResetServicesExtension; use Enqueue\Bundle\Profiler\MessageQueueCollector; @@ -138,6 +139,7 @@ public function load(array $configs, ContainerBuilder $container): void $this->loadDoctrinePingConnectionExtension($config, $container); $this->loadDoctrineClearIdentityMapExtension($config, $container); $this->loadDoctrineOdmClearIdentityMapExtension($config, $container); + $this->loadDoctrineClosedEntityManagerExtension($config, $container); $this->loadResetServicesExtension($config, $container); $this->loadSignalExtension($config, $container); $this->loadReplyExtension($config, $container); @@ -273,6 +275,28 @@ private function loadDoctrineOdmClearIdentityMapExtension(array $config, Contain } } + private function loadDoctrineClosedEntityManagerExtension(array $config, ContainerBuilder $container) + { + $configNames = []; + foreach ($config as $name => $modules) { + if ($modules['extensions']['doctrine_closed_entity_manager_extension']) { + $configNames[] = $name; + } + } + + if ([] === $configNames) { + return; + } + + $extension = $container->register('enqueue.consumption.doctrine_closed_entity_manager_extension', DoctrineClosedEntityManagerExtension::class) + ->addArgument(new Reference('doctrine')); + + foreach ($configNames as $name) { + $extension->addTag('enqueue.consumption_extension', ['client' => $name]); + $extension->addTag('enqueue.transport.consumption_extension', ['transport' => $name]); + } + } + private function loadResetServicesExtension(array $config, ContainerBuilder $container) { $configNames = []; diff --git a/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClosedEntityManagerExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClosedEntityManagerExtensionTest.php new file mode 100644 index 000000000..1cccb0b13 --- /dev/null +++ b/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClosedEntityManagerExtensionTest.php @@ -0,0 +1,222 @@ +createManagerMock(true); + + $registry = $this->createRegistryMock([ + 'manager' => $manager, + ]); + + $message = new PreConsume( + $this->createMock(InteropContext::class), + $this->createMock(SubscriptionConsumer::class), + $this->createMock(LoggerInterface::class), + 1, + 2, + 3 + ); + + self::assertFalse($message->isExecutionInterrupted()); + + $extension = new DoctrineClosedEntityManagerExtension($registry); + $extension->onPreConsume($message); + + self::assertFalse($message->isExecutionInterrupted()); + } + + public function testOnPreConsumeShouldInterruptExecutionIfAManagerIsClosed() + { + $manager1 = $this->createManagerMock(true); + $manager2 = $this->createManagerMock(false); + + $registry = $this->createRegistryMock([ + 'manager1' => $manager1, + 'manager2' => $manager2, + ]); + + $message = new PreConsume( + $this->createMock(InteropContext::class), + $this->createMock(SubscriptionConsumer::class), + $this->createMock(LoggerInterface::class), + 1, + 2, + 3 + ); + $message->getLogger() + ->expects($this->once()) + ->method('debug') + ->with('[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "manager2" has been closed') + ; + + self::assertFalse($message->isExecutionInterrupted()); + + $extension = new DoctrineClosedEntityManagerExtension($registry); + $extension->onPreConsume($message); + + self::assertTrue($message->isExecutionInterrupted()); + } + + public function testOnPostConsumeShouldNotInterruptExecution() + { + $manager = $this->createManagerMock(true); + + $registry = $this->createRegistryMock([ + 'manager' => $manager, + ]); + + $message = new PostConsume( + $this->createMock(InteropContext::class), + $this->createMock(SubscriptionConsumer::class), + 1, + 1, + 1, + $this->createMock(LoggerInterface::class) + ); + + self::assertFalse($message->isExecutionInterrupted()); + + $extension = new DoctrineClosedEntityManagerExtension($registry); + $extension->onPostConsume($message); + + self::assertFalse($message->isExecutionInterrupted()); + } + + public function testOnPostConsumeShouldInterruptExecutionIfAManagerIsClosed() + { + $manager1 = $this->createManagerMock(true); + $manager2 = $this->createManagerMock(false); + + $registry = $this->createRegistryMock([ + 'manager1' => $manager1, + 'manager2' => $manager2, + ]); + + $message = new PostConsume( + $this->createMock(InteropContext::class), + $this->createMock(SubscriptionConsumer::class), + 1, + 1, + 1, + $this->createMock(LoggerInterface::class) + ); + $message->getLogger() + ->expects($this->once()) + ->method('debug') + ->with('[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "manager2" has been closed') + ; + + self::assertFalse($message->isExecutionInterrupted()); + + $extension = new DoctrineClosedEntityManagerExtension($registry); + $extension->onPostConsume($message); + + self::assertTrue($message->isExecutionInterrupted()); + } + + public function testOnPostReceivedShouldNotInterruptExecution() + { + $manager = $this->createManagerMock(true); + + $registry = $this->createRegistryMock([ + 'manager' => $manager, + ]); + + $message = new PostMessageReceived( + $this->createMock(InteropContext::class), + $this->createMock(Consumer::class), + $this->createMock(Message::class), + 'aResult', + 1, + $this->createMock(LoggerInterface::class) + ); + + self::assertFalse($message->isExecutionInterrupted()); + + $extension = new DoctrineClosedEntityManagerExtension($registry); + $extension->onPostMessageReceived($message); + + self::assertFalse($message->isExecutionInterrupted()); + } + + public function testOnPostReceivedShouldInterruptExecutionIfAManagerIsClosed() + { + $manager1 = $this->createManagerMock(true); + $manager2 = $this->createManagerMock(false); + + $registry = $this->createRegistryMock([ + 'manager1' => $manager1, + 'manager2' => $manager2, + ]); + + $message = new PostMessageReceived( + $this->createMock(InteropContext::class), + $this->createMock(Consumer::class), + $this->createMock(Message::class), + 'aResult', + 1, + $this->createMock(LoggerInterface::class) + ); + $message->getLogger() + ->expects($this->once()) + ->method('debug') + ->with('[DoctrineClosedEntityManagerExtension] Interrupt execution as entity manager "manager2" has been closed') + ; + + self::assertFalse($message->isExecutionInterrupted()); + + $extension = new DoctrineClosedEntityManagerExtension($registry); + $extension->onPostMessageReceived($message); + + self::assertTrue($message->isExecutionInterrupted()); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|ManagerRegistry + */ + protected function createRegistryMock(array $managers): ManagerRegistry + { + $mock = $this->createMock(ManagerRegistry::class); + + $mock + ->expects($this->once()) + ->method('getManagers') + ->willReturn($managers) + ; + + return $mock; + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|EntityManagerInterface + */ + protected function createManagerMock(bool $open): EntityManagerInterface + { + $mock = $this->createMock(EntityManagerInterface::class); + + $mock + ->expects($this->once()) + ->method('isOpen') + ->willReturn($open) + ; + + return $mock; + } +} diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index 6e2b47dcf..3f3aae8ee 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -379,6 +379,49 @@ public function testDoctrineOdmClearIdentityMapExtensionCouldBeEnabled() ], $config); } + public function testDoctrineClosedEntityManagerExtensionShouldBeDisabledByDefault() + { + $configuration = new Configuration(true); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, [[ + 'default' => [ + 'transport' => null, + ], + ]]); + + $this->assertArraySubset([ + 'default' => [ + 'extensions' => [ + 'doctrine_closed_entity_manager_extension' => false, + ], + ], + ], $config); + } + + public function testDoctrineClosedEntityManagerExtensionCouldBeEnabled() + { + $configuration = new Configuration(true); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, [[ + 'default' => [ + 'transport' => null, + 'extensions' => [ + 'doctrine_closed_entity_manager_extension' => true, + ], + ], + ]]); + + $this->assertArraySubset([ + 'default' => [ + 'extensions' => [ + 'doctrine_closed_entity_manager_extension' => true, + ], + ], + ], $config); + } + public function testResetServicesExtensionShouldBeDisabledByDefault() { $configuration = new Configuration(true); diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index ab1eb2967..51bbaa26d 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -420,6 +420,42 @@ public function testShouldNotLoadDoctrineOdmClearIdentityMapExtensionServiceIfDi self::assertFalse($container->hasDefinition('enqueue.consumption.doctrine_odm_clear_identity_map_extension')); } + public function testShouldLoadDoctrineClosedEntityManagerExtensionServiceIfEnabled() + { + $container = $this->getContainerBuilder(true); + + $extension = new EnqueueExtension(); + + $extension->load([[ + 'default' => [ + 'transport' => [], + 'extensions' => [ + 'doctrine_closed_entity_manager_extension' => true, + ], + ], + ]], $container); + + self::assertTrue($container->hasDefinition('enqueue.consumption.doctrine_closed_entity_manager_extension')); + } + + public function testShouldNotLoadDoctrineClosedEntityManagerExtensionServiceIfDisabled() + { + $container = $this->getContainerBuilder(true); + + $extension = new EnqueueExtension(); + + $extension->load([[ + 'default' => [ + 'transport' => [], + 'extensions' => [ + 'doctrine_closed_entity_manager_extension' => false, + ], + ], + ]], $container); + + self::assertFalse($container->hasDefinition('enqueue.consumption.doctrine_closed_entity_manager_extension')); + } + public function testShouldLoadResetServicesExtensionServiceIfEnabled() { $container = $this->getContainerBuilder(true);