Skip to content

[Amqp] Qos #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/amqp-ext/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ public function close()
}
}

/**
* {@inheritdoc}
*/
public function setQos($prefetchSize, $prefetchCount, $global)
{
$this->getExtChannel()->qos($prefetchSize, $prefetchCount);
}

/**
* @return \AMQPChannel
*/
Expand Down
8 changes: 7 additions & 1 deletion pkg/amqp-lib/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory
* 'lazy' => 'the connection will be performed as later as possible, if the option set to true',
* 'stream' => 'stream or socket connection',
* 'receive_method' => 'Could be either basic_get or basic_consume',
* 'qos_prefetch_size' => 'The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"',
* 'qos_prefetch_count' => 'Specifies a prefetch window in terms of whole messages.',
* 'qos_global' => 'If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.',
* ]
*
* or
Expand Down Expand Up @@ -69,7 +72,7 @@ public function __construct($config = 'amqp://')
*/
public function createContext()
{
return new AmqpContext($this->establishConnection(), $this->config['receive_method']);
return new AmqpContext($this->establishConnection(), $this->config);
}

/**
Expand Down Expand Up @@ -224,6 +227,9 @@ private function defaultConfig()
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'receive_method' => 'basic_get',
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
];
}
}
32 changes: 25 additions & 7 deletions pkg/amqp-lib/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class AmqpContext implements InteropAmqpContext
/**
* @var string
*/
private $receiveMethod;
private $config;

/**
* @var Buffer
Expand All @@ -42,12 +42,18 @@ class AmqpContext implements InteropAmqpContext

/**
* @param AbstractConnection $connection
* @param string $receiveMethod
* @param array $config
*/
public function __construct(AbstractConnection $connection, $receiveMethod)
public function __construct(AbstractConnection $connection, $config = [])
{
$this->config = array_replace([
'receive_method' => 'basic_get',
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
], $config);

$this->connection = $connection;
$this->receiveMethod = $receiveMethod;
$this->buffer = new Buffer();
}

Expand Down Expand Up @@ -99,10 +105,10 @@ public function createConsumer(PsrDestination $destination)
$queue = $this->createTemporaryQueue();
$this->bind(new AmqpBind($destination, $queue, $queue->getQueueName()));

return new AmqpConsumer($this->getChannel(), $queue, $this->buffer, $this->receiveMethod);
return new AmqpConsumer($this->getChannel(), $queue, $this->buffer, $this->config['receive_method']);
}

return new AmqpConsumer($this->getChannel(), $destination, $this->buffer, $this->receiveMethod);
return new AmqpConsumer($this->getChannel(), $destination, $this->buffer, $this->config['receive_method']);
}

/**
Expand Down Expand Up @@ -278,14 +284,26 @@ public function close()
}
}

/**
* {@inheritdoc}
*/
public function setQos($prefetchSize, $prefetchCount, $global)
{
$this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
}

/**
* @return AMQPChannel
*/
private function getChannel()
{
if (null === $this->channel) {
$this->channel = $this->connection->channel();
$this->channel->basic_qos(0, 1, false);
$this->channel->basic_qos(
$this->config['qos_prefetch_size'],
$this->config['qos_prefetch_count'],
$this->config['qos_global']
);
}

return $this->channel;
Expand Down
27 changes: 27 additions & 0 deletions pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public static function provideConfigs()
'heartbeat' => 0,
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
],
];

Expand All @@ -107,6 +110,9 @@ public static function provideConfigs()
'heartbeat' => 0,
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
],
];

Expand All @@ -131,6 +137,9 @@ public static function provideConfigs()
'heartbeat' => 0,
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
],
];

Expand All @@ -155,6 +164,9 @@ public static function provideConfigs()
'heartbeat' => 0,
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
],
];

Expand All @@ -179,6 +191,9 @@ public static function provideConfigs()
'heartbeat' => 0,
'connection_timeout' => '2',
'read_write_timeout' => 3.0,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
],
];

Expand All @@ -203,6 +218,9 @@ public static function provideConfigs()
'heartbeat' => 0,
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
],
];

Expand All @@ -227,6 +245,9 @@ public static function provideConfigs()
'heartbeat' => 0,
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
],
];

Expand All @@ -251,6 +272,9 @@ public static function provideConfigs()
'heartbeat' => 0,
'connection_timeout' => 123,
'read_write_timeout' => 321,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
],
];

Expand All @@ -275,6 +299,9 @@ public static function provideConfigs()
'heartbeat' => 0,
'connection_timeout' => '123',
'read_write_timeout' => '321',
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
],
];
}
Expand Down
45 changes: 35 additions & 10 deletions pkg/amqp-lib/Tests/AmqpContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public function testShouldDeclareTopic()
$topic->addFlag(AmqpTopic::FLAG_INTERNAL);
$topic->addFlag(AmqpTopic::FLAG_AUTODELETE);

$session = new AmqpContext($connection, '');
$session = new AmqpContext($connection);
$session->declareTopic($topic);
}

Expand Down Expand Up @@ -77,7 +77,7 @@ public function testShouldDeleteTopic()
$topic->addFlag(AmqpTopic::FLAG_IFUNUSED);
$topic->addFlag(AmqpTopic::FLAG_NOWAIT);

$session = new AmqpContext($connection, '');
$session = new AmqpContext($connection);
$session->deleteTopic($topic);
}

Expand Down Expand Up @@ -115,7 +115,7 @@ public function testShouldDeclareQueue()
$queue->addFlag(AmqpQueue::FLAG_EXCLUSIVE);
$queue->addFlag(AmqpQueue::FLAG_NOWAIT);

$session = new AmqpContext($connection, '');
$session = new AmqpContext($connection);
$session->declareQueue($queue);
}

Expand Down Expand Up @@ -146,7 +146,7 @@ public function testShouldDeleteQueue()
$queue->addFlag(AmqpQueue::FLAG_IFEMPTY);
$queue->addFlag(AmqpQueue::FLAG_NOWAIT);

$session = new AmqpContext($connection, '');
$session = new AmqpContext($connection);
$session->deleteQueue($queue);
}

Expand All @@ -169,7 +169,7 @@ public function testBindShouldBindTopicToTopic()
->willReturn($channel)
;

$context = new AmqpContext($connection, '');
$context = new AmqpContext($connection);
$context->bind(new AmqpBind($target, $source, 'routing-key', 12345));
}

Expand All @@ -192,7 +192,7 @@ public function testBindShouldBindTopicToQueue()
->willReturn($channel)
;

$context = new AmqpContext($connection, '');
$context = new AmqpContext($connection);
$context->bind(new AmqpBind($target, $source, 'routing-key', 12345));
$context->bind(new AmqpBind($source, $target, 'routing-key', 12345));
}
Expand All @@ -216,7 +216,7 @@ public function testShouldUnBindTopicFromTopic()
->willReturn($channel)
;

$context = new AmqpContext($connection, '');
$context = new AmqpContext($connection);
$context->unbind(new AmqpBind($target, $source, 'routing-key', 12345));
}

Expand All @@ -239,7 +239,7 @@ public function testShouldUnBindTopicFromQueue()
->willReturn($channel)
;

$context = new AmqpContext($connection, '');
$context = new AmqpContext($connection);
$context->unbind(new AmqpBind($target, $source, 'routing-key', 12345, ['key' => 'value']));
$context->unbind(new AmqpBind($source, $target, 'routing-key', 12345, ['key' => 'value']));
}
Expand All @@ -259,7 +259,7 @@ public function testShouldCloseChannelConnection()
->willReturn($channel)
;

$context = new AmqpContext($connection, '');
$context = new AmqpContext($connection);
$context->createProducer();

$context->close();
Expand All @@ -284,10 +284,35 @@ public function testShouldPurgeQueue()
->willReturn($channel)
;

$context = new AmqpContext($connection, '');
$context = new AmqpContext($connection);
$context->purgeQueue($queue);
}

public function testShouldSetQos()
{
$channel = $this->createChannelMock();
$channel
->expects($this->at(0))
->method('basic_qos')
->with($this->identicalTo(0), $this->identicalTo(1), $this->isFalse())
;
$channel
->expects($this->at(1))
->method('basic_qos')
->with($this->identicalTo(123), $this->identicalTo(456), $this->isTrue())
;

$connection = $this->createConnectionMock();
$connection
->expects($this->once())
->method('channel')
->willReturn($channel)
;

$context = new AmqpContext($connection);
$context->setQos(123, 456, true);
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|AbstractConnection
*/
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp-lib/Tests/Spec/AmqpContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ protected function createContext()
->willReturn($channel)
;

return new AmqpContext($con, '');
return new AmqpContext($con);
}
}