diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md index 3164e59bc..cac0ccd6b 100644 --- a/docs/bundle/config_reference.md +++ b/docs/bundle/config_reference.md @@ -38,7 +38,8 @@ enqueue: # The option tells whether RabbitMQ broker has delay plugin installed or not delay_plugin_installed: false - amqp_ext: + amqp: + driver: ~ # One of "ext"; "lib"; "bunny" # The connection to AMQP broker set as a string. Other parameters could be used as defaults dsn: ~ @@ -86,106 +87,8 @@ enqueue: # The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option. driver_options: ~ - rabbitmq_amqp_ext: - - # The connection to AMQP broker set as a string. Other parameters could be used as defaults - dsn: ~ - - # The host to connect too. Note: Max 1024 characters - host: ~ - - # Port on the host. - port: ~ - - # The user name to use. Note: Max 128 characters. - user: ~ - - # Password. Note: Max 128 characters. - pass: ~ - - # The virtual host on the host. Note: Max 128 characters. - vhost: ~ - - # Connection timeout. Note: 0 or greater seconds. May be fractional. - connection_timeout: ~ - - # Timeout in for income activity. Note: 0 or greater seconds. May be fractional. - read_timeout: ~ - - # Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. - write_timeout: ~ - - # How often to send heartbeat. 0 means off. - heartbeat: ~ - persisted: ~ - lazy: ~ - - # The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher - receive_method: ~ # One of "basic_get"; "basic_consume" - - # The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit" - qos_prefetch_size: ~ - - # Specifies a prefetch window in terms of whole messages - qos_prefetch_count: ~ - - # If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection. - qos_global: ~ - - # The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option. - driver_options: ~ - - # The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id - delay_strategy: dlx - amqp_lib: - - # The connection to AMQP broker set as a string. Other parameters could be used as defaults - dsn: ~ - - # The host to connect too. Note: Max 1024 characters - host: ~ - - # Port on the host. - port: ~ - - # The user name to use. Note: Max 128 characters. - user: ~ - - # Password. Note: Max 128 characters. - pass: ~ - - # The virtual host on the host. Note: Max 128 characters. - vhost: ~ - - # Connection timeout. Note: 0 or greater seconds. May be fractional. - connection_timeout: ~ - - # Timeout in for income activity. Note: 0 or greater seconds. May be fractional. - read_timeout: ~ - - # Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. - write_timeout: ~ - - # How often to send heartbeat. 0 means off. - heartbeat: ~ - persisted: ~ - lazy: ~ - - # The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher - receive_method: ~ # One of "basic_get"; "basic_consume" - - # The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit" - qos_prefetch_size: ~ - - # Specifies a prefetch window in terms of whole messages - qos_prefetch_count: ~ - - # If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection. - qos_global: ~ - - # The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option. - driver_options: ~ - rabbitmq_amqp_lib: + rabbitmq_amqp: + driver: ~ # One of "ext"; "lib"; "bunny" # The connection to AMQP broker set as a string. Other parameters could be used as defaults dsn: ~ @@ -293,105 +196,6 @@ enqueue: # the connection will be performed as later as possible, if the option set to true lazy: true - amqp_bunny: - - # The connection to AMQP broker set as a string. Other parameters could be used as defaults - dsn: ~ - - # The host to connect too. Note: Max 1024 characters - host: ~ - - # Port on the host. - port: ~ - - # The user name to use. Note: Max 128 characters. - user: ~ - - # Password. Note: Max 128 characters. - pass: ~ - - # The virtual host on the host. Note: Max 128 characters. - vhost: ~ - - # Connection timeout. Note: 0 or greater seconds. May be fractional. - connection_timeout: ~ - - # Timeout in for income activity. Note: 0 or greater seconds. May be fractional. - read_timeout: ~ - - # Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. - write_timeout: ~ - - # How often to send heartbeat. 0 means off. - heartbeat: ~ - persisted: ~ - lazy: ~ - - # The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher - receive_method: ~ # One of "basic_get"; "basic_consume" - - # The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit" - qos_prefetch_size: ~ - - # Specifies a prefetch window in terms of whole messages - qos_prefetch_count: ~ - - # If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection. - qos_global: ~ - - # The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option. - driver_options: ~ - rabbitmq_amqp_bunny: - - # The connection to AMQP broker set as a string. Other parameters could be used as defaults - dsn: ~ - - # The host to connect too. Note: Max 1024 characters - host: ~ - - # Port on the host. - port: ~ - - # The user name to use. Note: Max 128 characters. - user: ~ - - # Password. Note: Max 128 characters. - pass: ~ - - # The virtual host on the host. Note: Max 128 characters. - vhost: ~ - - # Connection timeout. Note: 0 or greater seconds. May be fractional. - connection_timeout: ~ - - # Timeout in for income activity. Note: 0 or greater seconds. May be fractional. - read_timeout: ~ - - # Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. - write_timeout: ~ - - # How often to send heartbeat. 0 means off. - heartbeat: ~ - persisted: ~ - lazy: ~ - - # The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher - receive_method: ~ # One of "basic_get"; "basic_consume" - - # The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit" - qos_prefetch_size: ~ - - # Specifies a prefetch window in terms of whole messages - qos_prefetch_count: ~ - - # If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection. - qos_global: ~ - - # The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option. - driver_options: ~ - - # The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id - delay_strategy: dlx client: traceable_producer: false prefix: enqueue diff --git a/pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php b/pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php index dbcb71f5a..039d3db3b 100644 --- a/pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php +++ b/pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php @@ -15,16 +15,16 @@ public function registerDelayStrategy(ContainerBuilder $container, array $config if ($config['delay_strategy']) { $factory = $container->getDefinition($factoryId); - if (false == is_a($factory->getClass(), DelayStrategyAware::class, true)) { + if (false == (is_a($factory->getClass(), DelayStrategyAware::class, true) || $factory->getFactory())) { throw new \LogicException('Connection factory does not support delays'); } - if (strtolower($config['delay_strategy']) === 'dlx') { + if ('dlx' === strtolower($config['delay_strategy'])) { $delayId = sprintf('enqueue.client.%s.delay_strategy', $factoryName); $container->register($delayId, RabbitMqDlxDelayStrategy::class); $factory->addMethodCall('setDelayStrategy', [new Reference($delayId)]); - } elseif (strtolower($config['delay_strategy']) === 'delayed_message_plugin') { + } elseif ('delayed_message_plugin' === strtolower($config['delay_strategy'])) { $delayId = sprintf('enqueue.client.%s.delay_strategy', $factoryName); $container->register($delayId, RabbitMqDelayPluginDelayStrategy::class); diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index 75339c2f2..c221484e0 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -2,9 +2,6 @@ namespace Enqueue\Bundle; -use Enqueue\AmqpBunny\AmqpConnectionFactory as AmqpBunnyConnectionFactory; -use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory; -use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory; use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventsPass; use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncTransformersPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; @@ -55,15 +52,8 @@ public function build(ContainerBuilder $container) $extension->addTransportFactory(new RabbitMqStompTransportFactory()); } - if (class_exists(AmqpExtConnectionFactory::class)) { - $extension->addTransportFactory(new AmqpTransportFactory(AmqpExtConnectionFactory::class, 'amqp_ext')); - $extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpExtConnectionFactory::class, 'rabbitmq_amqp_ext')); - } - - if (class_exists(AmqpLibConnectionFactory::class)) { - $extension->addTransportFactory(new AmqpTransportFactory(AmqpLibConnectionFactory::class, 'amqp_lib')); - $extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpLibConnectionFactory::class, 'rabbitmq_amqp_lib')); - } + $extension->addTransportFactory(new AmqpTransportFactory('amqp')); + $extension->addTransportFactory(new RabbitMqAmqpTransportFactory('rabbitmq_amqp')); if (class_exists(FsConnectionFactory::class)) { $extension->addTransportFactory(new FsTransportFactory()); @@ -81,11 +71,6 @@ public function build(ContainerBuilder $container) $extension->addTransportFactory(new SqsTransportFactory()); } - if (class_exists(AmqpBunnyConnectionFactory::class)) { - $extension->addTransportFactory(new AmqpTransportFactory(AmqpBunnyConnectionFactory::class, 'amqp_bunny')); - $extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpBunnyConnectionFactory::class, 'rabbitmq_amqp_bunny')); - } - $container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); $container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); } diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php index 3011bf246..1992dd8d5 100644 --- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php @@ -23,10 +23,11 @@ public function setUp() public function provideEnqueueConfigs() { - yield 'amqp_ext' => [[ + yield 'amqp' => [[ 'transport' => [ - 'default' => 'amqp_ext', - 'amqp_ext' => [ + 'default' => 'amqp', + 'amqp' => [ + 'driver' => 'ext', 'host' => getenv('SYMFONY__RABBITMQ__HOST'), 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), 'user' => getenv('SYMFONY__RABBITMQ__USER'), @@ -39,8 +40,8 @@ public function provideEnqueueConfigs() yield 'amqp_dsn' => [[ 'transport' => [ - 'default' => 'amqp_ext', - 'amqp_ext' => getenv('AMQP_DSN'), + 'default' => 'amqp', + 'amqp' => getenv('AMQP_DSN'), ], ]]; diff --git a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php index 111155db8..638f6f4c2 100644 --- a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php @@ -2,9 +2,6 @@ namespace Enqueue\Bundle\Tests\Unit; -use Enqueue\AmqpBunny\AmqpConnectionFactory as AmqpBunnyConnectionFactory; -use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory; -use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; @@ -113,7 +110,7 @@ public function testShouldRegisterStompAndRabbitMqStompTransportFactories() $bundle->build($container); } - public function testShouldRegisterAmqpExtAndRabbitMqAmqpExtTransportFactories() + public function testShouldRegisterAmqpAndRabbitMqAmqpTransportFactories() { $extensionMock = $this->createEnqueueExtensionMock(); @@ -125,43 +122,15 @@ public function testShouldRegisterAmqpExtAndRabbitMqAmqpExtTransportFactories() ->method('addTransportFactory') ->with($this->isInstanceOf(AmqpTransportFactory::class)) ->willReturnCallback(function (AmqpTransportFactory $factory) { - $this->assertSame(AmqpExtConnectionFactory::class, $factory->getAmqpConnectionFactoryClass()); + $this->assertSame('amqp', $factory->getName()); }) ; $extensionMock ->expects($this->at(3)) ->method('addTransportFactory') ->with($this->isInstanceOf(RabbitMqAmqpTransportFactory::class)) - ->willReturnCallback(function (AmqpTransportFactory $factory) { - $this->assertSame(AmqpExtConnectionFactory::class, $factory->getAmqpConnectionFactoryClass()); - }) - ; - - $bundle = new EnqueueBundle(); - $bundle->build($container); - } - - public function testShouldRegisterAmqpLibAndRabbitMqAmqpLibTransportFactories() - { - $extensionMock = $this->createEnqueueExtensionMock(); - - $container = new ContainerBuilder(); - $container->registerExtension($extensionMock); - - $extensionMock - ->expects($this->at(4)) - ->method('addTransportFactory') - ->with($this->isInstanceOf(AmqpTransportFactory::class)) - ->willReturnCallback(function (AmqpTransportFactory $factory) { - $this->assertSame(AmqpLibConnectionFactory::class, $factory->getAmqpConnectionFactoryClass()); - }) - ; - $extensionMock - ->expects($this->at(5)) - ->method('addTransportFactory') - ->with($this->isInstanceOf(RabbitMqAmqpTransportFactory::class)) - ->willReturnCallback(function (AmqpTransportFactory $factory) { - $this->assertSame(AmqpLibConnectionFactory::class, $factory->getAmqpConnectionFactoryClass()); + ->willReturnCallback(function (RabbitMqAmqpTransportFactory $factory) { + $this->assertSame('rabbitmq_amqp', $factory->getName()); }) ; @@ -177,7 +146,7 @@ public function testShouldRegisterFSTransportFactory() $container->registerExtension($extensionMock); $extensionMock - ->expects($this->at(6)) + ->expects($this->at(4)) ->method('addTransportFactory') ->with($this->isInstanceOf(FsTransportFactory::class)) ; @@ -194,7 +163,7 @@ public function testShouldRegisterRedisTransportFactory() $container->registerExtension($extensionMock); $extensionMock - ->expects($this->at(7)) + ->expects($this->at(5)) ->method('addTransportFactory') ->with($this->isInstanceOf(RedisTransportFactory::class)) ; @@ -211,7 +180,7 @@ public function testShouldRegisterDbalTransportFactory() $container->registerExtension($extensionMock); $extensionMock - ->expects($this->at(8)) + ->expects($this->at(6)) ->method('addTransportFactory') ->with($this->isInstanceOf(DbalTransportFactory::class)) ; @@ -228,7 +197,7 @@ public function testShouldRegisterSqsTransportFactory() $container->registerExtension($extensionMock); $extensionMock - ->expects($this->at(9)) + ->expects($this->at(7)) ->method('addTransportFactory') ->with($this->isInstanceOf(SqsTransportFactory::class)) ; @@ -237,34 +206,6 @@ public function testShouldRegisterSqsTransportFactory() $bundle->build($container); } - public function testShouldRegisterAmqpBunnyTransportFactory() - { - $extensionMock = $this->createEnqueueExtensionMock(); - - $container = new ContainerBuilder(); - $container->registerExtension($extensionMock); - - $extensionMock - ->expects($this->at(10)) - ->method('addTransportFactory') - ->with($this->isInstanceOf(AmqpTransportFactory::class)) - ->willReturnCallback(function (AmqpTransportFactory $factory) { - $this->assertSame(AmqpBunnyConnectionFactory::class, $factory->getAmqpConnectionFactoryClass()); - }) - ; - $extensionMock - ->expects($this->at(11)) - ->method('addTransportFactory') - ->with($this->isInstanceOf(RabbitMqAmqpTransportFactory::class)) - ->willReturnCallback(function (AmqpTransportFactory $factory) { - $this->assertSame(AmqpBunnyConnectionFactory::class, $factory->getAmqpConnectionFactoryClass()); - }) - ; - - $bundle = new EnqueueBundle(); - $bundle->build($container); - } - /** * @return \PHPUnit_Framework_MockObject_MockObject|EnqueueExtension */ diff --git a/pkg/enqueue/Symfony/AmqpTransportFactory.php b/pkg/enqueue/Symfony/AmqpTransportFactory.php index bef8dea37..a3351e9eb 100644 --- a/pkg/enqueue/Symfony/AmqpTransportFactory.php +++ b/pkg/enqueue/Symfony/AmqpTransportFactory.php @@ -2,32 +2,30 @@ namespace Enqueue\Symfony; +use Enqueue\AmqpBunny\AmqpConnectionFactory as AmqpBunnyConnectionFactory; +use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory; +use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory; use Enqueue\Client\Amqp\AmqpDriver; +use Interop\Amqp\AmqpConnectionFactory; use Interop\Amqp\AmqpContext; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Definition; use Symfony\Component\DependencyInjection\Reference; +use function Enqueue\dsn_to_connection_factory; class AmqpTransportFactory implements TransportFactoryInterface, DriverFactoryInterface { - /** - * @var string - */ - private $amqpConnectionFactoryClass; - /** * @var string */ private $name; /** - * @param string $amqpConnectionFactoryClass * @param string $name */ - public function __construct($amqpConnectionFactoryClass, $name = 'amqp') + public function __construct($name = 'amqp') { - $this->amqpConnectionFactoryClass = $amqpConnectionFactoryClass; $this->name = $name; } @@ -36,14 +34,32 @@ public function __construct($amqpConnectionFactoryClass, $name = 'amqp') */ public function addConfiguration(ArrayNodeDefinition $builder) { + $drivers = []; + if (class_exists(AmqpExtConnectionFactory::class)) { + $drivers[] = 'ext'; + } + if (class_exists(AmqpLibConnectionFactory::class)) { + $drivers[] = 'lib'; + } + if (class_exists(AmqpBunnyConnectionFactory::class)) { + $drivers[] = 'bunny'; + } + $builder ->beforeNormalization() - ->ifString() + ->ifEmpty() + ->then(function ($v) { + return ['dsn' => 'amqp:']; + }) + ->ifString() ->then(function ($v) { return ['dsn' => $v]; }) ->end() ->children() + ->enumNode('driver') + ->values($drivers) + ->end() ->scalarNode('dsn') ->info('The connection to AMQP broker set as a string. Other parameters could be used as defaults') ->end() @@ -113,7 +129,8 @@ public function createConnectionFactory(ContainerBuilder $container, array $conf $config = array_replace($driverOptions, $config); } - $factory = new Definition($this->amqpConnectionFactoryClass); + $factory = new Definition(AmqpConnectionFactory::class); + $factory->setFactory([self::class, 'createConnectionFactoryFactory']); $factory->setArguments([$config]); $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); @@ -164,11 +181,31 @@ public function getName() return $this->name; } - /** - * @return string - */ - public function getAmqpConnectionFactoryClass() + public static function createConnectionFactoryFactory(array $config) { - return $this->amqpConnectionFactoryClass; + if (array_key_exists('driver', $config)) { + if ('ext' == $config['driver']) { + return new AmqpExtConnectionFactory($config); + } + if ('lib' == $config['driver']) { + return new AmqpLibConnectionFactory($config); + } + if ('bunny' == $config['driver']) { + return new AmqpBunnyConnectionFactory($config); + } + + throw new \LogicException(sprintf('Unexpected driver given "%s"', $config['driver'])); + } + + $dsn = array_key_exists('dsn', $config) ? $config['dsn'] : 'amqp:'; + $factory = dsn_to_connection_factory($dsn); + + if (false == $factory instanceof AmqpConnectionFactory) { + throw new \LogicException(sprintf('Factory must be instance of "%s" but got "%s"', AmqpConnectionFactory::class, get_class($factory))); + } + + $factoryClass = get_class($factory); + + return new $factoryClass($config); } } diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php index 5911dc43d..24972986c 100644 --- a/pkg/enqueue/Symfony/DefaultTransportFactory.php +++ b/pkg/enqueue/Symfony/DefaultTransportFactory.php @@ -177,7 +177,7 @@ private function findFactory($dsn) $factory = dsn_to_connection_factory($dsn); if ($factory instanceof AmqpConnectionFactory) { - return new AmqpTransportFactory(get_class($factory), 'default_amqp'); + return new AmqpTransportFactory('default_amqp'); } if ($factory instanceof FsConnectionFactory) { diff --git a/pkg/enqueue/Symfony/RabbitMqAmqpTransportFactory.php b/pkg/enqueue/Symfony/RabbitMqAmqpTransportFactory.php index cfc4d2413..2bd98e584 100644 --- a/pkg/enqueue/Symfony/RabbitMqAmqpTransportFactory.php +++ b/pkg/enqueue/Symfony/RabbitMqAmqpTransportFactory.php @@ -14,12 +14,11 @@ class RabbitMqAmqpTransportFactory extends AmqpTransportFactory use DelayStrategyTransportFactoryTrait; /** - * @param string $amqpConnectionFactoryClass * @param string $name */ - public function __construct($amqpConnectionFactoryClass, $name = 'rabbitmq_amqp') + public function __construct($name = 'rabbitmq_amqp') { - parent::__construct($amqpConnectionFactoryClass, $name); + parent::__construct($name); } /** diff --git a/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php index 5c93bce2e..d468b33c2 100644 --- a/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php @@ -25,21 +25,28 @@ public function testShouldImplementTransportFactoryInterface() public function testCouldBeConstructedWithDefaultName() { - $transport = new AmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new AmqpTransportFactory(); $this->assertEquals('amqp', $transport->getName()); } public function testCouldBeConstructedWithCustomName() { - $transport = new AmqpTransportFactory($this->createAmqpConnectionFactoryClass(), 'theCustomName'); + $transport = new AmqpTransportFactory('theCustomName'); + + $this->assertEquals('theCustomName', $transport->getName()); + } + + public function testThrowIfCouldBeConstructedWithCustomName() + { + $transport = new AmqpTransportFactory('theCustomName'); $this->assertEquals('theCustomName', $transport->getName()); } public function testShouldAllowAddConfiguration() { - $transport = new AmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new AmqpTransportFactory(); $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); @@ -84,7 +91,7 @@ public function testShouldAllowAddConfiguration() public function testShouldAllowAddConfigurationWithDriverOptions() { - $transport = new AmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new AmqpTransportFactory(); $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); @@ -107,7 +114,7 @@ public function testShouldAllowAddConfigurationWithDriverOptions() public function testShouldAllowAddConfigurationAsString() { - $transport = new AmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new AmqpTransportFactory(); $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); @@ -122,7 +129,7 @@ public function testShouldAllowAddConfigurationAsString() public function testThrowIfInvalidReceiveMethodIsSet() { - $transport = new AmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new AmqpTransportFactory(); $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); @@ -138,7 +145,7 @@ public function testThrowIfInvalidReceiveMethodIsSet() public function testShouldAllowChangeReceiveMethod() { - $transport = new AmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new AmqpTransportFactory(); $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); @@ -157,15 +164,14 @@ public function testShouldCreateConnectionFactoryForEmptyConfig() { $container = new ContainerBuilder(); - $expectedClass = $this->createAmqpConnectionFactoryClass(); - - $transport = new AmqpTransportFactory($expectedClass); + $transport = new AmqpTransportFactory(); $serviceId = $transport->createConnectionFactory($container, []); $this->assertTrue($container->hasDefinition($serviceId)); $factory = $container->getDefinition($serviceId); - $this->assertEquals($expectedClass, $factory->getClass()); + $this->assertEquals(AmqpConnectionFactory::class, $factory->getClass()); + $this->assertSame([[]], $factory->getArguments()); } @@ -173,27 +179,23 @@ public function testShouldCreateConnectionFactoryFromDsnString() { $container = new ContainerBuilder(); - $expectedClass = $this->createAmqpConnectionFactoryClass(); - - $transport = new AmqpTransportFactory($expectedClass); + $transport = new AmqpTransportFactory(); $serviceId = $transport->createConnectionFactory($container, [ - 'dsn' => 'theConnectionDSN', + 'dsn' => 'theConnectionDSN:', ]); $this->assertTrue($container->hasDefinition($serviceId)); $factory = $container->getDefinition($serviceId); - $this->assertEquals($expectedClass, $factory->getClass()); - $this->assertSame([['dsn' => 'theConnectionDSN']], $factory->getArguments()); + $this->assertEquals(AmqpConnectionFactory::class, $factory->getClass()); + $this->assertSame([['dsn' => 'theConnectionDSN:']], $factory->getArguments()); } public function testShouldCreateConnectionFactoryAndMergeDriverOptionsIfSet() { $container = new ContainerBuilder(); - $expectedClass = $this->createAmqpConnectionFactoryClass(); - - $transport = new AmqpTransportFactory($expectedClass); + $transport = new AmqpTransportFactory(); $serviceId = $transport->createConnectionFactory($container, [ 'host' => 'aHost', @@ -204,7 +206,7 @@ public function testShouldCreateConnectionFactoryAndMergeDriverOptionsIfSet() $this->assertTrue($container->hasDefinition($serviceId)); $factory = $container->getDefinition($serviceId); - $this->assertEquals($expectedClass, $factory->getClass()); + $this->assertEquals(AmqpConnectionFactory::class, $factory->getClass()); $this->assertSame([['foo' => 'fooVal', 'host' => 'aHost']], $factory->getArguments()); } @@ -212,12 +214,9 @@ public function testShouldCreateConnectionFactoryFromDsnStringPlushArrayOptions( { $container = new ContainerBuilder(); - $expectedClass = $this->createAmqpConnectionFactoryClass(); - - $transport = new AmqpTransportFactory($expectedClass); + $transport = new AmqpTransportFactory(); $serviceId = $transport->createConnectionFactory($container, [ - 'dsn' => 'theConnectionDSN', 'host' => 'localhost', 'port' => 5672, 'user' => 'guest', @@ -228,9 +227,8 @@ public function testShouldCreateConnectionFactoryFromDsnStringPlushArrayOptions( $this->assertTrue($container->hasDefinition($serviceId)); $factory = $container->getDefinition($serviceId); - $this->assertEquals($expectedClass, $factory->getClass()); + $this->assertEquals(AmqpConnectionFactory::class, $factory->getClass()); $this->assertSame([[ - 'dsn' => 'theConnectionDSN', 'host' => 'localhost', 'port' => 5672, 'user' => 'guest', @@ -244,7 +242,7 @@ public function testShouldCreateContext() { $container = new ContainerBuilder(); - $transport = new AmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new AmqpTransportFactory(); $serviceId = $transport->createContext($container, [ 'host' => 'localhost', @@ -268,7 +266,7 @@ public function testShouldCreateDriver() { $container = new ContainerBuilder(); - $transport = new AmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new AmqpTransportFactory(); $serviceId = $transport->createDriver($container, []); @@ -288,11 +286,61 @@ public function testShouldCreateDriver() $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2)); } - /** - * @return string - */ - private function createAmqpConnectionFactoryClass() + public function testShouldCreateAmqpExtConnectionFactoryBySetDriver() { - return $this->getMockClass(AmqpConnectionFactory::class); + $factory = AmqpTransportFactory::createConnectionFactoryFactory(['driver' => 'ext']); + + $this->assertInstanceOf(\Enqueue\AmqpExt\AmqpConnectionFactory::class, $factory); + } + + public function testShouldCreateAmqpLibConnectionFactoryBySetDriver() + { + $factory = AmqpTransportFactory::createConnectionFactoryFactory(['driver' => 'lib']); + + $this->assertInstanceOf(\Enqueue\AmqpLib\AmqpConnectionFactory::class, $factory); + } + + public function testShouldCreateAmqpBunnyConnectionFactoryBySetDriver() + { + $factory = AmqpTransportFactory::createConnectionFactoryFactory(['driver' => 'bunny']); + + $this->assertInstanceOf(\Enqueue\AmqpBunny\AmqpConnectionFactory::class, $factory); + } + + public function testShouldCreateAmqpExtFromConfigWithoutDriverAndDsn() + { + $factory = AmqpTransportFactory::createConnectionFactoryFactory(['host' => 'aHost']); + + $this->assertInstanceOf(\Enqueue\AmqpExt\AmqpConnectionFactory::class, $factory); + } + + public function testThrowIfInvalidDriverGiven() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Unexpected driver given "invalidDriver"'); + + AmqpTransportFactory::createConnectionFactoryFactory(['driver' => 'invalidDriver']); + } + + public function testShouldCreateAmqpExtFromDsn() + { + $factory = AmqpTransportFactory::createConnectionFactoryFactory(['dsn' => 'amqp:']); + + $this->assertInstanceOf(\Enqueue\AmqpExt\AmqpConnectionFactory::class, $factory); + } + + public function testShouldCreateAmqpBunnyFromDsnWithDriver() + { + $factory = AmqpTransportFactory::createConnectionFactoryFactory(['dsn' => 'amqp+bunny:']); + + $this->assertInstanceOf(\Enqueue\AmqpBunny\AmqpConnectionFactory::class, $factory); + } + + public function testThrowIfNotAmqpDsnProvided() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Factory must be instance of "Interop\Amqp\AmqpConnectionFactory" but got "Enqueue\Sqs\SqsConnectionFactory"'); + + AmqpTransportFactory::createConnectionFactoryFactory(['dsn' => 'sqs:']); } } diff --git a/pkg/enqueue/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php index 9d16eb540..cfe77e6d4 100644 --- a/pkg/enqueue/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php @@ -30,21 +30,21 @@ public function testShouldExtendAmqpTransportFactoryClass() public function testCouldBeConstructedWithDefaultName() { - $transport = new RabbitMqAmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new RabbitMqAmqpTransportFactory(); $this->assertEquals('rabbitmq_amqp', $transport->getName()); } public function testCouldBeConstructedWithCustomName() { - $transport = new RabbitMqAmqpTransportFactory($this->createAmqpConnectionFactoryClass(), 'theCustomName'); + $transport = new RabbitMqAmqpTransportFactory('theCustomName'); $this->assertEquals('theCustomName', $transport->getName()); } public function testShouldAllowAddConfiguration() { - $transport = new RabbitMqAmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new RabbitMqAmqpTransportFactory(); $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); @@ -61,9 +61,7 @@ public function testShouldCreateConnectionFactory() { $container = new ContainerBuilder(); - $expectedClass = $this->createAmqpConnectionFactoryClass(); - - $transport = new RabbitMqAmqpTransportFactory($expectedClass); + $transport = new RabbitMqAmqpTransportFactory(); $serviceId = $transport->createConnectionFactory($container, [ 'host' => 'localhost', @@ -77,7 +75,7 @@ public function testShouldCreateConnectionFactory() $this->assertTrue($container->hasDefinition($serviceId)); $factory = $container->getDefinition($serviceId); - $this->assertEquals($expectedClass, $factory->getClass()); + $this->assertEquals(AmqpConnectionFactory::class, $factory->getClass()); $this->assertSame([[ 'host' => 'localhost', 'port' => 5672, @@ -93,7 +91,7 @@ public function testShouldCreateContext() { $container = new ContainerBuilder(); - $transport = new RabbitMqAmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new RabbitMqAmqpTransportFactory(); $serviceId = $transport->createContext($container, [ 'host' => 'localhost', @@ -118,7 +116,7 @@ public function testShouldCreateDriver() { $container = new ContainerBuilder(); - $transport = new RabbitMqAmqpTransportFactory($this->createAmqpConnectionFactoryClass()); + $transport = new RabbitMqAmqpTransportFactory(); $serviceId = $transport->createDriver($container, []); @@ -128,12 +126,4 @@ public function testShouldCreateDriver() $driver = $container->getDefinition($serviceId); $this->assertSame(RabbitMqDriver::class, $driver->getClass()); } - - /** - * @return string - */ - private function createAmqpConnectionFactoryClass() - { - return $this->getMockClass(AmqpConnectionFactory::class); - } } diff --git a/pkg/fs/Tests/FsConnectionFactoryTest.php b/pkg/fs/Tests/FsConnectionFactoryTest.php index aa7c511bd..30cfc6f69 100644 --- a/pkg/fs/Tests/FsConnectionFactoryTest.php +++ b/pkg/fs/Tests/FsConnectionFactoryTest.php @@ -19,7 +19,7 @@ public function testShouldImplementConnectionFactoryInterface() public function testShouldCreateContext() { $factory = new FsConnectionFactory([ - 'path' => 'theDir', + 'path' => __DIR__, 'pre_fetch_count' => 123, 'chmod' => 0765, ]); @@ -28,7 +28,7 @@ public function testShouldCreateContext() $this->assertInstanceOf(FsContext::class, $context); - $this->assertAttributeSame('theDir', 'storeDir', $context); + $this->assertAttributeSame(__DIR__, 'storeDir', $context); $this->assertAttributeSame(123, 'preFetchCount', $context); $this->assertAttributeSame(0765, 'chmod', $context); } diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index 0abfc74b9..5bdc4f1c3 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -2,9 +2,6 @@ namespace Enqueue\SimpleClient; -use Enqueue\AmqpBunny\AmqpConnectionFactory as AmqpBunnyConnectionFactory; -use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory; -use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory; use Enqueue\Client\ArrayProcessorRegistry; use Enqueue\Client\Config; use Enqueue\Client\DelegateProcessor; @@ -49,8 +46,8 @@ final class SimpleClient * *$config = [ * 'transport' => [ - * 'default' => 'amqp_ext', - * 'amqp_ext' => [], // amqp options here + * 'default' => 'amqp', + * 'amqp' => [], // amqp options here * ], * ] * @@ -58,8 +55,8 @@ final class SimpleClient * * $config = [ * 'transport' => [ - * 'default' => 'amqp_ext', - * 'amqp_ext' => [], + * 'default' => 'amqp', + * 'amqp' => [], * .... * ], * 'client' => [ @@ -294,20 +291,8 @@ private function buildContainerExtension() } } - if (class_exists(AmqpExtConnectionFactory::class)) { - $extension->addTransportFactory(new AmqpTransportFactory(AmqpExtConnectionFactory::class, 'amqp_ext')); - $extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpExtConnectionFactory::class, 'rabbitmq_amqp_ext')); - } - - if (class_exists(AmqpLibConnectionFactory::class)) { - $extension->addTransportFactory(new AmqpTransportFactory(AmqpLibConnectionFactory::class, 'amqp_lib')); - $extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpLibConnectionFactory::class, 'rabbitmq_amqp_lib')); - } - - if (class_exists(AmqpBunnyConnectionFactory::class)) { - $extension->addTransportFactory(new AmqpTransportFactory(AmqpBunnyConnectionFactory::class, 'amqp_bunny')); - $extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpBunnyConnectionFactory::class, 'rabbitmq_amqp_bunny')); - } + $extension->addTransportFactory(new AmqpTransportFactory('amqp')); + $extension->addTransportFactory(new RabbitMqAmqpTransportFactory('rabbitmq_amqp')); return $extension; } diff --git a/pkg/simple-client/Tests/Functional/SimpleClientTest.php b/pkg/simple-client/Tests/Functional/SimpleClientTest.php index 503b0b49c..3281aa274 100644 --- a/pkg/simple-client/Tests/Functional/SimpleClientTest.php +++ b/pkg/simple-client/Tests/Functional/SimpleClientTest.php @@ -33,8 +33,9 @@ public function transportConfigDataProvider() { yield 'amqp' => [[ 'transport' => [ - 'default' => 'amqp_ext', - 'amqp_ext' => [ + 'default' => 'amqp', + 'amqp' => [ + 'driver' => 'ext', 'host' => getenv('SYMFONY__RABBITMQ__HOST'), 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), 'user' => getenv('SYMFONY__RABBITMQ__USER'), @@ -48,8 +49,8 @@ public function transportConfigDataProvider() yield 'amqp_dsn' => [[ 'transport' => [ - 'default' => 'amqp_ext', - 'amqp_ext' => getenv('AMQP_DSN'), + 'default' => 'amqp', + 'amqp' => getenv('AMQP_DSN'), ], ]]; @@ -61,8 +62,9 @@ public function transportConfigDataProvider() yield [[ 'transport' => [ - 'default' => 'rabbitmq_amqp_ext', - 'rabbitmq_amqp_ext' => [ + 'default' => 'rabbitmq_amqp', + 'rabbitmq_amqp' => [ + 'driver' => 'ext', 'host' => getenv('SYMFONY__RABBITMQ__HOST'), 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), 'user' => getenv('SYMFONY__RABBITMQ__USER'),