Skip to content

Commit b2287c3

Browse files
committed
Merge branch 'amqp-shared-transport-factory' into 0.8
2 parents 8e22efc + e76d6d3 commit b2287c3

13 files changed

+183
-391
lines changed

docs/bundle/config_reference.md

+4-200
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ enqueue:
3838

3939
# The option tells whether RabbitMQ broker has delay plugin installed or not
4040
delay_plugin_installed: false
41-
amqp_ext:
41+
amqp:
42+
driver: ~ # One of "ext"; "lib"; "bunny"
4243

4344
# The connection to AMQP broker set as a string. Other parameters could be used as defaults
4445
dsn: ~
@@ -86,106 +87,8 @@ enqueue:
8687

8788
# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
8889
driver_options: ~
89-
rabbitmq_amqp_ext:
90-
91-
# The connection to AMQP broker set as a string. Other parameters could be used as defaults
92-
dsn: ~
93-
94-
# The host to connect too. Note: Max 1024 characters
95-
host: ~
96-
97-
# Port on the host.
98-
port: ~
99-
100-
# The user name to use. Note: Max 128 characters.
101-
user: ~
102-
103-
# Password. Note: Max 128 characters.
104-
pass: ~
105-
106-
# The virtual host on the host. Note: Max 128 characters.
107-
vhost: ~
108-
109-
# Connection timeout. Note: 0 or greater seconds. May be fractional.
110-
connection_timeout: ~
111-
112-
# Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
113-
read_timeout: ~
114-
115-
# Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
116-
write_timeout: ~
117-
118-
# How often to send heartbeat. 0 means off.
119-
heartbeat: ~
120-
persisted: ~
121-
lazy: ~
122-
123-
# The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher
124-
receive_method: ~ # One of "basic_get"; "basic_consume"
125-
126-
# The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"
127-
qos_prefetch_size: ~
128-
129-
# Specifies a prefetch window in terms of whole messages
130-
qos_prefetch_count: ~
131-
132-
# If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.
133-
qos_global: ~
134-
135-
# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
136-
driver_options: ~
137-
138-
# The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id
139-
delay_strategy: dlx
140-
amqp_lib:
141-
142-
# The connection to AMQP broker set as a string. Other parameters could be used as defaults
143-
dsn: ~
144-
145-
# The host to connect too. Note: Max 1024 characters
146-
host: ~
147-
148-
# Port on the host.
149-
port: ~
150-
151-
# The user name to use. Note: Max 128 characters.
152-
user: ~
153-
154-
# Password. Note: Max 128 characters.
155-
pass: ~
156-
157-
# The virtual host on the host. Note: Max 128 characters.
158-
vhost: ~
159-
160-
# Connection timeout. Note: 0 or greater seconds. May be fractional.
161-
connection_timeout: ~
162-
163-
# Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
164-
read_timeout: ~
165-
166-
# Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
167-
write_timeout: ~
168-
169-
# How often to send heartbeat. 0 means off.
170-
heartbeat: ~
171-
persisted: ~
172-
lazy: ~
173-
174-
# The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher
175-
receive_method: ~ # One of "basic_get"; "basic_consume"
176-
177-
# The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"
178-
qos_prefetch_size: ~
179-
180-
# Specifies a prefetch window in terms of whole messages
181-
qos_prefetch_count: ~
182-
183-
# If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.
184-
qos_global: ~
185-
186-
# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
187-
driver_options: ~
188-
rabbitmq_amqp_lib:
90+
rabbitmq_amqp:
91+
driver: ~ # One of "ext"; "lib"; "bunny"
18992

19093
# The connection to AMQP broker set as a string. Other parameters could be used as defaults
19194
dsn: ~
@@ -293,105 +196,6 @@ enqueue:
293196

294197
# the connection will be performed as later as possible, if the option set to true
295198
lazy: true
296-
amqp_bunny:
297-
298-
# The connection to AMQP broker set as a string. Other parameters could be used as defaults
299-
dsn: ~
300-
301-
# The host to connect too. Note: Max 1024 characters
302-
host: ~
303-
304-
# Port on the host.
305-
port: ~
306-
307-
# The user name to use. Note: Max 128 characters.
308-
user: ~
309-
310-
# Password. Note: Max 128 characters.
311-
pass: ~
312-
313-
# The virtual host on the host. Note: Max 128 characters.
314-
vhost: ~
315-
316-
# Connection timeout. Note: 0 or greater seconds. May be fractional.
317-
connection_timeout: ~
318-
319-
# Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
320-
read_timeout: ~
321-
322-
# Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
323-
write_timeout: ~
324-
325-
# How often to send heartbeat. 0 means off.
326-
heartbeat: ~
327-
persisted: ~
328-
lazy: ~
329-
330-
# The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher
331-
receive_method: ~ # One of "basic_get"; "basic_consume"
332-
333-
# The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"
334-
qos_prefetch_size: ~
335-
336-
# Specifies a prefetch window in terms of whole messages
337-
qos_prefetch_count: ~
338-
339-
# If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.
340-
qos_global: ~
341-
342-
# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
343-
driver_options: ~
344-
rabbitmq_amqp_bunny:
345-
346-
# The connection to AMQP broker set as a string. Other parameters could be used as defaults
347-
dsn: ~
348-
349-
# The host to connect too. Note: Max 1024 characters
350-
host: ~
351-
352-
# Port on the host.
353-
port: ~
354-
355-
# The user name to use. Note: Max 128 characters.
356-
user: ~
357-
358-
# Password. Note: Max 128 characters.
359-
pass: ~
360-
361-
# The virtual host on the host. Note: Max 128 characters.
362-
vhost: ~
363-
364-
# Connection timeout. Note: 0 or greater seconds. May be fractional.
365-
connection_timeout: ~
366-
367-
# Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
368-
read_timeout: ~
369-
370-
# Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
371-
write_timeout: ~
372-
373-
# How often to send heartbeat. 0 means off.
374-
heartbeat: ~
375-
persisted: ~
376-
lazy: ~
377-
378-
# The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher
379-
receive_method: ~ # One of "basic_get"; "basic_consume"
380-
381-
# The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"
382-
qos_prefetch_size: ~
383-
384-
# Specifies a prefetch window in terms of whole messages
385-
qos_prefetch_count: ~
386-
387-
# If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.
388-
qos_global: ~
389-
390-
# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
391-
driver_options: ~
392-
393-
# The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id
394-
delay_strategy: dlx
395199
client:
396200
traceable_producer: false
397201
prefix: enqueue

pkg/amqp-tools/DelayStrategyTransportFactoryTrait.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@ public function registerDelayStrategy(ContainerBuilder $container, array $config
1515
if ($config['delay_strategy']) {
1616
$factory = $container->getDefinition($factoryId);
1717

18-
if (false == is_a($factory->getClass(), DelayStrategyAware::class, true)) {
18+
if (false == (is_a($factory->getClass(), DelayStrategyAware::class, true) || $factory->getFactory())) {
1919
throw new \LogicException('Connection factory does not support delays');
2020
}
2121

22-
if (strtolower($config['delay_strategy']) === 'dlx') {
22+
if ('dlx' === strtolower($config['delay_strategy'])) {
2323
$delayId = sprintf('enqueue.client.%s.delay_strategy', $factoryName);
2424
$container->register($delayId, RabbitMqDlxDelayStrategy::class);
2525

2626
$factory->addMethodCall('setDelayStrategy', [new Reference($delayId)]);
27-
} elseif (strtolower($config['delay_strategy']) === 'delayed_message_plugin') {
27+
} elseif ('delayed_message_plugin' === strtolower($config['delay_strategy'])) {
2828
$delayId = sprintf('enqueue.client.%s.delay_strategy', $factoryName);
2929
$container->register($delayId, RabbitMqDelayPluginDelayStrategy::class);
3030

pkg/enqueue-bundle/EnqueueBundle.php

+2-17
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22

33
namespace Enqueue\Bundle;
44

5-
use Enqueue\AmqpBunny\AmqpConnectionFactory as AmqpBunnyConnectionFactory;
6-
use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory;
7-
use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory;
85
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventsPass;
96
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncTransformersPass;
107
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
@@ -55,15 +52,8 @@ public function build(ContainerBuilder $container)
5552
$extension->addTransportFactory(new RabbitMqStompTransportFactory());
5653
}
5754

58-
if (class_exists(AmqpExtConnectionFactory::class)) {
59-
$extension->addTransportFactory(new AmqpTransportFactory(AmqpExtConnectionFactory::class, 'amqp_ext'));
60-
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpExtConnectionFactory::class, 'rabbitmq_amqp_ext'));
61-
}
62-
63-
if (class_exists(AmqpLibConnectionFactory::class)) {
64-
$extension->addTransportFactory(new AmqpTransportFactory(AmqpLibConnectionFactory::class, 'amqp_lib'));
65-
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpLibConnectionFactory::class, 'rabbitmq_amqp_lib'));
66-
}
55+
$extension->addTransportFactory(new AmqpTransportFactory('amqp'));
56+
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory('rabbitmq_amqp'));
6757

6858
if (class_exists(FsConnectionFactory::class)) {
6959
$extension->addTransportFactory(new FsTransportFactory());
@@ -81,11 +71,6 @@ public function build(ContainerBuilder $container)
8171
$extension->addTransportFactory(new SqsTransportFactory());
8272
}
8373

84-
if (class_exists(AmqpBunnyConnectionFactory::class)) {
85-
$extension->addTransportFactory(new AmqpTransportFactory(AmqpBunnyConnectionFactory::class, 'amqp_bunny'));
86-
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory(AmqpBunnyConnectionFactory::class, 'rabbitmq_amqp_bunny'));
87-
}
88-
8974
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
9075
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
9176
}

pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php

+6-5
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ public function setUp()
2323

2424
public function provideEnqueueConfigs()
2525
{
26-
yield 'amqp_ext' => [[
26+
yield 'amqp' => [[
2727
'transport' => [
28-
'default' => 'amqp_ext',
29-
'amqp_ext' => [
28+
'default' => 'amqp',
29+
'amqp' => [
30+
'driver' => 'ext',
3031
'host' => getenv('SYMFONY__RABBITMQ__HOST'),
3132
'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'),
3233
'user' => getenv('SYMFONY__RABBITMQ__USER'),
@@ -39,8 +40,8 @@ public function provideEnqueueConfigs()
3940

4041
yield 'amqp_dsn' => [[
4142
'transport' => [
42-
'default' => 'amqp_ext',
43-
'amqp_ext' => getenv('AMQP_DSN'),
43+
'default' => 'amqp',
44+
'amqp' => getenv('AMQP_DSN'),
4445
],
4546
]];
4647

0 commit comments

Comments
 (0)