From 5d263120307e4cde09105ead3723fe497dc0ad21 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 5 May 2017 10:05:55 +0300 Subject: [PATCH 1/4] client extension --- pkg/enqueue/Client/ChainExtension.php | 39 +++++++++++++++++++++++ pkg/enqueue/Client/ExtensionInterface.php | 21 ++++++++++++ pkg/enqueue/Client/Producer.php | 12 ++++++- 3 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 pkg/enqueue/Client/ChainExtension.php create mode 100644 pkg/enqueue/Client/ExtensionInterface.php diff --git a/pkg/enqueue/Client/ChainExtension.php b/pkg/enqueue/Client/ChainExtension.php new file mode 100644 index 000000000..c202e98e0 --- /dev/null +++ b/pkg/enqueue/Client/ChainExtension.php @@ -0,0 +1,39 @@ +extensions = $extensions; + } + + /** + * {@inheritdoc} + */ + public function onPreSend($topic, Message $message) + { + foreach ($this->extensions as $extension) { + $extension->onPreSend($topic, $message); + } + } + + /** + * {@inheritdoc} + */ + public function onPostSend($topic, Message $message) + { + foreach ($this->extensions as $extension) { + $extension->onPostSend($topic, $message); + } + } +} diff --git a/pkg/enqueue/Client/ExtensionInterface.php b/pkg/enqueue/Client/ExtensionInterface.php new file mode 100644 index 000000000..47cdfd311 --- /dev/null +++ b/pkg/enqueue/Client/ExtensionInterface.php @@ -0,0 +1,21 @@ +driver = $driver; + $this->extension = $extension ?: new ChainExtension([]); } /** @@ -47,6 +53,8 @@ public function send($topic, $message) $message->setPriority(MessagePriority::NORMAL); } + $this->extension->onPreSend($topic, $message); + if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) { if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_QUEUE_NAME)); @@ -68,6 +76,8 @@ public function send($topic, $message) } else { throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope())); } + + $this->extension->onPostSend($topic, $message); } /** From 2145d1a594b8e07a947cbd8d2e646d00f742f8c3 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 5 May 2017 11:01:18 +0300 Subject: [PATCH 2/4] client extension --- pkg/amqp-ext/Client/AmqpDriver.php | 9 +++++++++ pkg/enqueue/Client/ChainExtension.php | 13 +++++++++++++ .../ConsumptionExtension/SetupBrokerExtension.php | 11 ++++++++++- pkg/enqueue/Client/DriverInterface.php | 6 ++++++ pkg/enqueue/Client/ExtensionInterface.php | 10 +++++++++- pkg/enqueue/Symfony/Client/SetupBrokerCommand.php | 13 +++++++++++-- 6 files changed, 58 insertions(+), 4 deletions(-) diff --git a/pkg/amqp-ext/Client/AmqpDriver.php b/pkg/amqp-ext/Client/AmqpDriver.php index ec11765de..b965b6a5d 100644 --- a/pkg/amqp-ext/Client/AmqpDriver.php +++ b/pkg/amqp-ext/Client/AmqpDriver.php @@ -11,6 +11,7 @@ use Enqueue\Client\Message; use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\AmqpExt\DeliveryMode; +use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -192,6 +193,14 @@ public function getConfig() return $this->config; } + /** + * {@inheritdoc} + */ + public function getContext() + { + return $this->context; + } + /** * @return AmqpTopic */ diff --git a/pkg/enqueue/Client/ChainExtension.php b/pkg/enqueue/Client/ChainExtension.php index c202e98e0..2a6d0c3d6 100644 --- a/pkg/enqueue/Client/ChainExtension.php +++ b/pkg/enqueue/Client/ChainExtension.php @@ -2,6 +2,9 @@ namespace Enqueue\Client; +use Enqueue\Psr\PsrContext; +use Psr\Log\LoggerInterface; + class ChainExtension implements ExtensionInterface { /** @@ -36,4 +39,14 @@ public function onPostSend($topic, Message $message) $extension->onPostSend($topic, $message); } } + + /** + * {@inheritdoc} + */ + public function onPostSetupBroker(PsrContext $context, LoggerInterface $logger = null) + { + foreach ($this->extensions as $extension) { + $extension->onPostSetupBroker($context, $logger); + } + } } diff --git a/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php b/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php index 8b6aecbc1..2131a6fdc 100644 --- a/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php +++ b/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php @@ -2,6 +2,8 @@ namespace Enqueue\Client\ConsumptionExtension; +use Enqueue\Client\ChainExtension; +use Enqueue\Client\ExtensionInterface as ClientExtensionInterface; use Enqueue\Client\DriverInterface; use Enqueue\Consumption\Context; use Enqueue\Consumption\EmptyExtensionTrait; @@ -16,6 +18,11 @@ class SetupBrokerExtension implements ExtensionInterface */ private $driver; + /** + * @var ClientExtensionInterface + */ + private $extension; + /** * @var bool */ @@ -24,9 +31,10 @@ class SetupBrokerExtension implements ExtensionInterface /** * @param DriverInterface $driver */ - public function __construct(DriverInterface $driver) + public function __construct(DriverInterface $driver, ClientExtensionInterface $extension = null) { $this->driver = $driver; + $this->extension = $extension ?: new ChainExtension([]); $this->isDone = false; } @@ -38,6 +46,7 @@ public function onStart(Context $context) if (false == $this->isDone) { $this->isDone = true; $this->driver->setupBroker($context->getLogger()); + $this->extension->onPostSetupBroker($this->driver->getContext(), $context->getLogger()); } } } diff --git a/pkg/enqueue/Client/DriverInterface.php b/pkg/enqueue/Client/DriverInterface.php index 3737165e3..f3b85b0dc 100644 --- a/pkg/enqueue/Client/DriverInterface.php +++ b/pkg/enqueue/Client/DriverInterface.php @@ -2,6 +2,7 @@ namespace Enqueue\Client; +use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Psr\PsrQueue; use Psr\Log\LoggerInterface; @@ -50,4 +51,9 @@ public function setupBroker(LoggerInterface $logger = null); * @return Config */ public function getConfig(); + + /** + * @return PsrContext + */ + public function getContext(); } diff --git a/pkg/enqueue/Client/ExtensionInterface.php b/pkg/enqueue/Client/ExtensionInterface.php index 47cdfd311..1d1e73333 100644 --- a/pkg/enqueue/Client/ExtensionInterface.php +++ b/pkg/enqueue/Client/ExtensionInterface.php @@ -1,6 +1,9 @@ driver = $driver; + $this->extension = $extension ?: new ChainExtension([]); } /** @@ -44,6 +52,7 @@ protected function execute(InputInterface $input, OutputInterface $output) { $output->writeln('Setup Broker'); - $this->driver->setupBroker(new ConsoleLogger($output)); + $this->driver->setupBroker($logger = new ConsoleLogger($output)); + $this->extension->onPostSetupBroker($this->driver->getContext(), $logger); } } From d771b3ff1f8778b2225624a82e546cf8f60ffceb Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 5 May 2017 13:32:32 +0300 Subject: [PATCH 3/4] client extension --- pkg/amqp-ext/Client/AmqpDriver.php | 9 -- .../Compiler/BuildClientExtensionsPass.php | 40 ++++++ ...php => BuildConsumptionExtensionsPass.php} | 2 +- pkg/enqueue-bundle/EnqueueBundle.php | 6 +- .../Resources/config/client.yml | 10 +- .../BuildClientExtensionsPassTest.php | 129 ++++++++++++++++++ ...=> BuildConsumptionExtensionsPassTest.php} | 14 +- .../Tests/Unit/EnqueueBundleTest.php | 10 +- pkg/enqueue/Client/ChainExtension.php | 13 -- .../SetupBrokerExtension.php | 11 +- pkg/enqueue/Client/DriverInterface.php | 6 - pkg/enqueue/Client/ExtensionInterface.php | 10 -- pkg/enqueue/Client/Producer.php | 8 +- .../Symfony/Client/SetupBrokerCommand.php | 11 +- pkg/enqueue/Tests/Client/ProducerTest.php | 57 ++++++++ 15 files changed, 261 insertions(+), 75 deletions(-) create mode 100644 pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php rename pkg/enqueue-bundle/DependencyInjection/Compiler/{BuildExtensionsPass.php => BuildConsumptionExtensionsPass.php} (94%) create mode 100644 pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php rename pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/{BuildExtensionsPassTest.php => BuildConsumptionExtensionsPassTest.php} (90%) diff --git a/pkg/amqp-ext/Client/AmqpDriver.php b/pkg/amqp-ext/Client/AmqpDriver.php index b965b6a5d..ec11765de 100644 --- a/pkg/amqp-ext/Client/AmqpDriver.php +++ b/pkg/amqp-ext/Client/AmqpDriver.php @@ -11,7 +11,6 @@ use Enqueue\Client\Message; use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\AmqpExt\DeliveryMode; -use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -193,14 +192,6 @@ public function getConfig() return $this->config; } - /** - * {@inheritdoc} - */ - public function getContext() - { - return $this->context; - } - /** * @return AmqpTopic */ diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php new file mode 100644 index 000000000..5f83e83e2 --- /dev/null +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php @@ -0,0 +1,40 @@ +hasDefinition('enqueue.client.extensions')) { + return; + } + + $tags = $container->findTaggedServiceIds('enqueue.client.extension'); + + $groupByPriority = []; + foreach ($tags as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $priority = isset($tagAttribute['priority']) ? (int) $tagAttribute['priority'] : 0; + + $groupByPriority[$priority][] = new Reference($serviceId); + } + } + + krsort($groupByPriority, SORT_NUMERIC); + + $flatExtensions = []; + foreach ($groupByPriority as $extension) { + $flatExtensions = array_merge($flatExtensions, $extension); + } + + $container->getDefinition('enqueue.client.extensions')->replaceArgument(0, $flatExtensions); + } +} diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php similarity index 94% rename from pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php rename to pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php index 31dcee799..20f2a3817 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php @@ -6,7 +6,7 @@ use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Reference; -class BuildExtensionsPass implements CompilerPassInterface +class BuildConsumptionExtensionsPass implements CompilerPassInterface { /** * {@inheritdoc} diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index 4c8e5806d..5b4039104 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -5,8 +5,9 @@ use Enqueue\AmqpExt\AmqpContext; use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; @@ -34,11 +35,12 @@ class EnqueueBundle extends Bundle */ public function build(ContainerBuilder $container) { - $container->addCompilerPass(new BuildExtensionsPass()); + $container->addCompilerPass(new BuildConsumptionExtensionsPass()); $container->addCompilerPass(new BuildClientRoutingPass()); $container->addCompilerPass(new BuildProcessorRegistryPass()); $container->addCompilerPass(new BuildTopicMetaSubscribersPass()); $container->addCompilerPass(new BuildQueueMetaRegistryPass()); + $container->addCompilerPass(new BuildClientExtensionsPass()); /** @var EnqueueExtension $extension */ $extension = $container->getExtension('enqueue'); diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml index 0c1dab387..c83db9055 100644 --- a/pkg/enqueue-bundle/Resources/config/client.yml +++ b/pkg/enqueue-bundle/Resources/config/client.yml @@ -5,7 +5,15 @@ services: enqueue.client.producer: class: 'Enqueue\Client\Producer' - arguments: ['@enqueue.client.driver'] + arguments: + - '@enqueue.client.driver' + - '@enqueue.client.extensions' + + enqueue.client.extensions: + class: 'Enqueue\Client\ChainExtension' + public: false + arguments: + - [] enqueue.producer: alias: 'enqueue.client.producer' diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php new file mode 100644 index 000000000..5b98ecda6 --- /dev/null +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php @@ -0,0 +1,129 @@ +assertClassImplements(CompilerPassInterface::class, BuildClientExtensionsPass::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new BuildClientExtensionsPass(); + } + + public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWithTaggsExtensions() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('bar_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $this->assertEquals( + [new Reference('foo_extension'), new Reference('bar_extension')], + $extensions->getArgument(0) + ); + } + + public function testShouldOrderExtensionsByPriority() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 6]); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => -5]); + $container->setDefinition('bar_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 2]); + $container->setDefinition('baz_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $orderedExtensions = $extensions->getArgument(0); + + $this->assertEquals(new Reference('foo_extension'), $orderedExtensions[0]); + $this->assertEquals(new Reference('baz_extension'), $orderedExtensions[1]); + $this->assertEquals(new Reference('bar_extension'), $orderedExtensions[2]); + } + + public function testShouldAssumePriorityZeroIfPriorityIsNotSet() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 1]); + $container->setDefinition('bar_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => -1]); + $container->setDefinition('baz_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $orderedExtensions = $extensions->getArgument(0); + + $this->assertEquals(new Reference('bar_extension'), $orderedExtensions[0]); + $this->assertEquals(new Reference('foo_extension'), $orderedExtensions[1]); + $this->assertEquals(new Reference('baz_extension'), $orderedExtensions[2]); + } + + public function testShouldDoesNothingIfClientExtensionServiceIsNotDefined() + { + $container = $this->createMock(ContainerBuilder::class); + $container + ->expects($this->once()) + ->method('hasDefinition') + ->with('enqueue.client.extensions') + ->willReturn(false) + ; + $container + ->expects($this->never()) + ->method('findTaggedServiceIds') + ; + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + } +} diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php similarity index 90% rename from pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php rename to pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php index 8f02a365b..048e0c467 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php @@ -2,7 +2,7 @@ namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Test\ClassExtensionTrait; use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -10,18 +10,18 @@ use Symfony\Component\DependencyInjection\Reference; use PHPUnit\Framework\TestCase; -class BuildExtensionsPassTest extends TestCase +class BuildConsumptionExtensionsPassTest extends TestCase { use ClassExtensionTrait; public function testShouldImplementCompilerPass() { - $this->assertClassImplements(CompilerPassInterface::class, BuildExtensionsPass::class); + $this->assertClassImplements(CompilerPassInterface::class, BuildConsumptionExtensionsPass::class); } public function testCouldBeConstructedWithoutAnyArguments() { - new BuildExtensionsPass(); + new BuildConsumptionExtensionsPass(); } public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWithTaggsExtensions() @@ -40,7 +40,7 @@ public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWith $extension->addTag('enqueue.consumption.extension'); $container->setDefinition('bar_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertEquals( @@ -69,7 +69,7 @@ public function testShouldOrderExtensionsByPriority() $extension->addTag('enqueue.consumption.extension', ['priority' => 2]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); @@ -99,7 +99,7 @@ public function testShouldAssumePriorityZeroIfPriorityIsNotSet() $extension->addTag('enqueue.consumption.extension', ['priority' => -1]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); diff --git a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php index ee10168f6..e5bc2f0a0 100644 --- a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php @@ -4,8 +4,9 @@ use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; @@ -46,7 +47,7 @@ public function testShouldRegisterExpectedCompilerPasses() $container ->expects($this->at(0)) ->method('addCompilerPass') - ->with($this->isInstanceOf(BuildExtensionsPass::class)) + ->with($this->isInstanceOf(BuildConsumptionExtensionsPass::class)) ; $container ->expects($this->at(1)) @@ -70,6 +71,11 @@ public function testShouldRegisterExpectedCompilerPasses() ; $container ->expects($this->at(5)) + ->method('addCompilerPass') + ->with($this->isInstanceOf(BuildClientExtensionsPass::class)) + ; + $container + ->expects($this->at(6)) ->method('getExtension') ->willReturn($extensionMock) ; diff --git a/pkg/enqueue/Client/ChainExtension.php b/pkg/enqueue/Client/ChainExtension.php index 2a6d0c3d6..c202e98e0 100644 --- a/pkg/enqueue/Client/ChainExtension.php +++ b/pkg/enqueue/Client/ChainExtension.php @@ -2,9 +2,6 @@ namespace Enqueue\Client; -use Enqueue\Psr\PsrContext; -use Psr\Log\LoggerInterface; - class ChainExtension implements ExtensionInterface { /** @@ -39,14 +36,4 @@ public function onPostSend($topic, Message $message) $extension->onPostSend($topic, $message); } } - - /** - * {@inheritdoc} - */ - public function onPostSetupBroker(PsrContext $context, LoggerInterface $logger = null) - { - foreach ($this->extensions as $extension) { - $extension->onPostSetupBroker($context, $logger); - } - } } diff --git a/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php b/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php index 2131a6fdc..8b6aecbc1 100644 --- a/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php +++ b/pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php @@ -2,8 +2,6 @@ namespace Enqueue\Client\ConsumptionExtension; -use Enqueue\Client\ChainExtension; -use Enqueue\Client\ExtensionInterface as ClientExtensionInterface; use Enqueue\Client\DriverInterface; use Enqueue\Consumption\Context; use Enqueue\Consumption\EmptyExtensionTrait; @@ -18,11 +16,6 @@ class SetupBrokerExtension implements ExtensionInterface */ private $driver; - /** - * @var ClientExtensionInterface - */ - private $extension; - /** * @var bool */ @@ -31,10 +24,9 @@ class SetupBrokerExtension implements ExtensionInterface /** * @param DriverInterface $driver */ - public function __construct(DriverInterface $driver, ClientExtensionInterface $extension = null) + public function __construct(DriverInterface $driver) { $this->driver = $driver; - $this->extension = $extension ?: new ChainExtension([]); $this->isDone = false; } @@ -46,7 +38,6 @@ public function onStart(Context $context) if (false == $this->isDone) { $this->isDone = true; $this->driver->setupBroker($context->getLogger()); - $this->extension->onPostSetupBroker($this->driver->getContext(), $context->getLogger()); } } } diff --git a/pkg/enqueue/Client/DriverInterface.php b/pkg/enqueue/Client/DriverInterface.php index f3b85b0dc..3737165e3 100644 --- a/pkg/enqueue/Client/DriverInterface.php +++ b/pkg/enqueue/Client/DriverInterface.php @@ -2,7 +2,6 @@ namespace Enqueue\Client; -use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Psr\PsrQueue; use Psr\Log\LoggerInterface; @@ -51,9 +50,4 @@ public function setupBroker(LoggerInterface $logger = null); * @return Config */ public function getConfig(); - - /** - * @return PsrContext - */ - public function getContext(); } diff --git a/pkg/enqueue/Client/ExtensionInterface.php b/pkg/enqueue/Client/ExtensionInterface.php index 1d1e73333..3b0a028e8 100644 --- a/pkg/enqueue/Client/ExtensionInterface.php +++ b/pkg/enqueue/Client/ExtensionInterface.php @@ -1,9 +1,6 @@ setPriority(MessagePriority::NORMAL); } - $this->extension->onPreSend($topic, $message); - if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) { if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_QUEUE_NAME)); @@ -63,7 +61,9 @@ public function send($topic, $message) throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_NAME)); } + $this->extension->onPreSend($topic, $message); $this->driver->sendToRouter($message); + $this->extension->onPostSend($topic, $message); } elseif (Message::SCOPE_APP == $message->getScope()) { if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->driver->getConfig()->getRouterProcessorName()); @@ -72,12 +72,12 @@ public function send($topic, $message) $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->driver->getConfig()->getRouterQueueName()); } + $this->extension->onPreSend($topic, $message); $this->driver->sendToProcessor($message); + $this->extension->onPostSend($topic, $message); } else { throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope())); } - - $this->extension->onPostSend($topic, $message); } /** diff --git a/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php b/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php index 3ceb33196..ee7f7cf53 100644 --- a/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php +++ b/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php @@ -2,9 +2,7 @@ namespace Enqueue\Symfony\Client; -use Enqueue\Client\ChainExtension; use Enqueue\Client\DriverInterface; -use Enqueue\Client\ExtensionInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Logger\ConsoleLogger; @@ -17,20 +15,14 @@ class SetupBrokerCommand extends Command */ private $driver; - /** - * @var ExtensionInterface - */ - private $extension; - /** * @param DriverInterface $driver */ - public function __construct(DriverInterface $driver, ExtensionInterface $extension = null) + public function __construct(DriverInterface $driver) { parent::__construct(null); $this->driver = $driver; - $this->extension = $extension ?: new ChainExtension([]); } /** @@ -53,6 +45,5 @@ protected function execute(InputInterface $input, OutputInterface $output) $output->writeln('Setup Broker'); $this->driver->setupBroker($logger = new ConsoleLogger($output)); - $this->extension->onPostSetupBroker($this->driver->getContext(), $logger); } } diff --git a/pkg/enqueue/Tests/Client/ProducerTest.php b/pkg/enqueue/Tests/Client/ProducerTest.php index 9741810cc..79c7a0cb2 100644 --- a/pkg/enqueue/Tests/Client/ProducerTest.php +++ b/pkg/enqueue/Tests/Client/ProducerTest.php @@ -4,6 +4,7 @@ use Enqueue\Client\Config; use Enqueue\Client\DriverInterface; +use Enqueue\Client\ExtensionInterface; use Enqueue\Client\Message; use Enqueue\Client\MessagePriority; use Enqueue\Client\Producer; @@ -541,6 +542,62 @@ public function testThrowIfUnSupportedScopeGivenOnSend() $producer->send('topic', $message); } + public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToRouter() + { + $message = new Message(); + $message->setBody('aBody'); + $message->setScope(Message::SCOPE_MESSAGE_BUS); + + $extension = $this->createMock(ExtensionInterface::class); + $extension + ->expects($this->at(0)) + ->method('onPreSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + $extension + ->expects($this->at(1)) + ->method('onPostSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + + $driver = $this->createDriverStub(); + $driver + ->expects($this->once()) + ->method('sendToRouter') + ; + + $producer = new Producer($driver, $extension); + $producer->send('topic', $message); + } + + public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToProcessor() + { + $message = new Message(); + $message->setBody('aBody'); + $message->setScope(Message::SCOPE_APP); + + $extension = $this->createMock(ExtensionInterface::class); + $extension + ->expects($this->at(0)) + ->method('onPreSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + $extension + ->expects($this->at(1)) + ->method('onPostSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + + $driver = $this->createDriverStub(); + $driver + ->expects($this->once()) + ->method('sendToProcessor') + ; + + $producer = new Producer($driver, $extension); + $producer->send('topic', $message); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|DriverInterface */ From 1ac0989242a3e139a9a0e87eaabedbf737374a43 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 5 May 2017 13:42:05 +0300 Subject: [PATCH 4/4] client extension --- .../Symfony/Client/SetupBrokerCommand.php | 2 +- .../Tests/Client/ChainExtensionTest.php | 76 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 pkg/enqueue/Tests/Client/ChainExtensionTest.php diff --git a/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php b/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php index ee7f7cf53..c825bd2c4 100644 --- a/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php +++ b/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php @@ -44,6 +44,6 @@ protected function execute(InputInterface $input, OutputInterface $output) { $output->writeln('Setup Broker'); - $this->driver->setupBroker($logger = new ConsoleLogger($output)); + $this->driver->setupBroker(new ConsoleLogger($output)); } } diff --git a/pkg/enqueue/Tests/Client/ChainExtensionTest.php b/pkg/enqueue/Tests/Client/ChainExtensionTest.php new file mode 100644 index 000000000..3b1d82f9a --- /dev/null +++ b/pkg/enqueue/Tests/Client/ChainExtensionTest.php @@ -0,0 +1,76 @@ +assertClassImplements(ExtensionInterface::class, ChainExtension::class); + } + + public function testCouldBeConstructedWithExtensionsArray() + { + new ChainExtension([$this->createExtension(), $this->createExtension()]); + } + + public function testShouldProxyOnPreSendToAllInternalExtensions() + { + $message = new Message(); + + $fooExtension = $this->createExtension(); + $fooExtension + ->expects($this->once()) + ->method('onPreSend') + ->with('topic', $this->identicalTo($message)) + ; + $barExtension = $this->createExtension(); + $barExtension + ->expects($this->once()) + ->method('onPreSend') + ->with('topic', $this->identicalTo($message)) + ; + + $extensions = new ChainExtension([$fooExtension, $barExtension]); + + $extensions->onPreSend('topic', $message); + } + + public function testShouldProxyOnPostSendToAllInternalExtensions() + { + $message = new Message(); + + $fooExtension = $this->createExtension(); + $fooExtension + ->expects($this->once()) + ->method('onPostSend') + ->with('topic', $this->identicalTo($message)) + ; + $barExtension = $this->createExtension(); + $barExtension + ->expects($this->once()) + ->method('onPostSend') + ->with('topic', $this->identicalTo($message)) + ; + + $extensions = new ChainExtension([$fooExtension, $barExtension]); + + $extensions->onPostSend('topic', $message); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|ExtensionInterface + */ + protected function createExtension() + { + return $this->createMock(ExtensionInterface::class); + } +}