Skip to content

[client] Make symfony compiler passes multi client #577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public function load(array $configs, ContainerBuilder $container): void
$container->setParameter('enqueue.transports', array_keys($config['transport']));

if (isset($config['client'])) {
$container->setParameter('enqueue.clients', ['default']);

$this->setupAutowiringForProcessors($container);

$loader->load('client.yml');
Expand Down
14 changes: 7 additions & 7 deletions pkg/enqueue-bundle/EnqueueBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ public function build(ContainerBuilder $container): void
$container->addCompilerPass(new BuildProcessorRegistryPass());

//client passes
$container->addCompilerPass(new BuildClientConsumptionExtensionsPass('default'));
$container->addCompilerPass(new BuildClientExtensionsPass('default'));
$container->addCompilerPass(new BuildClientTopicSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new BuildClientCommandSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new BuildClientProcessorRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new AnalyzeRouteCollectionPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 30);
$container->addCompilerPass(new BuildClientProcessorRegistryPass('default'));
$container->addCompilerPass(new BuildClientConsumptionExtensionsPass());
$container->addCompilerPass(new BuildClientExtensionsPass());
$container->addCompilerPass(new BuildClientTopicSubscriberRoutesPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new BuildClientCommandSubscriberRoutesPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new BuildClientProcessorRoutesPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
$container->addCompilerPass(new AnalyzeRouteCollectionPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 30);
$container->addCompilerPass(new BuildClientProcessorRegistryPass());

if (class_exists(AsyncEventDispatcherExtension::class)) {
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,24 @@ public function testShouldSetPropertyWithAllConfiguredTransports()
$this->assertEquals(['default', 'foo', 'bar'], $container->getParameter('enqueue.transports'));
}

public function testShouldSetPropertyWithAllConfiguredClients()
{
$container = $this->getContainerBuilder(true);

$extension = new EnqueueExtension();
$extension->load([[
'client' => [],
'transport' => [
'default' => ['dsn' => 'default:'],
'foo' => ['dsn' => 'foo:'],
'bar' => ['dsn' => 'foo:'],
],
]], $container);

$this->assertTrue($container->hasParameter('enqueue.clients'));
$this->assertEquals(['default'], $container->getParameter('enqueue.clients'));
}

public function testShouldLoadProcessAutoconfigureChildDefinition()
{
$container = $this->getContainerBuilder(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,35 @@

final class AnalyzeRouteCollectionPass implements CompilerPassInterface
{
/**
* @var string
*/
private $name;
use FormatClientNameTrait;

public function __construct(string $clientName)
{
if (empty($clientName)) {
throw new \InvalidArgumentException('The name could not be empty.');
}

$this->name = $clientName;
}
protected $name;

public function process(ContainerBuilder $container): void
{
$routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name);
if (false == $container->hasDefinition($routeCollectionId)) {
return;
if (false == $container->hasParameter('enqueue.clients')) {
throw new \LogicException('The "enqueue.clients" parameter must be set.');
}

$collection = RouteCollection::fromArray($container->getDefinition($routeCollectionId)->getArgument(0));
$names = $container->getParameter('enqueue.clients');

foreach ($names as $name) {
$this->name = $name;
$routeCollectionId = $this->format('route_collection');
if (false == $container->hasDefinition($routeCollectionId)) {
throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId));
}

$this->exclusiveCommandsCouldNotBeRunOnDefaultQueue($collection);
$this->exclusiveCommandProcessorMustBeSingleOnGivenQueue($collection);
$collection = RouteCollection::fromArray($container->getDefinition($routeCollectionId)->getArgument(0));

$this->exclusiveCommandsCouldNotBeRunOnDefaultQueue($collection);
$this->exclusiveCommandProcessorMustBeSingleOnGivenQueue($collection);
}
}

protected function getName(): string
{
return $this->name;
}

private function exclusiveCommandsCouldNotBeRunOnDefaultQueue(RouteCollection $collection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,58 +8,62 @@

final class BuildClientExtensionsPass implements CompilerPassInterface
{
/**
* @var string
*/
private $name;
use FormatClientNameTrait;

public function __construct(string $clientName)
{
if (empty($clientName)) {
throw new \InvalidArgumentException('The name could not be empty.');
}

$this->name = $clientName;
}
protected $name;

public function process(ContainerBuilder $container): void
{
$extensionsId = sprintf('enqueue.client.%s.client_extensions', $this->name);
if (false == $container->hasDefinition($extensionsId)) {
return;
if (false == $container->hasParameter('enqueue.clients')) {
throw new \LogicException('The "enqueue.clients" parameter must be set.');
}

$tags = array_merge(
$container->findTaggedServiceIds('enqueue.client_extension'),
$container->findTaggedServiceIds('enqueue.client.extension') // TODO BC
);
$names = $container->getParameter('enqueue.clients');

$groupByPriority = [];
foreach ($tags as $serviceId => $tagAttributes) {
foreach ($tagAttributes as $tagAttribute) {
$client = $tagAttribute['client'] ?? 'default';
foreach ($names as $name) {
$this->name = $name;
$extensionsId = $this->format('client_extensions');
if (false == $container->hasDefinition($extensionsId)) {
throw new \LogicException(sprintf('Service "%s" not found', $extensionsId));
}

if ($client !== $this->name && 'all' !== $client) {
continue;
}
$tags = array_merge(
$container->findTaggedServiceIds('enqueue.client_extension'),
$container->findTaggedServiceIds('enqueue.client.extension') // TODO BC
);

$groupByPriority = [];
foreach ($tags as $serviceId => $tagAttributes) {
foreach ($tagAttributes as $tagAttribute) {
$client = $tagAttribute['client'] ?? 'default';

$priority = (int) ($tagAttribute['priority'] ?? 0);
if ($client !== $this->name && 'all' !== $client) {
continue;
}

$groupByPriority[$priority][] = new Reference($serviceId);
$priority = (int) ($tagAttribute['priority'] ?? 0);

$groupByPriority[$priority][] = new Reference($serviceId);
}
}
}

krsort($groupByPriority, SORT_NUMERIC);
krsort($groupByPriority, SORT_NUMERIC);

$flatExtensions = [];
foreach ($groupByPriority as $extension) {
$flatExtensions = array_merge($flatExtensions, $extension);
$flatExtensions = [];
foreach ($groupByPriority as $extension) {
$flatExtensions = array_merge($flatExtensions, $extension);
}

$extensionsService = $container->getDefinition($extensionsId);
$extensionsService->replaceArgument(0, array_merge(
$extensionsService->getArgument(0),
$flatExtensions
));
}
}

$extensionsService = $container->getDefinition($extensionsId);
$extensionsService->replaceArgument(0, array_merge(
$extensionsService->getArgument(0),
$flatExtensions
));
protected function getName(): string
{
return $this->name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,98 +10,102 @@

final class BuildCommandSubscriberRoutesPass implements CompilerPassInterface
{
/**
* @var string
*/
private $name;
use FormatClientNameTrait;

public function __construct(string $clientName)
{
if (empty($clientName)) {
throw new \InvalidArgumentException('The name could not be empty.');
}

$this->name = $clientName;
}
protected $name;

public function process(ContainerBuilder $container): void
{
$routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name);
if (false == $container->hasDefinition($routeCollectionId)) {
return;
if (false == $container->hasParameter('enqueue.clients')) {
throw new \LogicException('The "enqueue.clients" parameter must be set.');
}

$tag = 'enqueue.command_subscriber';
$routeCollection = new RouteCollection([]);
foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) {
$processorDefinition = $container->getDefinition($serviceId);
if ($processorDefinition->getFactory()) {
throw new \LogicException('The command subscriber tag could not be applied to a service created by factory.');
}
$names = $container->getParameter('enqueue.clients');

$processorClass = $processorDefinition->getClass();
if (false == class_exists($processorClass)) {
throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass));
foreach ($names as $name) {
$this->name = $name;
$routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name);
if (false == $container->hasDefinition($routeCollectionId)) {
throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId));
}

if (false == is_subclass_of($processorClass, CommandSubscriberInterface::class)) {
throw new \LogicException(sprintf('The processor must implement "%s" interface to be used with the tag "%s"', CommandSubscriberInterface::class, $tag));
}
$tag = 'enqueue.command_subscriber';
$routeCollection = new RouteCollection([]);
foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) {
$processorDefinition = $container->getDefinition($serviceId);
if ($processorDefinition->getFactory()) {
throw new \LogicException('The command subscriber tag could not be applied to a service created by factory.');
}

foreach ($tagAttributes as $tagAttribute) {
$client = $tagAttribute['client'] ?? 'default';
$processorClass = $processorDefinition->getClass();
if (false == class_exists($processorClass)) {
throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass));
}

if ($client !== $this->name && 'all' !== $client) {
continue;
if (false == is_subclass_of($processorClass, CommandSubscriberInterface::class)) {
throw new \LogicException(sprintf('The processor must implement "%s" interface to be used with the tag "%s"', CommandSubscriberInterface::class, $tag));
}

/** @var CommandSubscriberInterface $processorClass */
$commands = $processorClass::getSubscribedCommand();
foreach ($tagAttributes as $tagAttribute) {
$client = $tagAttribute['client'] ?? 'default';

if (empty($commands)) {
throw new \LogicException('Command subscriber must return something.');
}
if ($client !== $this->name && 'all' !== $client) {
continue;
}

if (is_string($commands)) {
$commands = [$commands];
}
/** @var CommandSubscriberInterface $processorClass */
$commands = $processorClass::getSubscribedCommand();

if (!is_array($commands)) {
throw new \LogicException('Command subscriber configuration is invalid. Should be an array or string.');
}
if (empty($commands)) {
throw new \LogicException('Command subscriber must return something.');
}

if (isset($commands['command'])) {
$commands = [$commands];
}
if (is_string($commands)) {
$commands = [$commands];
}

if (!is_array($commands)) {
throw new \LogicException('Command subscriber configuration is invalid. Should be an array or string.');
}

if (isset($commands['command'])) {
$commands = [$commands];
}

foreach ($commands as $key => $params) {
if (is_string($params)) {
$routeCollection->add(new Route($params, Route::COMMAND, $serviceId, ['processor_service_id' => $serviceId]));
} elseif (is_array($params)) {
$source = $params['command'] ?? null;
$processor = $params['processor'] ?? $serviceId;
unset($params['command'], $params['source'], $params['source_type'], $params['processor'], $params['options']);
$options = $params;
$options['processor_service_id'] = $serviceId;

$routeCollection->add(new Route($source, Route::COMMAND, $processor, $options));
} else {
throw new \LogicException(sprintf(
'Command subscriber configuration is invalid for "%s::getSubscribedCommand()". "%s"',
$processorClass,
json_encode($processorClass::getSubscribedCommand())
));
foreach ($commands as $key => $params) {
if (is_string($params)) {
$routeCollection->add(new Route($params, Route::COMMAND, $serviceId, ['processor_service_id' => $serviceId]));
} elseif (is_array($params)) {
$source = $params['command'] ?? null;
$processor = $params['processor'] ?? $serviceId;
unset($params['command'], $params['source'], $params['source_type'], $params['processor'], $params['options']);
$options = $params;
$options['processor_service_id'] = $serviceId;

$routeCollection->add(new Route($source, Route::COMMAND, $processor, $options));
} else {
throw new \LogicException(sprintf(
'Command subscriber configuration is invalid for "%s::getSubscribedCommand()". "%s"',
$processorClass,
json_encode($processorClass::getSubscribedCommand())
));
}
}
}
}
}

$rawRoutes = $routeCollection->toArray();
$rawRoutes = $routeCollection->toArray();

$routeCollectionService = $container->getDefinition($routeCollectionId);
$routeCollectionService->replaceArgument(0, array_merge(
$routeCollectionService->getArgument(0),
$rawRoutes
));
}
}

$routeCollectionService = $container->getDefinition($routeCollectionId);
$routeCollectionService->replaceArgument(0, array_merge(
$routeCollectionService->getArgument(0),
$rawRoutes
));
protected function getName(): string
{
return $this->name;
}
}
Loading