Skip to content

[BC break][amqp] Use same qos options across all all AMQP transports #221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/bundle/cli_commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Options:
--setup-broker Creates queues, topics, exchanges, binding etc on broker side.
--idle-timeout=IDLE-TIMEOUT The time in milliseconds queue consumer idle if no message has been received.
--receive-timeout=RECEIVE-TIMEOUT The time in milliseconds queue consumer waits for a message.
--skip[=SKIP] Queues to skip consumption of messages from (multiple values allowed)
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
Expand Down
24 changes: 22 additions & 2 deletions pkg/amqp-bunny/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,23 @@ public function __construct($config = 'amqp:')
throw new \LogicException('The config must be either an array of options, a DSN string or null');
}

$this->config = array_replace($this->defaultConfig(), $config);
$config = array_replace($this->defaultConfig(), $config);

$config = array_replace($this->defaultConfig(), $config);
if (array_key_exists('qos_global', $config)) {
$config['qos_global'] = (bool) $config['qos_global'];
}
if (array_key_exists('qos_prefetch_count', $config)) {
$config['qos_prefetch_count'] = (int) $config['qos_prefetch_count'];
}
if (array_key_exists('qos_prefetch_size', $config)) {
$config['qos_prefetch_size'] = (int) $config['qos_prefetch_size'];
}
if (array_key_exists('lazy', $config)) {
$config['lazy'] = (bool) $config['lazy'];
}

$this->config = $config;

$supportedMethods = ['basic_get', 'basic_consume'];
if (false == in_array($this->config['receive_method'], $supportedMethods, true)) {
Expand All @@ -77,7 +93,10 @@ public function createContext()
{
if ($this->config['lazy']) {
$context = new AmqpContext(function () {
return $this->establishConnection()->channel();
$channel = $this->establishConnection()->channel();
$channel->qos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count'], $this->config['qos_global']);

return $channel;
}, $this->config);
$context->setDelayStrategy($this->delayStrategy);

Expand All @@ -86,6 +105,7 @@ public function createContext()

$context = new AmqpContext($this->establishConnection()->channel(), $this->config);
$context->setDelayStrategy($this->delayStrategy);
$context->setQos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count'], $this->config['qos_global']);

return $context;
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/amqp-bunny/Tests/Spec/AmqpPreFetchCountTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace Enqueue\AmqpBunny\Tests\Spec;

use Enqueue\AmqpBunny\AmqpConnectionFactory;
use Interop\Queue\Spec\Amqp\PreFetchCountSpec;

/**
* @group functional
*/
class AmqpPreFetchCountTest extends PreFetchCountSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));

return $factory->createContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

namespace Enqueue\AmqpBunny\Tests\Spec;

use Enqueue\AmqpLib\AmqpConnectionFactory;
use Enqueue\AmqpBunny\AmqpConnectionFactory;
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Interop\Amqp\AmqpContext;
use Interop\Queue\PsrContext;
use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec;

Expand All @@ -12,6 +13,11 @@
*/
class AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec
{
public function test()
{
$this->markTestIncomplete();
}

/**
* {@inheritdoc}
*/
Expand All @@ -25,12 +31,15 @@ protected function createContext()

/**
* {@inheritdoc}
*
* @param AmqpContext $context
*/
protected function createQueue(PsrContext $context, $queueName)
{
$queue = parent::createQueue($context, $queueName);

$context->declareQueue($queue);
$context->purgeQueue($queue);

return $queue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Enqueue\AmqpBunny\Tests\Spec;

use Enqueue\AmqpLib\AmqpConnectionFactory;
use Enqueue\AmqpBunny\AmqpConnectionFactory;
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Interop\Queue\PsrContext;
use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace Enqueue\AmqpBunny\Tests\Spec;

use Enqueue\AmqpBunny\AmqpConnectionFactory;
use Interop\Queue\Spec\Amqp\SendAndReceiveTimestampAsIntegerSpec;

/**
* @group functional
*/
class AmqpSendAndReceiveTimestampAsIntengerTest extends SendAndReceiveTimestampAsIntegerSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));

return $factory->createContext();
}
}
2 changes: 1 addition & 1 deletion pkg/amqp-bunny/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"enqueue/test": "^0.8@dev",
"enqueue/enqueue": "^0.8@dev",
"enqueue/null": "^0.8@dev",
"queue-interop/queue-spec": "^0.5.1@dev",
"queue-interop/queue-spec": "^0.5.2@dev",
"symfony/dependency-injection": "^2.8|^3",
"symfony/config": "^2.8|^3"
},
Expand Down
45 changes: 29 additions & 16 deletions pkg/amqp-ext/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrate
* 'connect_timeout' => 'Connection timeout. Note: 0 or greater seconds. May be fractional.',
* 'persisted' => 'bool, Whether it use single persisted connection or open a new one for every context',
* 'lazy' => 'the connection will be performed as later as possible, if the option set to true',
* 'pre_fetch_count' => 'Controls how many messages could be prefetched',
* 'pre_fetch_size' => 'Controls how many messages could be prefetched',
* 'qos_prefetch_size' => 'The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"',
* 'qos_prefetch_count' => 'Specifies a prefetch window in terms of whole messages.',
* 'qos_global' => 'If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.',
* 'receive_method' => 'Could be either basic_get or basic_consume',
* ]
*
Expand Down Expand Up @@ -72,7 +73,7 @@ public function __construct($config = 'amqp:')
}

if ('basic_consume' == $this->config['receive_method']) {
if (false == (version_compare(phpversion('amqp'), '1.9.1', '>=') || phpversion('amqp') == '1.9.1-dev')) {
if (false == (version_compare(phpversion('amqp'), '1.9.1', '>=') || '1.9.1-dev' == phpversion('amqp'))) {
// @see https://github.com/php-enqueue/enqueue-dev/issues/110 and https://github.com/pdezwart/php-amqp/issues/281
throw new \LogicException('The "basic_consume" method does not work on amqp extension prior 1.9.1 version.');
}
Expand All @@ -88,7 +89,10 @@ public function createContext()
{
if ($this->config['lazy']) {
$context = new AmqpContext(function () {
return $this->createExtContext($this->establishConnection());
$extContext = $this->createExtContext($this->establishConnection());
$extContext->qos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count']);

return $extContext;
}, $this->config['receive_method']);
$context->setDelayStrategy($this->delayStrategy);

Expand All @@ -97,6 +101,7 @@ public function createContext()

$context = new AmqpContext($this->createExtContext($this->establishConnection()), $this->config['receive_method']);
$context->setDelayStrategy($this->delayStrategy);
$context->setQos($this->config['qos_prefetch_size'], $this->config['qos_prefetch_count'], $this->config['qos_global']);

return $context;
}
Expand All @@ -108,16 +113,7 @@ public function createContext()
*/
private function createExtContext(\AMQPConnection $extConnection)
{
$channel = new \AMQPChannel($extConnection);
if (false == empty($this->config['pre_fetch_count'])) {
$channel->setPrefetchCount((int) $this->config['pre_fetch_count']);
}

if (false == empty($this->config['pre_fetch_size'])) {
$channel->setPrefetchSize((int) $this->config['pre_fetch_size']);
}

return $channel;
return new \AMQPChannel($extConnection);
}

/**
Expand Down Expand Up @@ -183,6 +179,22 @@ private function parseDsn($dsn)
return urldecode($value);
}, $config);

if (array_key_exists('qos_global', $config)) {
$config['qos_global'] = (bool) $config['qos_global'];
}
if (array_key_exists('qos_prefetch_count', $config)) {
$config['qos_prefetch_count'] = (int) $config['qos_prefetch_count'];
}
if (array_key_exists('qos_prefetch_size', $config)) {
$config['qos_prefetch_size'] = (int) $config['qos_prefetch_size'];
}
if (array_key_exists('lazy', $config)) {
$config['lazy'] = (bool) $config['lazy'];
}
if (array_key_exists('persisted', $config)) {
$config['persisted'] = (bool) $config['persisted'];
}

return $config;
}

Expand All @@ -202,8 +214,9 @@ private function defaultConfig()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'receive_method' => 'basic_get',
];
}
Expand Down
59 changes: 35 additions & 24 deletions pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'receive_method' => 'basic_get',
],
];
Expand All @@ -94,8 +95,9 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'receive_method' => 'basic_get',
],
];
Expand All @@ -113,8 +115,9 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'receive_method' => 'basic_get',
],
];
Expand All @@ -132,8 +135,9 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'receive_method' => 'basic_get',
],
];
Expand All @@ -151,8 +155,9 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'receive_method' => 'basic_get',
],
];
Expand All @@ -170,8 +175,9 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'receive_method' => 'basic_get',
],
];
Expand All @@ -189,8 +195,9 @@ public static function provideConfigs()
'connect_timeout' => '2',
'persisted' => false,
'lazy' => '',
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'receive_method' => 'basic_get',
],
];
Expand All @@ -208,8 +215,9 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'receive_method' => 'basic_get',
],
];
Expand All @@ -227,14 +235,15 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => false,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'receive_method' => 'basic_get',
],
];

yield [
['pre_fetch_count' => 123, 'pre_fetch_size' => 321],
['qos_prefetch_count' => 123, 'qos_prefetch_size' => 321],
[
'host' => 'localhost',
'port' => 5672,
Expand All @@ -246,14 +255,15 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => 123,
'pre_fetch_size' => 321,
'qos_prefetch_count' => 123,
'qos_prefetch_size' => 321,
'qos_global' => false,
'receive_method' => 'basic_get',
],
];

yield [
'amqp://user:pass@host:10000/vhost?pre_fetch_count=123&pre_fetch_size=321',
'amqp://user:pass@host:10000/vhost?qos_prefetch_count=123&qos_prefetch_size=321&qos_global=1',
[
'host' => 'host',
'port' => '10000',
Expand All @@ -265,8 +275,9 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => 123,
'pre_fetch_size' => 321,
'qos_prefetch_size' => 321,
'qos_prefetch_count' => 123,
'qos_global' => true,
'receive_method' => 'basic_get',
],
];
Expand Down
Loading