Skip to content

[client] Add ability to define a command as exclusive #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 78 additions & 2 deletions docs/bundle/message_processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Here we just show how to register a message processor service to enqueue. Let's

* [Container tag](#container-tag)
* [Topic subscriber](#topic-subscriber)
* [Command subscriber](#command-subscriber)

# Container tag

Expand All @@ -27,8 +28,8 @@ The tag has some additional options:

# Topic subscriber

There is a `TopicSubscriber` interface (like [EventSubscriberInterface](https://github.com/symfony/symfony/blob/master/src/Symfony/Component/EventDispatcher/EventSubscriberInterface.php)).
It allows to keep subscription login and process logic closer to each other.
There is a `TopicSubscriberInterface` interface (like [EventSubscriberInterface](https://github.com/symfony/symfony/blob/master/src/Symfony/Component/EventDispatcher/EventSubscriberInterface.php)).
It is handy to subscribe on event messages. It allows to keep subscription login and process logic closer to each other.

```php
<?php
Expand Down Expand Up @@ -67,6 +68,81 @@ class SayHelloProcessor implements PsrProcessor, TopicSubscriberInterface

In the container you can just add the tag `enqueue.client.message_processor` and omit any other options:

```yaml
# src/AppBundle/Resources/services.yml

services:
app.async.say_hello_processor:
class: 'AppBundle\Async\SayHelloProcessor'
tags:
- { name: 'enqueue.client.processor'}

```

# Command subscriber

There is a `CommandSubscriberInterface` interface which allows to register a command handlers.
If you send a message using ProducerV2::sendCommand('aCommandName') method it will come to this processor.

```php
<?php
namespace AppBundle\Async;

use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Psr\PsrProcessor;

class SayHelloProcessor implements PsrProcessor, CommandSubscriberInterface
{
public static function getSubscribedCommand()
{
return 'aCommandName';
}
}
```

On the command subscriber you can also define additional settings such as queue and processor name:

```php
<?php
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Psr\PsrProcessor;

class SayHelloProcessor implements PsrProcessor, CommandSubscriberInterface
{
public static function getSubscribedCommand()
{
return ['queueName' => 'fooQueue', 'processorName' => 'aCommandName'];
}
}
```

There is a possibility to register a command processor which works exclusively on the queue (no other processors bound to it).
In this case you can send messages without setting any message properties at all. Here's an example of such a processor:

In the container you can just add the tag `enqueue.client.message_processor` and omit any other options:

```php
<?php
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Psr\PsrProcessor;

class SayHelloProcessor implements PsrProcessor, CommandSubscriberInterface
{
public static function getSubscribedCommand()
{
return [
'processorName' => 'the-exclusive-command-name',
'queueName' => 'the-queue-name',
'queueNameHardcoded' => true,
'exclusive' => true,
];
}
}
```

The same as a topic subscriber you have to tag a processor service (no need to add any options there):


```yaml
# src/AppBundle/Resources/services.yml

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Enqueue\Bundle\DependencyInjection\Compiler;

use Enqueue\Client\Config;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;

class BuildExclusiveCommandsExtensionPass implements CompilerPassInterface
{
use ExtractProcessorTagSubscriptionsTrait;

/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$processorTagName = 'enqueue.client.processor';
$extensionId = 'enqueue.client.exclusive_command_extension';
if (false == $container->hasDefinition($extensionId)) {
return;
}

$queueMetaRegistry = $container->getDefinition($extensionId);

$queueNameToProcessorNameMap = [];
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
$subscriptions = $this->extractSubscriptions($container, $serviceId, $tagAttributes);

foreach ($subscriptions as $subscription) {
if (Config::COMMAND_TOPIC != $subscription['topicName']) {
continue;
}

if (false == isset($subscription['exclusive'])) {
continue;
}

if (false == $subscription['queueNameHardcoded']) {
throw new \LogicException('The exclusive command could be used only with queueNameHardcoded attribute set to true.');
}

$queueNameToProcessorNameMap[$subscription['queueName']] = $subscription['processorName'];
}
}

$queueMetaRegistry->replaceArgument(0, $queueNameToProcessorNameMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
'queueName' => null,
'queueNameHardcoded' => false,
'processorName' => null,
'exclusive' => false,
];

$data = [];
Expand Down Expand Up @@ -70,6 +71,7 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
'queueName' => $resolve($params['queueName']) ?: $defaultQueueName,
'queueNameHardcoded' => $resolve($params['queueNameHardcoded']),
'processorName' => $processorName,
'exclusive' => array_key_exists('exclusive', $params) ? $params['exclusive'] : false,
];
} else {
throw new \LogicException(sprintf(
Expand Down Expand Up @@ -123,6 +125,8 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
'queueName' => $resolve($tagAttribute['queueName']) ?: $defaultQueueName,
'queueNameHardcoded' => $resolve($tagAttribute['queueNameHardcoded']),
'processorName' => $resolve($tagAttribute['processorName']) ?: $processorServiceId,
'exclusive' => Config::COMMAND_TOPIC == $resolve($tagAttribute['topicName']) &&
array_key_exists('exclusive', $tagAttribute) ? $tagAttribute['exclusive'] : false,
];
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public function load(array $configs, ContainerBuilder $container)
if (isset($config['client'])) {
$loader->load('client.yml');
$loader->load('extensions/flush_spool_producer_extension.yml');
$loader->load('extensions/exclusive_command_extension.yml');

foreach ($config['transport'] as $name => $transportConfig) {
$this->factories[$name]->createDriver($container, $transportConfig);
Expand Down
2 changes: 2 additions & 0 deletions pkg/enqueue-bundle/EnqueueBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildExclusiveCommandsExtensionPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
Expand Down Expand Up @@ -42,6 +43,7 @@ public function build(ContainerBuilder $container)
$container->addCompilerPass(new BuildTopicMetaSubscribersPass());
$container->addCompilerPass(new BuildQueueMetaRegistryPass());
$container->addCompilerPass(new BuildClientExtensionsPass());
$container->addCompilerPass(new BuildExclusiveCommandsExtensionPass());

/** @var EnqueueExtension $extension */
$extension = $container->getExtension('enqueue');
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
services:
enqueue.client.exclusive_command_extension:
class: 'Enqueue\Client\ConsumptionExtension\ExclusiveCommandExtension'
public: false
arguments:
- []
tags:
- { name: 'enqueue.consumption.extension', priority: 100 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
<?php

namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler;

use Enqueue\Bundle\DependencyInjection\Compiler\BuildExclusiveCommandsExtensionPass;
use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ExclusiveButQueueNameHardCodedCommandSubscriber;
use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ExclusiveCommandSubscriber;
use Enqueue\Client\Config;
use Enqueue\Client\ConsumptionExtension\ExclusiveCommandExtension;
use Enqueue\Psr\PsrProcessor;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;

class BuildExclusiveCommandsExtensionPassTest extends TestCase
{
use ClassExtensionTrait;

public function testShouldImplementCompilerPass()
{
$this->assertClassImplements(CompilerPassInterface::class, BuildExclusiveCommandsExtensionPass::class);
}

public function testCouldBeConstructedWithoutAnyArguments()
{
new BuildExclusiveCommandsExtensionPass();
}

public function testShouldDoNothingIfExclusiveCommandExtensionServiceNotRegistered()
{
$container = new ContainerBuilder();

$pass = new BuildExclusiveCommandsExtensionPass();
$pass->process($container);
}

public function testShouldReplaceFirstArgumentOfExclusiveCommandExtensionServiceConstructorWithExpectedMap()
{
$container = new ContainerBuilder();
$container->setParameter('enqueue.client.default_queue_name', 'default');
$container->register('enqueue.client.exclusive_command_extension', ExclusiveCommandExtension::class)
->addArgument([])
;

$processor = new Definition(ExclusiveCommandSubscriber::class);
$processor->addTag('enqueue.client.processor');
$container->setDefinition('processor-id', $processor);

$pass = new BuildExclusiveCommandsExtensionPass();

$pass->process($container);

$this->assertEquals([
'the-queue-name' => 'the-exclusive-command-name',
], $container->getDefinition('enqueue.client.exclusive_command_extension')->getArgument(0));
}

public function testShouldReplaceFirstArgumentOfExclusiveCommandConfiguredAsTagAttribute()
{
$container = new ContainerBuilder();
$container->setParameter('enqueue.client.default_queue_name', 'default');
$container->register('enqueue.client.exclusive_command_extension', ExclusiveCommandExtension::class)
->addArgument([])
;

$processor = new Definition($this->getMockClass(PsrProcessor::class));
$processor->addTag('enqueue.client.processor', [
'topicName' => Config::COMMAND_TOPIC,
'processorName' => 'the-exclusive-command-name',
'queueName' => 'the-queue-name',
'queueNameHardcoded' => true,
'exclusive' => true,
]);
$container->setDefinition('processor-id', $processor);

$pass = new BuildExclusiveCommandsExtensionPass();

$pass->process($container);

$this->assertEquals([
'the-queue-name' => 'the-exclusive-command-name',
], $container->getDefinition('enqueue.client.exclusive_command_extension')->getArgument(0));
}

public function testShouldThrowIfExclusiveSetTrueButQueueNameIsNotHardcoded()
{
$container = new ContainerBuilder();
$container->setParameter('enqueue.client.default_queue_name', 'default');
$container->register('enqueue.client.exclusive_command_extension', ExclusiveCommandExtension::class)
->addArgument([])
;

$processor = new Definition(ExclusiveButQueueNameHardCodedCommandSubscriber::class);
$processor->addTag('enqueue.client.processor');
$container->setDefinition('processor-id', $processor);

$pass = new BuildExclusiveCommandsExtensionPass();

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The exclusive command could be used only with queueNameHardcoded attribute set to true.');
$pass->process($container);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock;

use Enqueue\Client\CommandSubscriberInterface;

class ExclusiveButQueueNameHardCodedCommandSubscriber implements CommandSubscriberInterface
{
public static function getSubscribedCommand()
{
return [
'processorName' => 'the-exclusive-command-name',
'queueName' => 'the-queue-name',
'queueNameHardCoded' => false,
'exclusive' => true,
];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock;

use Enqueue\Client\CommandSubscriberInterface;

class ExclusiveCommandSubscriber implements CommandSubscriberInterface
{
public static function getSubscribedCommand()
{
return [
'processorName' => 'the-exclusive-command-name',
'queueName' => 'the-queue-name',
'queueNameHardcoded' => true,
'exclusive' => true,
];
}
}
6 changes: 6 additions & 0 deletions pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildExclusiveCommandsExtensionPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
Expand Down Expand Up @@ -74,6 +75,11 @@ public function testShouldRegisterExpectedCompilerPasses()
;
$container
->expects($this->at(6))
->method('addCompilerPass')
->with($this->isInstanceOf(BuildExclusiveCommandsExtensionPass::class))
;
$container
->expects($this->at(7))
->method('getExtension')
->willReturn($extensionMock)
;
Expand Down
3 changes: 2 additions & 1 deletion pkg/enqueue/Client/CommandSubscriberInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ interface CommandSubscriberInterface
* 'processorName' => 'aCommandName',
* 'queueName' => 'a_client_queue_name',
* 'queueNameHardcoded' => true,
* 'exclusive' => true,
* ]
*
* queueName and queueNameHardcoded are optional.
* queueName, exclusive and queueNameHardcoded are optional.
*
* Note: If you set queueNameHardcoded to true then the queueName is used as is and therefor the driver is not used to create a transport queue name.
*
Expand Down
Loading