diff --git a/docker-compose.yml b/docker-compose.yml index 459ac5c46..542ed21af 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,7 +10,7 @@ services: volumes: - './:/mqdev' environment: - - AMQP_DSN=amqp://rabbitmq + - AMQP_DSN=amqp://guest:guest@rabbitmq:5672/mqdev - SYMFONY__RABBITMQ__HOST=rabbitmq - SYMFONY__RABBITMQ__USER=guest - SYMFONY__RABBITMQ__PASSWORD=guest diff --git a/docs/bundle/quick_tour.md b/docs/bundle/quick_tour.md index de405408e..e4513c0cd 100644 --- a/docs/bundle/quick_tour.md +++ b/docs/bundle/quick_tour.md @@ -45,8 +45,7 @@ First, you have to configure a transport layer and set one to be default. enqueue: transport: - default: 'amqp' - amqp: "amqp://" + default: "amqp://" client: ~ ``` diff --git a/docs/client/quick_tour.md b/docs/client/quick_tour.md index 3cca27de2..3d751a7ce 100644 --- a/docs/client/quick_tour.md +++ b/docs/client/quick_tour.md @@ -22,15 +22,7 @@ use Enqueue\SimpleClient\SimpleClient; include __DIR__.'/vendor/autoload.php'; -$client = new SimpleClient([ - 'transport' => [ - 'default' => 'amqp', - 'amqp' => 'amqp://' - ], - 'client' => [ - 'app_name' => 'plain_php', - ], -]); +$client = new SimpleClient('amqp://'); ``` ## Produce message diff --git a/docs/client/rpc_call.md b/docs/client/rpc_call.md index cccd4f602..b8fc13698 100644 --- a/docs/client/rpc_call.md +++ b/docs/client/rpc_call.md @@ -1,18 +1,23 @@ # Client. RPC call The client's [quick tour](quick_tour.md) describes how to get the client object. -We use you followed instructions there and have instance of `Enqueue\SimpleClient\SimpleClient` in `$client` var. +Here we'll use `Enqueue\SimpleClient\SimpleClient` though it is not required. +You can get all that stuff from manually built client or get objects from a container (Symfony). + +The simple client could be created like this: ## The client side There is a handy class RpcClient shipped with the client component. -It allows you to easily send a message and wait for a reply. +It allows you to easily perform [RPC calls](https://en.wikipedia.org/wiki/Remote_procedure_call). +It send a message and wait for a reply. ```php getProducer(), $context); @@ -24,8 +29,9 @@ You can perform several requests asynchronously with `callAsync` and request rep ```php getProducer(), $context); @@ -54,10 +60,11 @@ use Enqueue\Psr\PsrContext; use Enqueue\Consumption\Result; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\ReplyExtension; +use Enqueue\SimpleClient\SimpleClient; /** @var \Enqueue\Psr\PsrContext $context */ -/** @var \Enqueue\SimpleClient\SimpleClient $client */ +$client = new SimpleClient('amqp://'); $client->bind('greeting_topic', 'greeting_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) { echo $message->getBody(); diff --git a/docs/quick_tour.md b/docs/quick_tour.md index 1c32a73ef..3d334d3a2 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -170,19 +170,11 @@ Here's an example of how you can send and consume messages. use Enqueue\SimpleClient\SimpleClient; use Enqueue\Psr\PsrMessage; -$client = new SimpleClient([ - 'transport' => [ - 'default' => 'amqp', - 'amqp' => [ - 'host' => 'localhost', - 'port' => 5672, - 'vhost' => '/', - 'user' => 'guest', - 'pass' => 'guest', - ], - ], - 'client' => true, -]); +// composer require enqueue/amqp-ext +$client = new SimpleClient('amqp://'); + +// composer require enqueue/fs +$client = new SimpleClient('file://foo/bar'); $client->setupBroker(); diff --git a/pkg/amqp-ext/AmqpConnectionFactory.php b/pkg/amqp-ext/AmqpConnectionFactory.php index 8d968bb32..9aceb9fe3 100644 --- a/pkg/amqp-ext/AmqpConnectionFactory.php +++ b/pkg/amqp-ext/AmqpConnectionFactory.php @@ -77,7 +77,9 @@ private function establishConnection() $config = $this->config; $config['login'] = $this->config['user']; $config['password'] = $this->config['pass']; + $this->connection = new \AMQPConnection($config); + $this->config['persisted'] ? $this->connection->pconnect() : $this->connection->connect(); } if (false == $this->connection->isConnected()) { diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php index 47e015a5d..1679793e2 100644 --- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php @@ -43,6 +43,12 @@ public function provideEnqueueConfigs() ], ]]; + yield 'default_amqp_as_dsn' => [[ + 'transport' => [ + 'default' => getenv('AMQP_DSN'), + ], + ]]; + yield 'stomp' => [[ 'transport' => [ 'default' => 'stomp', @@ -97,6 +103,12 @@ public function provideEnqueueConfigs() ], ]]; + yield 'default_fs_as_dsn' => [[ + 'transport' => [ + 'default' => 'file:/'.sys_get_temp_dir(), + ], + ]]; + yield 'dbal' => [[ 'transport' => [ 'default' => 'dbal', diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index 84210f390..a503c9ae6 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -7,14 +7,14 @@ use Enqueue\Bundle\Tests\Unit\Mocks\FooTransportFactory; use Enqueue\Client\Producer; use Enqueue\Client\TraceableProducer; -use Enqueue\Symfony\DefaultTransportFactory; +use Enqueue\Null\NullContext; use Enqueue\Null\Symfony\NullTransportFactory; +use Enqueue\Symfony\DefaultTransportFactory; use Enqueue\Test\ClassExtensionTrait; -use Enqueue\Null\NullContext; +use PHPUnit\Framework\TestCase; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Reference; use Symfony\Component\HttpKernel\DependencyInjection\Extension; -use PHPUnit\Framework\TestCase; class EnqueueExtensionTest extends TestCase { @@ -95,6 +95,30 @@ public function testShouldUseNullTransportAsDefault() ); } + public function testShouldUseNullTransportAsDefaultConfiguredViaDSN() + { + $container = new ContainerBuilder(); + + $extension = new EnqueueExtension(); + $extension->addTransportFactory(new NullTransportFactory()); + $extension->addTransportFactory(new DefaultTransportFactory()); + + $extension->load([[ + 'transport' => [ + 'default' => 'null://', + ], + ]], $container); + + self::assertEquals( + 'enqueue.transport.default.context', + (string) $container->getAlias('enqueue.transport.context') + ); + self::assertEquals( + 'enqueue.transport.default_null.context', + (string) $container->getAlias('enqueue.transport.default.context') + ); + } + public function testShouldConfigureFooTransport() { $container = new ContainerBuilder(); @@ -445,7 +469,7 @@ public function testShouldAddJobQueueEntityMapping() $extension->prepend($container); - $config = $container->getExtensionConfig('doctrine'); + $config = $container->getExtensionConfig('doctrine'); $this->assertSame(['dbal' => true], $config[1]); $this->assertNotEmpty($config[0]['orm']['mappings']['enqueue_job_queue']); diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php index a7d9e831e..6d49560cf 100644 --- a/pkg/enqueue/Symfony/DefaultTransportFactory.php +++ b/pkg/enqueue/Symfony/DefaultTransportFactory.php @@ -2,8 +2,15 @@ namespace Enqueue\Symfony; +use Enqueue\AmqpExt\AmqpConnectionFactory; +use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; +use Enqueue\Fs\FsConnectionFactory; +use Enqueue\Fs\Symfony\FsTransportFactory; +use Enqueue\Null\NullConnectionFactory; +use Enqueue\Null\Symfony\NullTransportFactory; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; +use function Enqueue\dsn_to_connection_factory; class DefaultTransportFactory implements TransportFactoryInterface { @@ -29,18 +36,30 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->beforeNormalization() ->ifString() ->then(function ($v) { - return ['alias' => $v]; + if (false === strpos($v, '://')) { + return ['alias' => $v]; + } + + return ['dsn' => $v]; }) ->end() ->children() - ->scalarNode('alias')->isRequired()->cannotBeEmpty()->end() - ; + ->scalarNode('alias')->cannotBeEmpty()->end() + ->scalarNode('dsn')->cannotBeEmpty()->end() + ; } public function createConnectionFactory(ContainerBuilder $container, array $config) { + if (isset($config['alias'])) { + $aliasId = sprintf('enqueue.transport.%s.connection_factory', $config['alias']); + } elseif (isset($config['dsn'])) { + $aliasId = $this->findFactory($config['dsn'])->createConnectionFactory($container, $config); + } else { + throw new \LogicException('Either dsn or alias option must be set.'); + } + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); - $aliasId = sprintf('enqueue.transport.%s.connection_factory', $config['alias']); $container->setAlias($factoryId, $aliasId); $container->setAlias('enqueue.transport.connection_factory', $factoryId); @@ -53,8 +72,15 @@ public function createConnectionFactory(ContainerBuilder $container, array $conf */ public function createContext(ContainerBuilder $container, array $config) { + if (isset($config['alias'])) { + $aliasId = sprintf('enqueue.transport.%s.context', $config['alias']); + } elseif (isset($config['dsn'])) { + $aliasId = $this->findFactory($config['dsn'])->createContext($container, $config); + } else { + throw new \LogicException('Either dsn or alias option must be set.'); + } + $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); - $aliasId = sprintf('enqueue.transport.%s.context', $config['alias']); $container->setAlias($contextId, $aliasId); $container->setAlias('enqueue.transport.context', $contextId); @@ -67,8 +93,15 @@ public function createContext(ContainerBuilder $container, array $config) */ public function createDriver(ContainerBuilder $container, array $config) { + if (isset($config['alias'])) { + $aliasId = sprintf('enqueue.client.%s.driver', $config['alias']); + } elseif (isset($config['dsn'])) { + $aliasId = $this->findFactory($config['dsn'])->createDriver($container, $config); + } else { + throw new \LogicException('Either dsn or alias option must be set.'); + } + $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); - $aliasId = sprintf('enqueue.client.%s.driver', $config['alias']); $container->setAlias($driverId, $aliasId); $container->setAlias('enqueue.client.driver', $driverId); @@ -83,4 +116,33 @@ public function getName() { return $this->name; } + + /** + * @param string + * @param mixed $dsn + * + * @return TransportFactoryInterface + */ + private function findFactory($dsn) + { + $connectionFactory = dsn_to_connection_factory($dsn); + + if ($connectionFactory instanceof AmqpConnectionFactory) { + return new AmqpTransportFactory('default_amqp'); + } + + if ($connectionFactory instanceof FsConnectionFactory) { + return new FsTransportFactory('default_fs'); + } + + if ($connectionFactory instanceof NullConnectionFactory) { + return new NullTransportFactory('default_null'); + } + + throw new \LogicException(sprintf( + 'There is no supported transport factory for the connection factory "%s" created from DSN "%s"', + get_class($connectionFactory), + $dsn + )); + } } diff --git a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php index 610727e37..bc73746f0 100644 --- a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php @@ -33,7 +33,7 @@ public function testCouldBeConstructedWithCustomName() $this->assertEquals('theCustomName', $transport->getName()); } - public function testShouldAllowAddConfiguration() + public function testShouldAllowAddConfigurationAsAliasAsString() { $transport = new DefaultTransportFactory(); $tb = new TreeBuilder(); @@ -46,7 +46,87 @@ public function testShouldAllowAddConfiguration() $this->assertEquals(['alias' => 'the_alias'], $config); } - public function testShouldCreateConnectionFactory() + public function testShouldAllowAddConfigurationAsAliasAsOption() + { + $transport = new DefaultTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [['alias' => 'the_alias']]); + + $this->assertEquals(['alias' => 'the_alias'], $config); + } + + public function testShouldAllowAddConfigurationAsDsn() + { + $transport = new DefaultTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), ['dsn://']); + + $this->assertEquals(['dsn' => 'dsn://'], $config); + } + + public function testThrowIfNeitherDsnNorAliasConfiguredOnCreateConnectionFactory() + { + $transport = new DefaultTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[]]); + + // guard + $this->assertEquals([], $config); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Either dsn or alias option must be set'); + $transport->createConnectionFactory(new ContainerBuilder(), $config); + } + + public function testThrowIfNeitherDsnNorAliasConfiguredOnCreateContext() + { + $transport = new DefaultTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[]]); + + // guard + $this->assertEquals([], $config); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Either dsn or alias option must be set'); + $transport->createContext(new ContainerBuilder(), $config); + } + + public function testThrowIfNeitherDsnNorAliasConfiguredOnCreateDriver() + { + $transport = new DefaultTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[]]); + + // guard + $this->assertEquals([], $config); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Either dsn or alias option must be set'); + $transport->createDriver(new ContainerBuilder(), $config); + } + + public function testShouldCreateConnectionFactoryFromAlias() { $container = new ContainerBuilder(); @@ -69,7 +149,7 @@ public function testShouldCreateConnectionFactory() ); } - public function testShouldCreateContext() + public function testShouldCreateContextFromAlias() { $container = new ContainerBuilder(); @@ -88,7 +168,7 @@ public function testShouldCreateContext() $this->assertEquals($serviceId, (string) $context); } - public function testShouldCreateDriver() + public function testShouldCreateDriverFromAlias() { $container = new ContainerBuilder(); @@ -106,4 +186,98 @@ public function testShouldCreateDriver() $context = $container->getAlias('enqueue.client.driver'); $this->assertEquals($driverId, (string) $context); } + + /** + * @dataProvider provideDSNs + * + * @param mixed $dsn + * @param mixed $expectedName + */ + public function testShouldCreateConnectionFactoryFromDSN($dsn, $expectedName) + { + $container = new ContainerBuilder(); + + $transport = new DefaultTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, ['dsn' => $dsn]); + + $this->assertEquals('enqueue.transport.default.connection_factory', $serviceId); + + $this->assertTrue($container->hasAlias('enqueue.transport.default.connection_factory')); + $this->assertEquals( + sprintf('enqueue.transport.%s.connection_factory', $expectedName), + (string) $container->getAlias('enqueue.transport.default.connection_factory') + ); + + $this->assertTrue($container->hasAlias('enqueue.transport.connection_factory')); + $this->assertEquals( + 'enqueue.transport.default.connection_factory', + (string) $container->getAlias('enqueue.transport.connection_factory') + ); + } + + /** + * @dataProvider provideDSNs + * + * @param mixed $dsn + * @param mixed $expectedName + */ + public function testShouldCreateContextFromDsn($dsn, $expectedName) + { + $container = new ContainerBuilder(); + + $transport = new DefaultTransportFactory(); + + $serviceId = $transport->createContext($container, ['dsn' => $dsn]); + + $this->assertEquals('enqueue.transport.default.context', $serviceId); + + $this->assertTrue($container->hasAlias($serviceId)); + $context = $container->getAlias($serviceId); + $this->assertEquals( + sprintf('enqueue.transport.%s.context', $expectedName), + (string) $context + ); + + $this->assertTrue($container->hasAlias('enqueue.transport.context')); + $context = $container->getAlias('enqueue.transport.context'); + $this->assertEquals($serviceId, (string) $context); + } + + /** + * @dataProvider provideDSNs + * + * @param mixed $dsn + * @param mixed $expectedName + */ + public function testShouldCreateDriverFromDsn($dsn, $expectedName) + { + $container = new ContainerBuilder(); + + $transport = new DefaultTransportFactory(); + + $driverId = $transport->createDriver($container, ['dsn' => $dsn]); + + $this->assertEquals('enqueue.client.default.driver', $driverId); + + $this->assertTrue($container->hasAlias($driverId)); + $context = $container->getAlias($driverId); + $this->assertEquals( + sprintf('enqueue.client.%s.driver', $expectedName), + (string) $context + ); + + $this->assertTrue($container->hasAlias('enqueue.client.driver')); + $context = $container->getAlias('enqueue.client.driver'); + $this->assertEquals($driverId, (string) $context); + } + + public static function provideDSNs() + { + yield ['amqp://', 'default_amqp']; + + yield ['null://', 'default_null']; + + yield ['file://', 'default_fs']; + } } diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index 5879b969d..a65b719ee 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -1,4 +1,5 @@ [ + * 'default' => 'amqp', + * 'amqp' => [], // amqp options here + * ], + * ] + * + * or a with all details: + * * $config = [ * 'transport' => [ * 'default' => 'amqp', @@ -59,102 +78,6 @@ public function __construct($config) $this->container = $this->buildContainer($config); } - /** - * @param array|string $config - * - * @return ContainerBuilder - */ - private function buildContainer($config) - { - $config = $this->buildConfig($config); - $extension = $this->buildContainerExtension($config); - - $container = new ContainerBuilder(); - $container->registerExtension($extension); - $container->loadFromExtension($extension->getAlias(), $config); - - $container->compile(); - - return $container; - } - - /** - * @param array $config - * - * @return SimpleClientContainerExtension - */ - private function buildContainerExtension($config) - { - $map = [ - 'default' => DefaultTransportFactory::class, - 'amqp' => AmqpTransportFactory::class, - 'rabbitmq_amqp' => RabbitMqAmqpTransportFactory::class, - 'dbal' => DbalTransportFactory::class, - 'fs' => FsTransportFactory::class, - 'redis' => RedisTransportFactory::class, - 'stomp' => StompTransportFactory::class, - 'rabbitmq_stomp' => RabbitMqStompTransportFactory::class, - 'sqs' => SqsTransportFactory::class, - ]; - - $extension = new SimpleClientContainerExtension(); - - foreach (array_keys($config['transport']) as $transport) { - if (false == isset($map[$transport])) { - throw new \LogicException(sprintf('Transport is not supported: "%s"', $transport)); - } - - $extension->addTransportFactory(new $map[$transport]); - } - - return $extension; - } - - /** - * @param array|string $config - * - * @return array - */ - private function buildConfig($config) - { - if (is_string($config)) { - $extConfig = [ - 'client' => [], - 'transport' => [ - 'default' => $config, - $config => [], - ], - ]; - } elseif (is_array($config)) { - $extConfig = array_merge_recursive([ - 'client' => [], - 'transport' => [], - ], $config); - } else { - throw new \LogicException('Expects config is string or array'); - } - - if (empty($extConfig['transport']['default'])) { - $defaultTransport = null; - foreach ($extConfig['transport'] as $transport => $config) { - if ('default' === $transport) { - continue; - } - - $defaultTransport = $transport; - break; - } - - if (false == $defaultTransport) { - throw new \LogicException('There is no transport configured'); - } - - $extConfig['transport']['default'] = $defaultTransport; - } - - return $extConfig; - } - /** * @param string $topic * @param string $processorName @@ -207,7 +130,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) */ public function getContext() { - return $this->container->get('enqueue.transport.context'); + return $this->container->get('enqueue.transport.context'); } /** @@ -290,4 +213,103 @@ public function getRouterProcessor() { return $this->container->get('enqueue.client.router_processor'); } + + /** + * @param array|string $config + * + * @return ContainerBuilder + */ + private function buildContainer($config) + { + $config = $this->buildConfig($config); + $extension = $this->buildContainerExtension(); + + $container = new ContainerBuilder(); + $container->registerExtension($extension); + $container->loadFromExtension($extension->getAlias(), $config); + + $container->compile(); + + return $container; + } + + /** + * @return SimpleClientContainerExtension + */ + private function buildContainerExtension() + { + $map = [ + 'default' => DefaultTransportFactory::class, + 'amqp' => AmqpTransportFactory::class, + 'rabbitmq_amqp' => RabbitMqAmqpTransportFactory::class, + 'dbal' => DbalTransportFactory::class, + 'fs' => FsTransportFactory::class, + 'redis' => RedisTransportFactory::class, + 'stomp' => StompTransportFactory::class, + 'rabbitmq_stomp' => RabbitMqStompTransportFactory::class, + 'sqs' => SqsTransportFactory::class, + ]; + + $extension = new SimpleClientContainerExtension(); + + foreach ($map as $name => $factoryClass) { + if (class_exists($factoryClass)) { + $extension->addTransportFactory(new $factoryClass($name)); + } + } + + return $extension; + } + + /** + * @param array|string $config + * + * @return array + */ + private function buildConfig($config) + { + if (is_string($config) && false !== strpos($config, '://')) { + $extConfig = [ + 'client' => [], + 'transport' => [ + 'default' => $config, + ], + ]; + } elseif (is_string($config)) { + $extConfig = [ + 'client' => [], + 'transport' => [ + 'default' => $config, + $config => [], + ], + ]; + } elseif (is_array($config)) { + $extConfig = array_merge_recursive([ + 'client' => [], + 'transport' => [], + ], $config); + } else { + throw new \LogicException('Expects config is string or array'); + } + + if (empty($extConfig['transport']['default'])) { + $defaultTransport = null; + foreach ($extConfig['transport'] as $transport => $config) { + if ('default' === $transport) { + continue; + } + + $defaultTransport = $transport; + break; + } + + if (false == $defaultTransport) { + throw new \LogicException('There is no transport configured'); + } + + $extConfig['transport']['default'] = $defaultTransport; + } + + return $extConfig; + } } diff --git a/pkg/simple-client/SimpleClientContainerExtension.php b/pkg/simple-client/SimpleClientContainerExtension.php index 0784cfb17..00cab237b 100644 --- a/pkg/simple-client/SimpleClientContainerExtension.php +++ b/pkg/simple-client/SimpleClientContainerExtension.php @@ -1,4 +1,5 @@ root('enqueue'); - - $transportChildren = $rootNode->children() - ->arrayNode('transport')->isRequired()->children(); - - foreach ($this->factories as $factory) { - $factory->addConfiguration( - $transportChildren->arrayNode($factory->getName()) - ); - } - - $rootNode->children() - ->arrayNode('client')->children() - ->scalarNode('prefix')->defaultValue('enqueue')->end() - ->scalarNode('app_name')->defaultValue('app')->end() - ->scalarNode('router_topic')->defaultValue('router')->cannotBeEmpty()->end() - ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() - ->end()->end() - ->arrayNode('extensions')->addDefaultsIfNotSet()->children() - ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() - ->end()->end() - ; - - return $tb->buildTree(); - } - /** * @param TransportFactoryInterface $transportFactory */ @@ -105,6 +72,11 @@ public function load(array $configs, ContainerBuilder $container) $this->factories[$name]->createDriver($container, $transportConfig); } + $transportConfig = isset($config['transport']['default']['alias']) ? + $config['transport'][$config['transport']['default']['alias']] : + [] + ; + $container->register('enqueue.client.config', Config::class) ->setArguments([ $config['client']['prefix'], @@ -113,12 +85,12 @@ public function load(array $configs, ContainerBuilder $container) $config['client']['router_queue'], $config['client']['default_processor_queue'], 'enqueue.client.router_processor', - $config['transport'][$config['transport']['default']['alias']], + $transportConfig, ]); $container->register('enqueue.client.producer', Producer::class) ->setArguments([ - new Reference('enqueue.client.driver') + new Reference('enqueue.client.driver'), ]); $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) @@ -138,7 +110,7 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue.client.queue_consumer', QueueConsumer::class) ->setArguments([ new Reference('enqueue.transport.context'), - new Reference('enqueue.consumption.extensions') + new Reference('enqueue.consumption.extensions'), ]); // router @@ -155,7 +127,7 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue.client.delay_redelivered_message_extension', DelayRedeliveredMessageExtension::class) ->setArguments([ new Reference('enqueue.client.driver'), - $config['client']['redelivered_delay_time'] + $config['client']['redelivered_delay_time'], ]); $extensions[] = new Reference('enqueue.client.delay_redelivered_message_extension'); @@ -169,4 +141,38 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue.consumption.extensions', ConsumptionChainExtension::class) ->setArguments([$extensions]); } + + /** + * @return NodeInterface + */ + private function createConfiguration() + { + $tb = new TreeBuilder(); + $rootNode = $tb->root('enqueue'); + + $transportChildren = $rootNode->children() + ->arrayNode('transport')->isRequired()->children(); + + foreach ($this->factories as $factory) { + $factory->addConfiguration( + $transportChildren->arrayNode($factory->getName()) + ); + } + + $rootNode->children() + ->arrayNode('client')->children() + ->scalarNode('prefix')->defaultValue('enqueue')->end() + ->scalarNode('app_name')->defaultValue('app')->end() + ->scalarNode('router_topic')->defaultValue('router')->cannotBeEmpty()->end() + ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() + ->end()->end() + ->arrayNode('extensions')->addDefaultsIfNotSet()->children() + ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() + ->end()->end() + ; + + return $tb->buildTree(); + } } diff --git a/pkg/simple-client/Tests/Functional/SimpleClientTest.php b/pkg/simple-client/Tests/Functional/SimpleClientTest.php index 9bd572478..06798ca2b 100644 --- a/pkg/simple-client/Tests/Functional/SimpleClientTest.php +++ b/pkg/simple-client/Tests/Functional/SimpleClientTest.php @@ -1,12 +1,13 @@ [[ 'transport' => [ + 'default' => 'amqp', 'amqp' => [ 'host' => getenv('SYMFONY__RABBITMQ__HOST'), 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), @@ -40,10 +42,26 @@ public function transportConfigDataProvider() 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), ], ], - ]; + ]]; - $rabbitmqAmqp = [ + yield 'config_as_dsn_string' => [getenv('AMQP_DSN')]; + + yield 'amqp_dsn' => [[ 'transport' => [ + 'default' => 'amqp', + 'amqp' => getenv('AMQP_DSN'), + ], + ]]; + + yield 'default_amqp_as_dsn' => [[ + 'transport' => [ + 'default' => getenv('AMQP_DSN'), + ], + ]]; + + yield [[ + 'transport' => [ + 'default' => 'rabbitmq_amqp', 'rabbitmq_amqp' => [ 'host' => getenv('SYMFONY__RABBITMQ__HOST'), 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), @@ -52,13 +70,13 @@ public function transportConfigDataProvider() 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), ], ], - ]; - - return [[$amqp, $rabbitmqAmqp]]; + ]]; } /** * @dataProvider transportConfigDataProvider + * + * @param mixed $config */ public function testProduceAndConsumeOneMessage($config) { @@ -84,6 +102,8 @@ public function testProduceAndConsumeOneMessage($config) /** * @dataProvider transportConfigDataProvider + * + * @param mixed $config */ public function testProduceAndRouteToTwoConsumes($config) { diff --git a/pkg/simple-client/composer.json b/pkg/simple-client/composer.json index d41d45568..58523d95c 100644 --- a/pkg/simple-client/composer.json +++ b/pkg/simple-client/composer.json @@ -19,7 +19,10 @@ }, "require-dev": { "phpunit/phpunit": "~5.5", - "enqueue/test": "^0.4" + "enqueue/test": "^0.4", + "enqueue/amqp-ext": "^0.4", + "enqueue/fs": "^0.4", + "enqueue/null": "^0.4" }, "autoload": { "psr-4": { "Enqueue\\SimpleClient\\": "" },