From 59eddeb0c7245804c152ac921ae810f885be7b0a Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 12 May 2017 22:31:51 +0300 Subject: [PATCH 1/2] Revert "Revert "[symfony] dsn transport factory."" This reverts commit e4306f5ee691c9911342a6434aa0c6aebe274469. --- docker-compose.yml | 2 + pkg/amqp-ext/AmqpConnectionFactory.php | 9 +- .../DependencyInjection/EnqueueExtension.php | 3 + .../Tests/Functional/UseCasesTest.php | 166 +++++++++-------- .../DependencyInjection/ConfigurationTest.php | 50 ++++-- .../EnqueueExtensionTest.php | 25 +++ pkg/enqueue/Symfony/DsnTransportFactory.php | 137 ++++++++++++++ .../Symfony/DefaultTransportFactoryTest.php | 23 +++ .../Tests/Symfony/DsnTransportFactoryTest.php | 169 ++++++++++++++++++ pkg/null/Symfony/NullTransportFactory.php | 10 ++ .../Symfony/NullTransportFactoryTest.php | 26 +++ 11 files changed, 529 insertions(+), 91 deletions(-) create mode 100644 pkg/enqueue/Symfony/DsnTransportFactory.php create mode 100644 pkg/enqueue/Tests/Symfony/DsnTransportFactoryTest.php diff --git a/docker-compose.yml b/docker-compose.yml index 8a55a7ea0..9aebd7543 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,8 @@ services: volumes: - './:/mqdev' environment: + - AMQP_DSN=amqp://rabbitmq + - RABBITMQ_DSN=rabbitmq_amqp://rabbitmq - SYMFONY__RABBITMQ__HOST=rabbitmq - SYMFONY__RABBITMQ__USER=guest - SYMFONY__RABBITMQ__PASSWORD=guest diff --git a/pkg/amqp-ext/AmqpConnectionFactory.php b/pkg/amqp-ext/AmqpConnectionFactory.php index e168cf796..05b4f6e39 100644 --- a/pkg/amqp-ext/AmqpConnectionFactory.php +++ b/pkg/amqp-ext/AmqpConnectionFactory.php @@ -68,10 +68,17 @@ public function createContext() return new AmqpContext(new \AMQPChannel($this->establishConnection())); } + /** + * @return \AMQPConnection + */ private function establishConnection() { if (false == $this->connection) { - $this->connection = new \AMQPConnection($this->config); + $config = $this->config; + $config['login'] = $this->config['user']; + $config['password'] = $this->config['pass']; + + $this->connection = new \AMQPConnection($config); $this->config['persisted'] ? $this->connection->pconnect() : $this->connection->connect(); } diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 0ba9dcf2e..612987a33 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -4,6 +4,7 @@ use Enqueue\Client\TraceableProducer; use Enqueue\JobQueue\Job; +use Enqueue\Symfony\DsnTransportFactory; use Enqueue\Symfony\TransportFactoryInterface; use Symfony\Component\Config\FileLocator; use Symfony\Component\Config\Resource\FileResource; @@ -47,6 +48,8 @@ public function addTransportFactory(TransportFactoryInterface $transportFactory) */ public function load(array $configs, ContainerBuilder $container) { + $this->factories['dsn'] = new DsnTransportFactory($this->factories); + $config = $this->processConfiguration(new Configuration($this->factories), $configs); $loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php index 502752e53..1a23db552 100644 --- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php @@ -16,87 +16,105 @@ class UseCasesTest extends WebTestCase { public function provideEnqueueConfigs() { - return [ - ['amqp' => [ - 'transport' => [ - 'default' => 'amqp', - 'amqp' => [ - 'host' => getenv('SYMFONY__RABBITMQ__HOST'), - 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), - 'user' => getenv('SYMFONY__RABBITMQ__USER'), - 'pass' => getenv('SYMFONY__RABBITMQ__PASSWORD'), - 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), - 'lazy' => false, - ] + yield 'amqp' => [[ + 'transport' => [ + 'default' => 'amqp', + 'amqp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), + 'user' => getenv('SYMFONY__RABBITMQ__USER'), + 'pass' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + 'lazy' => false, ] - ]], - ['stomp' => [ - 'transport' => [ - 'default' => 'stomp', - 'stomp' => [ - 'host' => getenv('SYMFONY__RABBITMQ__HOST'), - 'port' => getenv('SYMFONY__RABBITMQ__STOMP__PORT'), - 'login' => getenv('SYMFONY__RABBITMQ__USER'), - 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), - 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), - 'lazy' => false, - ] + ] + ]]; + + yield 'amqp_dsn' => [[ + 'transport' => [ + 'default' => 'amqp', + 'amqp' => getenv('AMQP_DSN'), + ] + ]]; + + yield 'dsn_amqp' => [[ + 'transport' => [ + 'default' => 'dsn', + 'dsn' => getenv('AMQP_DSN'), + ] + ]]; + + yield 'stomp' => [[ + 'transport' => [ + 'default' => 'stomp', + 'stomp' => [ + 'host' => getenv('SYMFONY__RABBITMQ__HOST'), + 'port' => getenv('SYMFONY__RABBITMQ__STOMP__PORT'), + 'login' => getenv('SYMFONY__RABBITMQ__USER'), + 'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'), + 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), + 'lazy' => false, ] - ]], - ['predis' => [ - 'transport' => [ - 'default' => 'redis', - 'redis' => [ - 'host' => getenv('SYMFONY__REDIS__HOST'), - 'port' => (int) getenv('SYMFONY__REDIS__PORT'), - 'vendor' => 'predis', - 'lazy' => false, - ] + ] + ]]; + + yield 'predis' => [[ + 'transport' => [ + 'default' => 'redis', + 'redis' => [ + 'host' => getenv('SYMFONY__REDIS__HOST'), + 'port' => (int) getenv('SYMFONY__REDIS__PORT'), + 'vendor' => 'predis', + 'lazy' => false, ] - ]], - ['phpredis' => [ - 'transport' => [ - 'default' => 'redis', - 'redis' => [ - 'host' => getenv('SYMFONY__REDIS__HOST'), - 'port' => (int) getenv('SYMFONY__REDIS__PORT'), - 'vendor' => 'phpredis', - 'lazy' => false, - ] + ] + ]]; + + yield 'phpredis' => [[ + 'transport' => [ + 'default' => 'redis', + 'redis' => [ + 'host' => getenv('SYMFONY__REDIS__HOST'), + 'port' => (int) getenv('SYMFONY__REDIS__PORT'), + 'vendor' => 'phpredis', + 'lazy' => false, ] - ]], - ['fs' => [ - 'transport' => [ - 'default' => 'fs', - 'fs' => [ - 'store_dir' => sys_get_temp_dir(), - ] + ] + ]]; + + yield 'fs' => [[ + 'transport' => [ + 'default' => 'fs', + 'fs' => [ + 'store_dir' => sys_get_temp_dir(), ] - ]], - ['dbal' => [ - 'transport' => [ - 'default' => 'dbal', - 'dbal' => [ - 'dbname' => getenv('SYMFONY__DB__NAME'), - 'user' => getenv('SYMFONY__DB__USER'), - 'password' => getenv('SYMFONY__DB__PASSWORD'), - 'host' => getenv('SYMFONY__DB__HOST'), - 'port' => getenv('SYMFONY__DB__PORT'), - 'driver' => getenv('SYMFONY__DB__DRIVER'), - ] + ] + ]]; + + yield 'dbal' => [[ + 'transport' => [ + 'default' => 'dbal', + 'dbal' => [ + 'dbname' => getenv('SYMFONY__DB__NAME'), + 'user' => getenv('SYMFONY__DB__USER'), + 'password' => getenv('SYMFONY__DB__PASSWORD'), + 'host' => getenv('SYMFONY__DB__HOST'), + 'port' => getenv('SYMFONY__DB__PORT'), + 'driver' => getenv('SYMFONY__DB__DRIVER'), ] - ]], - ['sqs' => [ - 'transport' => [ - 'default' => 'sqs', - 'sqs' => [ - 'key' => getenv('AWS__SQS__KEY'), - 'secret' => getenv('AWS__SQS__SECRET'), - 'region' => getenv('AWS__SQS__REGION'), - ] + ] + ]]; + + yield 'sqs' => [[ + 'transport' => [ + 'default' => 'sqs', + 'sqs' => [ + 'key' => getenv('AWS__SQS__KEY'), + 'secret' => getenv('AWS__SQS__SECRET'), + 'region' => getenv('AWS__SQS__REGION'), ] - ]], - ]; + ] + ]]; } /** diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index 0ae61b208..cf1f0c375 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -6,6 +6,7 @@ use Enqueue\Bundle\Tests\Unit\Mocks\FooTransportFactory; use Enqueue\Symfony\DefaultTransportFactory; use Enqueue\Null\Symfony\NullTransportFactory; +use Enqueue\Symfony\DsnTransportFactory; use Enqueue\Test\ClassExtensionTrait; use Symfony\Component\Config\Definition\ConfigurationInterface; use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; @@ -28,10 +29,8 @@ public function testCouldBeConstructedWithFactoriesAsFirstArgument() public function testThrowIfTransportNotConfigured() { - $this->setExpectedException( - InvalidConfigurationException::class, - 'The child node "transport" at path "enqueue" must be configured.' - ); + $this->expectException(InvalidConfigurationException::class); + $this->expectExceptionMessage('The child node "transport" at path "enqueue" must be configured.'); $configuration = new Configuration([]); @@ -59,10 +58,8 @@ public function testThrowExceptionIfFooTransportConfigInvalid() $processor = new Processor(); - $this->setExpectedException( - InvalidConfigurationException::class, - 'The path "enqueue.transport.foo.foo_param" cannot contain an empty value, but got null.' - ); + $this->expectException(InvalidConfigurationException::class); + $this->expectExceptionMessage('The path "enqueue.transport.foo.foo_param" cannot contain an empty value, but got null.'); $processor->processConfiguration($configuration, [[ 'transport' => [ @@ -103,6 +100,31 @@ public function testShouldAllowConfigureNullTransport() ], $config); } + public function testShouldAllowConfigureNullTransportViaDsnTransport() + { + $nullFactory = new NullTransportFactory(); + + $configuration = new Configuration([ + $nullFactory, + new DsnTransportFactory([$nullFactory]) + ]); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, [[ + 'transport' => [ + 'dsn' => 'null://', + ], + ]]); + + $this->assertArraySubset([ + 'transport' => [ + 'dsn' => [ + 'dsn' => 'null://' + ], + ], + ], $config); + } + public function testShouldAllowConfigureSeveralTransportsSameTime() { $configuration = new Configuration([ @@ -160,10 +182,8 @@ public function testShouldSetDefaultConfigurationForClient() public function testThrowExceptionIfRouterTopicIsEmpty() { - $this->setExpectedException( - InvalidConfigurationException::class, - 'The path "enqueue.client.router_topic" cannot contain an empty value, but got "".' - ); + $this->expectException(InvalidConfigurationException::class); + $this->expectExceptionMessage('The path "enqueue.client.router_topic" cannot contain an empty value, but got "".'); $configuration = new Configuration([new DefaultTransportFactory()]); @@ -180,10 +200,8 @@ public function testThrowExceptionIfRouterTopicIsEmpty() public function testThrowExceptionIfRouterQueueIsEmpty() { - $this->setExpectedException( - InvalidConfigurationException::class, - 'The path "enqueue.client.router_queue" cannot contain an empty value, but got "".' - ); + $this->expectException(InvalidConfigurationException::class); + $this->expectExceptionMessage('The path "enqueue.client.router_queue" cannot contain an empty value, but got "".'); $configuration = new Configuration([new DefaultTransportFactory()]); diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index 84210f390..49a5733a6 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -95,6 +95,31 @@ public function testShouldUseNullTransportAsDefault() ); } + public function testShouldUseNullTransportAsDefaultConfiguredViaDSN() + { + $container = new ContainerBuilder(); + + $extension = new EnqueueExtension(); + $extension->addTransportFactory(new NullTransportFactory()); + $extension->addTransportFactory(new DefaultTransportFactory()); + + $extension->load([[ + 'transport' => [ + 'default' => 'dsn', + 'dsn' => 'null://', + ], + ]], $container); + + self::assertEquals( + 'enqueue.transport.default.context', + (string) $container->getAlias('enqueue.transport.context') + ); + self::assertEquals( + 'enqueue.transport.dsn.context', + (string) $container->getAlias('enqueue.transport.default.context') + ); + } + public function testShouldConfigureFooTransport() { $container = new ContainerBuilder(); diff --git a/pkg/enqueue/Symfony/DsnTransportFactory.php b/pkg/enqueue/Symfony/DsnTransportFactory.php new file mode 100644 index 000000000..26b4b6aa6 --- /dev/null +++ b/pkg/enqueue/Symfony/DsnTransportFactory.php @@ -0,0 +1,137 @@ +name = $name; + + $this->factories = []; + foreach ($factories as $factory) { + $this->factories[$factory->getName()] = $factory; + } + } + + /** + * {@inheritdoc} + */ + public function addConfiguration(ArrayNodeDefinition $builder) + { + $builder + ->beforeNormalization() + ->ifString() + ->then(function ($v) { + return ['dsn' => $v]; + }) + ->end() + ->children() + ->scalarNode('dsn')->isRequired()->cannotBeEmpty()->end() + ; + } + + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factoryId = $this->findFactory($config['dsn'])->createConnectionFactory($container, [ + 'dsn' => $config['dsn'] + ]); + + $container->setAlias( + sprintf('enqueue.transport.%s.connection_factory', $this->getName()), + $factoryId + ); + + return $factoryId; + } + + /** + * {@inheritdoc} + */ + public function createContext(ContainerBuilder $container, array $config) + { + $contextId = $this->findFactory($config['dsn'])->createContext($container, [ + 'dsn' => $config['dsn'] + ]); + + $container->setAlias( + sprintf('enqueue.transport.%s.context', $this->getName()), + $contextId + ); + + return $contextId; + } + + /** + * {@inheritdoc} + */ + public function createDriver(ContainerBuilder $container, array $config) + { + $driverId = $this->findFactory($config['dsn'])->createDriver($container, [ + 'dsn' => $config['dsn'] + ]); + + $container->setAlias( + sprintf('enqueue.transport.%s.driver', $this->getName()), + $driverId + ); + + return $driverId; + } + + /** + * @return string + */ + public function getName() + { + return $this->name; + } + + /** + * @param string + * + * @return TransportFactoryInterface + */ + private function findFactory($dsn) + { + list($scheme) = explode('://', $dsn); + + if (false == $scheme || false === strpos($dsn, '://')) { + throw new \LogicException(sprintf('The scheme could not be parsed from DSN "%s"', $dsn)); + } + + $supportedSchemes = ['amqp', 'rabbitmq_amqp', 'null']; + if (false == in_array($scheme, $supportedSchemes)) { + throw new \LogicException(sprintf('The scheme "%s" is not supported.', $scheme)); + } + + if (false == array_key_exists($scheme, $this->factories)) { + throw new \LogicException(sprintf( + 'There is no factory that supports requested schema "%s", available are "%s"', + $scheme, + implode('", "', array_keys($this->factories)) + )); + } + + return $this->factories[$scheme]; + } +} diff --git a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php index 39cdca9c2..865ef6793 100644 --- a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php @@ -46,6 +46,29 @@ public function testShouldAllowAddConfiguration() $this->assertEquals(['alias' => 'the_alias'], $config); } + public function testShouldCreateConnectionFactory() + { + $container = new ContainerBuilder(); + + $transport = new DefaultTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, ['alias' => 'foo']); + + $this->assertEquals('enqueue.transport.default.connection_factory', $serviceId); + + $this->assertTrue($container->hasAlias('enqueue.transport.default.connection_factory')); + $this->assertEquals( + 'enqueue.transport.foo.connection_factory', + (string) $container->getAlias('enqueue.transport.default.connection_factory') + ); + + $this->assertTrue($container->hasAlias('enqueue.transport.connection_factory')); + $this->assertEquals( + 'enqueue.transport.default.connection_factory', + (string) $container->getAlias('enqueue.transport.connection_factory') + ); + } + public function testShouldCreateContext() { $container = new ContainerBuilder(); diff --git a/pkg/enqueue/Tests/Symfony/DsnTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DsnTransportFactoryTest.php new file mode 100644 index 000000000..e2009f76e --- /dev/null +++ b/pkg/enqueue/Tests/Symfony/DsnTransportFactoryTest.php @@ -0,0 +1,169 @@ +assertClassImplements(TransportFactoryInterface::class, DsnTransportFactory::class); + } + + public function testCouldBeConstructedWithDefaultName() + { + $transport = new DsnTransportFactory([]); + + $this->assertEquals('dsn', $transport->getName()); + } + + public function testCouldBeConstructedWithCustomName() + { + $transport = new DsnTransportFactory([], 'theCustomName'); + + $this->assertEquals('theCustomName', $transport->getName()); + } + + public function testShouldAllowAddConfigurationAsString() + { + $transport = new DsnTransportFactory([]); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), ['amqp://example.com']); + + $this->assertEquals(['dsn' => 'amqp://example.com'], $config); + } + + public function testShouldAllowAddConfigurationAsOption() + { + $transport = new DsnTransportFactory([]); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [['dsn' => 'amqp://example.com']]); + + $this->assertEquals(['dsn' => 'amqp://example.com'], $config); + } + + public function testThrowIfSchemeNotParsedOnCreateConnectionFactory() + { + $transport = new DsnTransportFactory([]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The scheme could not be parsed from DSN "invalidDsn"'); + + $transport->createConnectionFactory(new ContainerBuilder(), ['dsn' => 'invalidDsn']); + } + + public function testThrowIfSchemeNotSupportedOnCreateConnectionFactory() + { + $transport = new DsnTransportFactory([]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The scheme "http" is not supported.'); + + $transport->createConnectionFactory(new ContainerBuilder(), ['dsn' => 'http://foo.bar']); + } + + public function testThrowIfThereIsFactoryRegistered() + { + $transport = new DsnTransportFactory([ + $this->createTransportFactoryStub('foo'), + $this->createTransportFactoryStub('bar'), + ]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('There is no factory that supports requested schema "amqp", available are "foo", "bar"'); + + $transport->createConnectionFactory(new ContainerBuilder(), ['dsn' => 'amqp://foo']); + } + + public function testShouldProxyCallToInternalFactoryCreateConnectionFactoryMethod() + { + $container = new ContainerBuilder(); + + $internalFactory = $this->createTransportFactoryStub('amqp'); + $internalFactory + ->expects($this->once()) + ->method('createConnectionFactory') + ->with($this->identicalTo($container), ['dsn' => 'amqp://example.com']) + ->willReturn('theServiceId') + ; + + $transport = new DsnTransportFactory([$internalFactory]); + + $serviceId = $transport->createConnectionFactory($container, ['dsn' => 'amqp://example.com']); + + $this->assertEquals('theServiceId', $serviceId); + } + + public function testShouldProxyCallToInternalCreateContextMethod() + { + $container = new ContainerBuilder(); + + $internalFactory = $this->createTransportFactoryStub('amqp'); + $internalFactory + ->expects($this->once()) + ->method('createContext') + ->with($this->identicalTo($container), ['dsn' => 'amqp://example.com']) + ->willReturn('theServiceId') + ; + + $transport = new DsnTransportFactory([$internalFactory]); + + $serviceId = $transport->createContext($container, ['dsn' => 'amqp://example.com']); + + $this->assertEquals('theServiceId', $serviceId); + } + + public function testShouldProxyCallToInternalCreateDriverMethod() + { + $container = new ContainerBuilder(); + + $internalFactory = $this->createTransportFactoryStub('amqp'); + $internalFactory + ->expects($this->once()) + ->method('createDriver') + ->with($this->identicalTo($container), ['dsn' => 'amqp://example.com']) + ->willReturn('theServiceId') + ; + + $transport = new DsnTransportFactory([$internalFactory]); + + $serviceId = $transport->createDriver($container, ['dsn' => 'amqp://example.com']); + + $this->assertEquals('theServiceId', $serviceId); + } + + /** + * @param mixed $name + * + * @return \PHPUnit_Framework_MockObject_MockObject|TransportFactoryInterface + */ + private function createTransportFactoryStub($name) + { + $factoryMock = $this->createMock(TransportFactoryInterface::class); + $factoryMock + ->expects($this->any()) + ->method('getName') + ->willReturn($name) + ; + + return $factoryMock; + } +} diff --git a/pkg/null/Symfony/NullTransportFactory.php b/pkg/null/Symfony/NullTransportFactory.php index b18d38474..863489b0e 100644 --- a/pkg/null/Symfony/NullTransportFactory.php +++ b/pkg/null/Symfony/NullTransportFactory.php @@ -31,6 +31,16 @@ public function __construct($name = 'null') */ public function addConfiguration(ArrayNodeDefinition $builder) { + $builder + ->beforeNormalization() + ->ifString()->then(function ($v) { + return ['dsn' => $v]; + }) + ->end() + ->children() + ->scalarNode('dsn')->end() + ->end() + ; } /** diff --git a/pkg/null/Tests/Symfony/NullTransportFactoryTest.php b/pkg/null/Tests/Symfony/NullTransportFactoryTest.php index 07ee32973..96c2ddf0d 100644 --- a/pkg/null/Tests/Symfony/NullTransportFactoryTest.php +++ b/pkg/null/Tests/Symfony/NullTransportFactoryTest.php @@ -50,6 +50,32 @@ public function testShouldAllowAddConfiguration() $this->assertEquals([], $config); } + public function testShouldAllowAddConfigurationWithDsnString() + { + $transport = new NullTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), ['aStringDSN']); + + $this->assertEquals(['dsn' => 'aStringDSN'], $config); + } + + public function testShouldAllowAddConfigurationWithDsnStringOption() + { + $transport = new NullTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [['dsn' => 'aStringDSN']]); + + $this->assertEquals(['dsn' => 'aStringDSN'], $config); + } + public function testShouldCreateConnectionFactory() { $container = new ContainerBuilder(); From 553974a0ff155a11f8176d7c43aec9e02119af5c Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 15 May 2017 13:14:07 +0300 Subject: [PATCH 2/2] add ability to guess a factory by dsn scheme to default transport factory. --- docker-compose.yml | 2 +- docs/bundle/quick_tour.md | 3 +- docs/client/quick_tour.md | 10 +- docs/client/rpc_call.md | 17 +- docs/quick_tour.md | 18 +- .../DependencyInjection/EnqueueExtension.php | 3 - .../Tests/Functional/UseCasesTest.php | 12 + .../DependencyInjection/ConfigurationTest.php | 26 --- .../EnqueueExtensionTest.php | 13 +- .../Symfony/DefaultTransportFactory.php | 74 +++++- pkg/enqueue/Symfony/DsnTransportFactory.php | 137 ----------- .../Symfony/DefaultTransportFactoryTest.php | 182 ++++++++++++++- .../Tests/Symfony/DsnTransportFactoryTest.php | 169 -------------- pkg/simple-client/SimpleClient.php | 216 ++++++++++-------- .../SimpleClientContainerExtension.php | 82 ++++--- .../Tests/Functional/SimpleClientTest.php | 34 ++- pkg/simple-client/composer.json | 5 +- 17 files changed, 478 insertions(+), 525 deletions(-) delete mode 100644 pkg/enqueue/Symfony/DsnTransportFactory.php delete mode 100644 pkg/enqueue/Tests/Symfony/DsnTransportFactoryTest.php diff --git a/docker-compose.yml b/docker-compose.yml index 459ac5c46..542ed21af 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,7 +10,7 @@ services: volumes: - './:/mqdev' environment: - - AMQP_DSN=amqp://rabbitmq + - AMQP_DSN=amqp://guest:guest@rabbitmq:5672/mqdev - SYMFONY__RABBITMQ__HOST=rabbitmq - SYMFONY__RABBITMQ__USER=guest - SYMFONY__RABBITMQ__PASSWORD=guest diff --git a/docs/bundle/quick_tour.md b/docs/bundle/quick_tour.md index de405408e..e4513c0cd 100644 --- a/docs/bundle/quick_tour.md +++ b/docs/bundle/quick_tour.md @@ -45,8 +45,7 @@ First, you have to configure a transport layer and set one to be default. enqueue: transport: - default: 'amqp' - amqp: "amqp://" + default: "amqp://" client: ~ ``` diff --git a/docs/client/quick_tour.md b/docs/client/quick_tour.md index 3cca27de2..3d751a7ce 100644 --- a/docs/client/quick_tour.md +++ b/docs/client/quick_tour.md @@ -22,15 +22,7 @@ use Enqueue\SimpleClient\SimpleClient; include __DIR__.'/vendor/autoload.php'; -$client = new SimpleClient([ - 'transport' => [ - 'default' => 'amqp', - 'amqp' => 'amqp://' - ], - 'client' => [ - 'app_name' => 'plain_php', - ], -]); +$client = new SimpleClient('amqp://'); ``` ## Produce message diff --git a/docs/client/rpc_call.md b/docs/client/rpc_call.md index cccd4f602..b8fc13698 100644 --- a/docs/client/rpc_call.md +++ b/docs/client/rpc_call.md @@ -1,18 +1,23 @@ # Client. RPC call The client's [quick tour](quick_tour.md) describes how to get the client object. -We use you followed instructions there and have instance of `Enqueue\SimpleClient\SimpleClient` in `$client` var. +Here we'll use `Enqueue\SimpleClient\SimpleClient` though it is not required. +You can get all that stuff from manually built client or get objects from a container (Symfony). + +The simple client could be created like this: ## The client side There is a handy class RpcClient shipped with the client component. -It allows you to easily send a message and wait for a reply. +It allows you to easily perform [RPC calls](https://en.wikipedia.org/wiki/Remote_procedure_call). +It send a message and wait for a reply. ```php getProducer(), $context); @@ -24,8 +29,9 @@ You can perform several requests asynchronously with `callAsync` and request rep ```php getProducer(), $context); @@ -54,10 +60,11 @@ use Enqueue\Psr\PsrContext; use Enqueue\Consumption\Result; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\ReplyExtension; +use Enqueue\SimpleClient\SimpleClient; /** @var \Enqueue\Psr\PsrContext $context */ -/** @var \Enqueue\SimpleClient\SimpleClient $client */ +$client = new SimpleClient('amqp://'); $client->bind('greeting_topic', 'greeting_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) { echo $message->getBody(); diff --git a/docs/quick_tour.md b/docs/quick_tour.md index 1c32a73ef..3d334d3a2 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -170,19 +170,11 @@ Here's an example of how you can send and consume messages. use Enqueue\SimpleClient\SimpleClient; use Enqueue\Psr\PsrMessage; -$client = new SimpleClient([ - 'transport' => [ - 'default' => 'amqp', - 'amqp' => [ - 'host' => 'localhost', - 'port' => 5672, - 'vhost' => '/', - 'user' => 'guest', - 'pass' => 'guest', - ], - ], - 'client' => true, -]); +// composer require enqueue/amqp-ext +$client = new SimpleClient('amqp://'); + +// composer require enqueue/fs +$client = new SimpleClient('file://foo/bar'); $client->setupBroker(); diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 612987a33..0ba9dcf2e 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -4,7 +4,6 @@ use Enqueue\Client\TraceableProducer; use Enqueue\JobQueue\Job; -use Enqueue\Symfony\DsnTransportFactory; use Enqueue\Symfony\TransportFactoryInterface; use Symfony\Component\Config\FileLocator; use Symfony\Component\Config\Resource\FileResource; @@ -48,8 +47,6 @@ public function addTransportFactory(TransportFactoryInterface $transportFactory) */ public function load(array $configs, ContainerBuilder $container) { - $this->factories['dsn'] = new DsnTransportFactory($this->factories); - $config = $this->processConfiguration(new Configuration($this->factories), $configs); $loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php index 47e015a5d..1679793e2 100644 --- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php @@ -43,6 +43,12 @@ public function provideEnqueueConfigs() ], ]]; + yield 'default_amqp_as_dsn' => [[ + 'transport' => [ + 'default' => getenv('AMQP_DSN'), + ], + ]]; + yield 'stomp' => [[ 'transport' => [ 'default' => 'stomp', @@ -97,6 +103,12 @@ public function provideEnqueueConfigs() ], ]]; + yield 'default_fs_as_dsn' => [[ + 'transport' => [ + 'default' => 'file:/'.sys_get_temp_dir(), + ], + ]]; + yield 'dbal' => [[ 'transport' => [ 'default' => 'dbal', diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index 6f79dc77f..61a1999c7 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -5,7 +5,6 @@ use Enqueue\Bundle\DependencyInjection\Configuration; use Enqueue\Bundle\Tests\Unit\Mocks\FooTransportFactory; use Enqueue\Null\Symfony\NullTransportFactory; -use Enqueue\Symfony\DsnTransportFactory; use Enqueue\Symfony\DefaultTransportFactory; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; @@ -100,31 +99,6 @@ public function testShouldAllowConfigureNullTransport() ], $config); } - public function testShouldAllowConfigureNullTransportViaDsnTransport() - { - $nullFactory = new NullTransportFactory(); - - $configuration = new Configuration([ - $nullFactory, - new DsnTransportFactory([$nullFactory]) - ]); - - $processor = new Processor(); - $config = $processor->processConfiguration($configuration, [[ - 'transport' => [ - 'dsn' => 'null://', - ], - ]]); - - $this->assertArraySubset([ - 'transport' => [ - 'dsn' => [ - 'dsn' => 'null://' - ], - ], - ], $config); - } - public function testShouldAllowConfigureSeveralTransportsSameTime() { $configuration = new Configuration([ diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index 49a5733a6..a503c9ae6 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -7,14 +7,14 @@ use Enqueue\Bundle\Tests\Unit\Mocks\FooTransportFactory; use Enqueue\Client\Producer; use Enqueue\Client\TraceableProducer; -use Enqueue\Symfony\DefaultTransportFactory; +use Enqueue\Null\NullContext; use Enqueue\Null\Symfony\NullTransportFactory; +use Enqueue\Symfony\DefaultTransportFactory; use Enqueue\Test\ClassExtensionTrait; -use Enqueue\Null\NullContext; +use PHPUnit\Framework\TestCase; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Reference; use Symfony\Component\HttpKernel\DependencyInjection\Extension; -use PHPUnit\Framework\TestCase; class EnqueueExtensionTest extends TestCase { @@ -105,8 +105,7 @@ public function testShouldUseNullTransportAsDefaultConfiguredViaDSN() $extension->load([[ 'transport' => [ - 'default' => 'dsn', - 'dsn' => 'null://', + 'default' => 'null://', ], ]], $container); @@ -115,7 +114,7 @@ public function testShouldUseNullTransportAsDefaultConfiguredViaDSN() (string) $container->getAlias('enqueue.transport.context') ); self::assertEquals( - 'enqueue.transport.dsn.context', + 'enqueue.transport.default_null.context', (string) $container->getAlias('enqueue.transport.default.context') ); } @@ -470,7 +469,7 @@ public function testShouldAddJobQueueEntityMapping() $extension->prepend($container); - $config = $container->getExtensionConfig('doctrine'); + $config = $container->getExtensionConfig('doctrine'); $this->assertSame(['dbal' => true], $config[1]); $this->assertNotEmpty($config[0]['orm']['mappings']['enqueue_job_queue']); diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php index a7d9e831e..6d49560cf 100644 --- a/pkg/enqueue/Symfony/DefaultTransportFactory.php +++ b/pkg/enqueue/Symfony/DefaultTransportFactory.php @@ -2,8 +2,15 @@ namespace Enqueue\Symfony; +use Enqueue\AmqpExt\AmqpConnectionFactory; +use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; +use Enqueue\Fs\FsConnectionFactory; +use Enqueue\Fs\Symfony\FsTransportFactory; +use Enqueue\Null\NullConnectionFactory; +use Enqueue\Null\Symfony\NullTransportFactory; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; +use function Enqueue\dsn_to_connection_factory; class DefaultTransportFactory implements TransportFactoryInterface { @@ -29,18 +36,30 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->beforeNormalization() ->ifString() ->then(function ($v) { - return ['alias' => $v]; + if (false === strpos($v, '://')) { + return ['alias' => $v]; + } + + return ['dsn' => $v]; }) ->end() ->children() - ->scalarNode('alias')->isRequired()->cannotBeEmpty()->end() - ; + ->scalarNode('alias')->cannotBeEmpty()->end() + ->scalarNode('dsn')->cannotBeEmpty()->end() + ; } public function createConnectionFactory(ContainerBuilder $container, array $config) { + if (isset($config['alias'])) { + $aliasId = sprintf('enqueue.transport.%s.connection_factory', $config['alias']); + } elseif (isset($config['dsn'])) { + $aliasId = $this->findFactory($config['dsn'])->createConnectionFactory($container, $config); + } else { + throw new \LogicException('Either dsn or alias option must be set.'); + } + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); - $aliasId = sprintf('enqueue.transport.%s.connection_factory', $config['alias']); $container->setAlias($factoryId, $aliasId); $container->setAlias('enqueue.transport.connection_factory', $factoryId); @@ -53,8 +72,15 @@ public function createConnectionFactory(ContainerBuilder $container, array $conf */ public function createContext(ContainerBuilder $container, array $config) { + if (isset($config['alias'])) { + $aliasId = sprintf('enqueue.transport.%s.context', $config['alias']); + } elseif (isset($config['dsn'])) { + $aliasId = $this->findFactory($config['dsn'])->createContext($container, $config); + } else { + throw new \LogicException('Either dsn or alias option must be set.'); + } + $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); - $aliasId = sprintf('enqueue.transport.%s.context', $config['alias']); $container->setAlias($contextId, $aliasId); $container->setAlias('enqueue.transport.context', $contextId); @@ -67,8 +93,15 @@ public function createContext(ContainerBuilder $container, array $config) */ public function createDriver(ContainerBuilder $container, array $config) { + if (isset($config['alias'])) { + $aliasId = sprintf('enqueue.client.%s.driver', $config['alias']); + } elseif (isset($config['dsn'])) { + $aliasId = $this->findFactory($config['dsn'])->createDriver($container, $config); + } else { + throw new \LogicException('Either dsn or alias option must be set.'); + } + $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); - $aliasId = sprintf('enqueue.client.%s.driver', $config['alias']); $container->setAlias($driverId, $aliasId); $container->setAlias('enqueue.client.driver', $driverId); @@ -83,4 +116,33 @@ public function getName() { return $this->name; } + + /** + * @param string + * @param mixed $dsn + * + * @return TransportFactoryInterface + */ + private function findFactory($dsn) + { + $connectionFactory = dsn_to_connection_factory($dsn); + + if ($connectionFactory instanceof AmqpConnectionFactory) { + return new AmqpTransportFactory('default_amqp'); + } + + if ($connectionFactory instanceof FsConnectionFactory) { + return new FsTransportFactory('default_fs'); + } + + if ($connectionFactory instanceof NullConnectionFactory) { + return new NullTransportFactory('default_null'); + } + + throw new \LogicException(sprintf( + 'There is no supported transport factory for the connection factory "%s" created from DSN "%s"', + get_class($connectionFactory), + $dsn + )); + } } diff --git a/pkg/enqueue/Symfony/DsnTransportFactory.php b/pkg/enqueue/Symfony/DsnTransportFactory.php deleted file mode 100644 index 26b4b6aa6..000000000 --- a/pkg/enqueue/Symfony/DsnTransportFactory.php +++ /dev/null @@ -1,137 +0,0 @@ -name = $name; - - $this->factories = []; - foreach ($factories as $factory) { - $this->factories[$factory->getName()] = $factory; - } - } - - /** - * {@inheritdoc} - */ - public function addConfiguration(ArrayNodeDefinition $builder) - { - $builder - ->beforeNormalization() - ->ifString() - ->then(function ($v) { - return ['dsn' => $v]; - }) - ->end() - ->children() - ->scalarNode('dsn')->isRequired()->cannotBeEmpty()->end() - ; - } - - public function createConnectionFactory(ContainerBuilder $container, array $config) - { - $factoryId = $this->findFactory($config['dsn'])->createConnectionFactory($container, [ - 'dsn' => $config['dsn'] - ]); - - $container->setAlias( - sprintf('enqueue.transport.%s.connection_factory', $this->getName()), - $factoryId - ); - - return $factoryId; - } - - /** - * {@inheritdoc} - */ - public function createContext(ContainerBuilder $container, array $config) - { - $contextId = $this->findFactory($config['dsn'])->createContext($container, [ - 'dsn' => $config['dsn'] - ]); - - $container->setAlias( - sprintf('enqueue.transport.%s.context', $this->getName()), - $contextId - ); - - return $contextId; - } - - /** - * {@inheritdoc} - */ - public function createDriver(ContainerBuilder $container, array $config) - { - $driverId = $this->findFactory($config['dsn'])->createDriver($container, [ - 'dsn' => $config['dsn'] - ]); - - $container->setAlias( - sprintf('enqueue.transport.%s.driver', $this->getName()), - $driverId - ); - - return $driverId; - } - - /** - * @return string - */ - public function getName() - { - return $this->name; - } - - /** - * @param string - * - * @return TransportFactoryInterface - */ - private function findFactory($dsn) - { - list($scheme) = explode('://', $dsn); - - if (false == $scheme || false === strpos($dsn, '://')) { - throw new \LogicException(sprintf('The scheme could not be parsed from DSN "%s"', $dsn)); - } - - $supportedSchemes = ['amqp', 'rabbitmq_amqp', 'null']; - if (false == in_array($scheme, $supportedSchemes)) { - throw new \LogicException(sprintf('The scheme "%s" is not supported.', $scheme)); - } - - if (false == array_key_exists($scheme, $this->factories)) { - throw new \LogicException(sprintf( - 'There is no factory that supports requested schema "%s", available are "%s"', - $scheme, - implode('", "', array_keys($this->factories)) - )); - } - - return $this->factories[$scheme]; - } -} diff --git a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php index 610727e37..bc73746f0 100644 --- a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php @@ -33,7 +33,7 @@ public function testCouldBeConstructedWithCustomName() $this->assertEquals('theCustomName', $transport->getName()); } - public function testShouldAllowAddConfiguration() + public function testShouldAllowAddConfigurationAsAliasAsString() { $transport = new DefaultTransportFactory(); $tb = new TreeBuilder(); @@ -46,7 +46,87 @@ public function testShouldAllowAddConfiguration() $this->assertEquals(['alias' => 'the_alias'], $config); } - public function testShouldCreateConnectionFactory() + public function testShouldAllowAddConfigurationAsAliasAsOption() + { + $transport = new DefaultTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [['alias' => 'the_alias']]); + + $this->assertEquals(['alias' => 'the_alias'], $config); + } + + public function testShouldAllowAddConfigurationAsDsn() + { + $transport = new DefaultTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), ['dsn://']); + + $this->assertEquals(['dsn' => 'dsn://'], $config); + } + + public function testThrowIfNeitherDsnNorAliasConfiguredOnCreateConnectionFactory() + { + $transport = new DefaultTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[]]); + + // guard + $this->assertEquals([], $config); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Either dsn or alias option must be set'); + $transport->createConnectionFactory(new ContainerBuilder(), $config); + } + + public function testThrowIfNeitherDsnNorAliasConfiguredOnCreateContext() + { + $transport = new DefaultTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[]]); + + // guard + $this->assertEquals([], $config); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Either dsn or alias option must be set'); + $transport->createContext(new ContainerBuilder(), $config); + } + + public function testThrowIfNeitherDsnNorAliasConfiguredOnCreateDriver() + { + $transport = new DefaultTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[]]); + + // guard + $this->assertEquals([], $config); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Either dsn or alias option must be set'); + $transport->createDriver(new ContainerBuilder(), $config); + } + + public function testShouldCreateConnectionFactoryFromAlias() { $container = new ContainerBuilder(); @@ -69,7 +149,7 @@ public function testShouldCreateConnectionFactory() ); } - public function testShouldCreateContext() + public function testShouldCreateContextFromAlias() { $container = new ContainerBuilder(); @@ -88,7 +168,7 @@ public function testShouldCreateContext() $this->assertEquals($serviceId, (string) $context); } - public function testShouldCreateDriver() + public function testShouldCreateDriverFromAlias() { $container = new ContainerBuilder(); @@ -106,4 +186,98 @@ public function testShouldCreateDriver() $context = $container->getAlias('enqueue.client.driver'); $this->assertEquals($driverId, (string) $context); } + + /** + * @dataProvider provideDSNs + * + * @param mixed $dsn + * @param mixed $expectedName + */ + public function testShouldCreateConnectionFactoryFromDSN($dsn, $expectedName) + { + $container = new ContainerBuilder(); + + $transport = new DefaultTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, ['dsn' => $dsn]); + + $this->assertEquals('enqueue.transport.default.connection_factory', $serviceId); + + $this->assertTrue($container->hasAlias('enqueue.transport.default.connection_factory')); + $this->assertEquals( + sprintf('enqueue.transport.%s.connection_factory', $expectedName), + (string) $container->getAlias('enqueue.transport.default.connection_factory') + ); + + $this->assertTrue($container->hasAlias('enqueue.transport.connection_factory')); + $this->assertEquals( + 'enqueue.transport.default.connection_factory', + (string) $container->getAlias('enqueue.transport.connection_factory') + ); + } + + /** + * @dataProvider provideDSNs + * + * @param mixed $dsn + * @param mixed $expectedName + */ + public function testShouldCreateContextFromDsn($dsn, $expectedName) + { + $container = new ContainerBuilder(); + + $transport = new DefaultTransportFactory(); + + $serviceId = $transport->createContext($container, ['dsn' => $dsn]); + + $this->assertEquals('enqueue.transport.default.context', $serviceId); + + $this->assertTrue($container->hasAlias($serviceId)); + $context = $container->getAlias($serviceId); + $this->assertEquals( + sprintf('enqueue.transport.%s.context', $expectedName), + (string) $context + ); + + $this->assertTrue($container->hasAlias('enqueue.transport.context')); + $context = $container->getAlias('enqueue.transport.context'); + $this->assertEquals($serviceId, (string) $context); + } + + /** + * @dataProvider provideDSNs + * + * @param mixed $dsn + * @param mixed $expectedName + */ + public function testShouldCreateDriverFromDsn($dsn, $expectedName) + { + $container = new ContainerBuilder(); + + $transport = new DefaultTransportFactory(); + + $driverId = $transport->createDriver($container, ['dsn' => $dsn]); + + $this->assertEquals('enqueue.client.default.driver', $driverId); + + $this->assertTrue($container->hasAlias($driverId)); + $context = $container->getAlias($driverId); + $this->assertEquals( + sprintf('enqueue.client.%s.driver', $expectedName), + (string) $context + ); + + $this->assertTrue($container->hasAlias('enqueue.client.driver')); + $context = $container->getAlias('enqueue.client.driver'); + $this->assertEquals($driverId, (string) $context); + } + + public static function provideDSNs() + { + yield ['amqp://', 'default_amqp']; + + yield ['null://', 'default_null']; + + yield ['file://', 'default_fs']; + } } diff --git a/pkg/enqueue/Tests/Symfony/DsnTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DsnTransportFactoryTest.php deleted file mode 100644 index e2009f76e..000000000 --- a/pkg/enqueue/Tests/Symfony/DsnTransportFactoryTest.php +++ /dev/null @@ -1,169 +0,0 @@ -assertClassImplements(TransportFactoryInterface::class, DsnTransportFactory::class); - } - - public function testCouldBeConstructedWithDefaultName() - { - $transport = new DsnTransportFactory([]); - - $this->assertEquals('dsn', $transport->getName()); - } - - public function testCouldBeConstructedWithCustomName() - { - $transport = new DsnTransportFactory([], 'theCustomName'); - - $this->assertEquals('theCustomName', $transport->getName()); - } - - public function testShouldAllowAddConfigurationAsString() - { - $transport = new DsnTransportFactory([]); - $tb = new TreeBuilder(); - $rootNode = $tb->root('foo'); - - $transport->addConfiguration($rootNode); - $processor = new Processor(); - $config = $processor->process($tb->buildTree(), ['amqp://example.com']); - - $this->assertEquals(['dsn' => 'amqp://example.com'], $config); - } - - public function testShouldAllowAddConfigurationAsOption() - { - $transport = new DsnTransportFactory([]); - $tb = new TreeBuilder(); - $rootNode = $tb->root('foo'); - - $transport->addConfiguration($rootNode); - $processor = new Processor(); - $config = $processor->process($tb->buildTree(), [['dsn' => 'amqp://example.com']]); - - $this->assertEquals(['dsn' => 'amqp://example.com'], $config); - } - - public function testThrowIfSchemeNotParsedOnCreateConnectionFactory() - { - $transport = new DsnTransportFactory([]); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The scheme could not be parsed from DSN "invalidDsn"'); - - $transport->createConnectionFactory(new ContainerBuilder(), ['dsn' => 'invalidDsn']); - } - - public function testThrowIfSchemeNotSupportedOnCreateConnectionFactory() - { - $transport = new DsnTransportFactory([]); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The scheme "http" is not supported.'); - - $transport->createConnectionFactory(new ContainerBuilder(), ['dsn' => 'http://foo.bar']); - } - - public function testThrowIfThereIsFactoryRegistered() - { - $transport = new DsnTransportFactory([ - $this->createTransportFactoryStub('foo'), - $this->createTransportFactoryStub('bar'), - ]); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('There is no factory that supports requested schema "amqp", available are "foo", "bar"'); - - $transport->createConnectionFactory(new ContainerBuilder(), ['dsn' => 'amqp://foo']); - } - - public function testShouldProxyCallToInternalFactoryCreateConnectionFactoryMethod() - { - $container = new ContainerBuilder(); - - $internalFactory = $this->createTransportFactoryStub('amqp'); - $internalFactory - ->expects($this->once()) - ->method('createConnectionFactory') - ->with($this->identicalTo($container), ['dsn' => 'amqp://example.com']) - ->willReturn('theServiceId') - ; - - $transport = new DsnTransportFactory([$internalFactory]); - - $serviceId = $transport->createConnectionFactory($container, ['dsn' => 'amqp://example.com']); - - $this->assertEquals('theServiceId', $serviceId); - } - - public function testShouldProxyCallToInternalCreateContextMethod() - { - $container = new ContainerBuilder(); - - $internalFactory = $this->createTransportFactoryStub('amqp'); - $internalFactory - ->expects($this->once()) - ->method('createContext') - ->with($this->identicalTo($container), ['dsn' => 'amqp://example.com']) - ->willReturn('theServiceId') - ; - - $transport = new DsnTransportFactory([$internalFactory]); - - $serviceId = $transport->createContext($container, ['dsn' => 'amqp://example.com']); - - $this->assertEquals('theServiceId', $serviceId); - } - - public function testShouldProxyCallToInternalCreateDriverMethod() - { - $container = new ContainerBuilder(); - - $internalFactory = $this->createTransportFactoryStub('amqp'); - $internalFactory - ->expects($this->once()) - ->method('createDriver') - ->with($this->identicalTo($container), ['dsn' => 'amqp://example.com']) - ->willReturn('theServiceId') - ; - - $transport = new DsnTransportFactory([$internalFactory]); - - $serviceId = $transport->createDriver($container, ['dsn' => 'amqp://example.com']); - - $this->assertEquals('theServiceId', $serviceId); - } - - /** - * @param mixed $name - * - * @return \PHPUnit_Framework_MockObject_MockObject|TransportFactoryInterface - */ - private function createTransportFactoryStub($name) - { - $factoryMock = $this->createMock(TransportFactoryInterface::class); - $factoryMock - ->expects($this->any()) - ->method('getName') - ->willReturn($name) - ; - - return $factoryMock; - } -} diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index 5879b969d..a65b719ee 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -1,4 +1,5 @@ [ + * 'default' => 'amqp', + * 'amqp' => [], // amqp options here + * ], + * ] + * + * or a with all details: + * * $config = [ * 'transport' => [ * 'default' => 'amqp', @@ -59,102 +78,6 @@ public function __construct($config) $this->container = $this->buildContainer($config); } - /** - * @param array|string $config - * - * @return ContainerBuilder - */ - private function buildContainer($config) - { - $config = $this->buildConfig($config); - $extension = $this->buildContainerExtension($config); - - $container = new ContainerBuilder(); - $container->registerExtension($extension); - $container->loadFromExtension($extension->getAlias(), $config); - - $container->compile(); - - return $container; - } - - /** - * @param array $config - * - * @return SimpleClientContainerExtension - */ - private function buildContainerExtension($config) - { - $map = [ - 'default' => DefaultTransportFactory::class, - 'amqp' => AmqpTransportFactory::class, - 'rabbitmq_amqp' => RabbitMqAmqpTransportFactory::class, - 'dbal' => DbalTransportFactory::class, - 'fs' => FsTransportFactory::class, - 'redis' => RedisTransportFactory::class, - 'stomp' => StompTransportFactory::class, - 'rabbitmq_stomp' => RabbitMqStompTransportFactory::class, - 'sqs' => SqsTransportFactory::class, - ]; - - $extension = new SimpleClientContainerExtension(); - - foreach (array_keys($config['transport']) as $transport) { - if (false == isset($map[$transport])) { - throw new \LogicException(sprintf('Transport is not supported: "%s"', $transport)); - } - - $extension->addTransportFactory(new $map[$transport]); - } - - return $extension; - } - - /** - * @param array|string $config - * - * @return array - */ - private function buildConfig($config) - { - if (is_string($config)) { - $extConfig = [ - 'client' => [], - 'transport' => [ - 'default' => $config, - $config => [], - ], - ]; - } elseif (is_array($config)) { - $extConfig = array_merge_recursive([ - 'client' => [], - 'transport' => [], - ], $config); - } else { - throw new \LogicException('Expects config is string or array'); - } - - if (empty($extConfig['transport']['default'])) { - $defaultTransport = null; - foreach ($extConfig['transport'] as $transport => $config) { - if ('default' === $transport) { - continue; - } - - $defaultTransport = $transport; - break; - } - - if (false == $defaultTransport) { - throw new \LogicException('There is no transport configured'); - } - - $extConfig['transport']['default'] = $defaultTransport; - } - - return $extConfig; - } - /** * @param string $topic * @param string $processorName @@ -207,7 +130,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) */ public function getContext() { - return $this->container->get('enqueue.transport.context'); + return $this->container->get('enqueue.transport.context'); } /** @@ -290,4 +213,103 @@ public function getRouterProcessor() { return $this->container->get('enqueue.client.router_processor'); } + + /** + * @param array|string $config + * + * @return ContainerBuilder + */ + private function buildContainer($config) + { + $config = $this->buildConfig($config); + $extension = $this->buildContainerExtension(); + + $container = new ContainerBuilder(); + $container->registerExtension($extension); + $container->loadFromExtension($extension->getAlias(), $config); + + $container->compile(); + + return $container; + } + + /** + * @return SimpleClientContainerExtension + */ + private function buildContainerExtension() + { + $map = [ + 'default' => DefaultTransportFactory::class, + 'amqp' => AmqpTransportFactory::class, + 'rabbitmq_amqp' => RabbitMqAmqpTransportFactory::class, + 'dbal' => DbalTransportFactory::class, + 'fs' => FsTransportFactory::class, + 'redis' => RedisTransportFactory::class, + 'stomp' => StompTransportFactory::class, + 'rabbitmq_stomp' => RabbitMqStompTransportFactory::class, + 'sqs' => SqsTransportFactory::class, + ]; + + $extension = new SimpleClientContainerExtension(); + + foreach ($map as $name => $factoryClass) { + if (class_exists($factoryClass)) { + $extension->addTransportFactory(new $factoryClass($name)); + } + } + + return $extension; + } + + /** + * @param array|string $config + * + * @return array + */ + private function buildConfig($config) + { + if (is_string($config) && false !== strpos($config, '://')) { + $extConfig = [ + 'client' => [], + 'transport' => [ + 'default' => $config, + ], + ]; + } elseif (is_string($config)) { + $extConfig = [ + 'client' => [], + 'transport' => [ + 'default' => $config, + $config => [], + ], + ]; + } elseif (is_array($config)) { + $extConfig = array_merge_recursive([ + 'client' => [], + 'transport' => [], + ], $config); + } else { + throw new \LogicException('Expects config is string or array'); + } + + if (empty($extConfig['transport']['default'])) { + $defaultTransport = null; + foreach ($extConfig['transport'] as $transport => $config) { + if ('default' === $transport) { + continue; + } + + $defaultTransport = $transport; + break; + } + + if (false == $defaultTransport) { + throw new \LogicException('There is no transport configured'); + } + + $extConfig['transport']['default'] = $defaultTransport; + } + + return $extConfig; + } } diff --git a/pkg/simple-client/SimpleClientContainerExtension.php b/pkg/simple-client/SimpleClientContainerExtension.php index 0784cfb17..00cab237b 100644 --- a/pkg/simple-client/SimpleClientContainerExtension.php +++ b/pkg/simple-client/SimpleClientContainerExtension.php @@ -1,4 +1,5 @@ root('enqueue'); - - $transportChildren = $rootNode->children() - ->arrayNode('transport')->isRequired()->children(); - - foreach ($this->factories as $factory) { - $factory->addConfiguration( - $transportChildren->arrayNode($factory->getName()) - ); - } - - $rootNode->children() - ->arrayNode('client')->children() - ->scalarNode('prefix')->defaultValue('enqueue')->end() - ->scalarNode('app_name')->defaultValue('app')->end() - ->scalarNode('router_topic')->defaultValue('router')->cannotBeEmpty()->end() - ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() - ->end()->end() - ->arrayNode('extensions')->addDefaultsIfNotSet()->children() - ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() - ->end()->end() - ; - - return $tb->buildTree(); - } - /** * @param TransportFactoryInterface $transportFactory */ @@ -105,6 +72,11 @@ public function load(array $configs, ContainerBuilder $container) $this->factories[$name]->createDriver($container, $transportConfig); } + $transportConfig = isset($config['transport']['default']['alias']) ? + $config['transport'][$config['transport']['default']['alias']] : + [] + ; + $container->register('enqueue.client.config', Config::class) ->setArguments([ $config['client']['prefix'], @@ -113,12 +85,12 @@ public function load(array $configs, ContainerBuilder $container) $config['client']['router_queue'], $config['client']['default_processor_queue'], 'enqueue.client.router_processor', - $config['transport'][$config['transport']['default']['alias']], + $transportConfig, ]); $container->register('enqueue.client.producer', Producer::class) ->setArguments([ - new Reference('enqueue.client.driver') + new Reference('enqueue.client.driver'), ]); $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) @@ -138,7 +110,7 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue.client.queue_consumer', QueueConsumer::class) ->setArguments([ new Reference('enqueue.transport.context'), - new Reference('enqueue.consumption.extensions') + new Reference('enqueue.consumption.extensions'), ]); // router @@ -155,7 +127,7 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue.client.delay_redelivered_message_extension', DelayRedeliveredMessageExtension::class) ->setArguments([ new Reference('enqueue.client.driver'), - $config['client']['redelivered_delay_time'] + $config['client']['redelivered_delay_time'], ]); $extensions[] = new Reference('enqueue.client.delay_redelivered_message_extension'); @@ -169,4 +141,38 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue.consumption.extensions', ConsumptionChainExtension::class) ->setArguments([$extensions]); } + + /** + * @return NodeInterface + */ + private function createConfiguration() + { + $tb = new TreeBuilder(); + $rootNode = $tb->root('enqueue'); + + $transportChildren = $rootNode->children() + ->arrayNode('transport')->isRequired()->children(); + + foreach ($this->factories as $factory) { + $factory->addConfiguration( + $transportChildren->arrayNode($factory->getName()) + ); + } + + $rootNode->children() + ->arrayNode('client')->children() + ->scalarNode('prefix')->defaultValue('enqueue')->end() + ->scalarNode('app_name')->defaultValue('app')->end() + ->scalarNode('router_topic')->defaultValue('router')->cannotBeEmpty()->end() + ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() + ->end()->end() + ->arrayNode('extensions')->addDefaultsIfNotSet()->children() + ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() + ->end()->end() + ; + + return $tb->buildTree(); + } } diff --git a/pkg/simple-client/Tests/Functional/SimpleClientTest.php b/pkg/simple-client/Tests/Functional/SimpleClientTest.php index 9bd572478..06798ca2b 100644 --- a/pkg/simple-client/Tests/Functional/SimpleClientTest.php +++ b/pkg/simple-client/Tests/Functional/SimpleClientTest.php @@ -1,12 +1,13 @@ [[ 'transport' => [ + 'default' => 'amqp', 'amqp' => [ 'host' => getenv('SYMFONY__RABBITMQ__HOST'), 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), @@ -40,10 +42,26 @@ public function transportConfigDataProvider() 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), ], ], - ]; + ]]; - $rabbitmqAmqp = [ + yield 'config_as_dsn_string' => [getenv('AMQP_DSN')]; + + yield 'amqp_dsn' => [[ 'transport' => [ + 'default' => 'amqp', + 'amqp' => getenv('AMQP_DSN'), + ], + ]]; + + yield 'default_amqp_as_dsn' => [[ + 'transport' => [ + 'default' => getenv('AMQP_DSN'), + ], + ]]; + + yield [[ + 'transport' => [ + 'default' => 'rabbitmq_amqp', 'rabbitmq_amqp' => [ 'host' => getenv('SYMFONY__RABBITMQ__HOST'), 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), @@ -52,13 +70,13 @@ public function transportConfigDataProvider() 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), ], ], - ]; - - return [[$amqp, $rabbitmqAmqp]]; + ]]; } /** * @dataProvider transportConfigDataProvider + * + * @param mixed $config */ public function testProduceAndConsumeOneMessage($config) { @@ -84,6 +102,8 @@ public function testProduceAndConsumeOneMessage($config) /** * @dataProvider transportConfigDataProvider + * + * @param mixed $config */ public function testProduceAndRouteToTwoConsumes($config) { diff --git a/pkg/simple-client/composer.json b/pkg/simple-client/composer.json index d41d45568..58523d95c 100644 --- a/pkg/simple-client/composer.json +++ b/pkg/simple-client/composer.json @@ -19,7 +19,10 @@ }, "require-dev": { "phpunit/phpunit": "~5.5", - "enqueue/test": "^0.4" + "enqueue/test": "^0.4", + "enqueue/amqp-ext": "^0.4", + "enqueue/fs": "^0.4", + "enqueue/null": "^0.4" }, "autoload": { "psr-4": { "Enqueue\\SimpleClient\\": "" },