Skip to content

Delay Strategy Configuration #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Aug 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
b75d2ce
delay strategy
ASKozienko Aug 9, 2017
0405f4c
delay strategy
ASKozienko Aug 9, 2017
2a9550d
delay strategy
ASKozienko Aug 9, 2017
6d70adf
delay strategy
ASKozienko Aug 9, 2017
be3b5dd
delay strategy
ASKozienko Aug 9, 2017
2ac9251
delay strategy
ASKozienko Aug 9, 2017
c4153cb
delay strategy
ASKozienko Aug 9, 2017
b38f0bc
delay strategy
ASKozienko Aug 9, 2017
dbcfce6
delay strategy
ASKozienko Aug 9, 2017
7ddc44e
delay strategy
ASKozienko Aug 9, 2017
3a03b73
delay strategy
ASKozienko Aug 9, 2017
a05806f
delay strategy
ASKozienko Aug 9, 2017
8d6cb77
Merge branch 'master' into delay_driver
makasim Aug 9, 2017
4853455
[sqs] fix hanged tests.
makasim Aug 9, 2017
4e3de14
[sqs] fix hanged tests.
ASKozienko Aug 10, 2017
e178d9e
fix tests
ASKozienko Aug 10, 2017
ec620dd
fix tests
ASKozienko Aug 10, 2017
142a69c
fix tests
ASKozienko Aug 10, 2017
fa8da15
fix tests
ASKozienko Aug 10, 2017
0034d79
fix tests
ASKozienko Aug 10, 2017
759a908
fix tests
ASKozienko Aug 10, 2017
405c12d
fix tests
ASKozienko Aug 10, 2017
a5f3844
fix tests
ASKozienko Aug 10, 2017
1d34254
fix tests
ASKozienko Aug 10, 2017
fe6fc1c
fix tests
ASKozienko Aug 10, 2017
a3ff6f8
fix tests
ASKozienko Aug 10, 2017
064399e
fix tests
ASKozienko Aug 10, 2017
2e447cc
fix tests
ASKozienko Aug 10, 2017
5b23f86
fix tests
ASKozienko Aug 10, 2017
91a68e7
fix tests
ASKozienko Aug 10, 2017
383a934
fix tests
ASKozienko Aug 10, 2017
b3139cd
fix tests
ASKozienko Aug 10, 2017
07eeb44
tests
ASKozienko Aug 10, 2017
237c10e
tests
ASKozienko Aug 10, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
<directory>pkg/amqp-bunny/Tests</directory>
</testsuite>

<testsuite name="amqp-lib transport">
<directory>pkg/amqp-lib/Tests</directory>
</testsuite>

<testsuite name="amqp-tools">
<directory>pkg/amqp-tools/Tests</directory>
</testsuite>

<testsuite name="fs transport">
<directory>pkg/fs/Tests</directory>
</testsuite>
Expand Down
21 changes: 18 additions & 3 deletions pkg/amqp-bunny/Symfony/RabbitMqAmqpBunnyTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Enqueue\AmqpBunny\Symfony;

use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait;
use Enqueue\Client\Amqp\RabbitMqDriver;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand All @@ -10,6 +11,8 @@

class RabbitMqAmqpBunnyTransportFactory extends AmqpBunnyTransportFactory
{
use DelayStrategyTransportFactoryTrait;

/**
* @param string $name
*/
Expand All @@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder)

$builder
->children()
->booleanNode('delay_plugin_installed')
->defaultFalse()
->info('The option tells whether RabbitMQ broker has delay plugin installed or not')
->scalarNode('delay_strategy')
->defaultValue('dlx')
->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id')
->end()
;
}

/**
* {@inheritdoc}
*/
public function createConnectionFactory(ContainerBuilder $container, array $config)
{
$factoryId = parent::createConnectionFactory($container, $config);

$this->registerDelayStrategy($container, $config, $factoryId, $this->getName());

return $factoryId;
}

/**
* {@inheritdoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public function testShouldAllowAddConfiguration()
'user' => 'guest',
'pass' => 'guest',
'vhost' => '/',
'delay_plugin_installed' => false,
'delay_strategy' => 'dlx',
'lazy' => true,
'receive_method' => 'basic_get',
'heartbeat' => 0,
Expand All @@ -78,7 +78,7 @@ public function testShouldCreateConnectionFactory()
'pass' => 'guest',
'vhost' => '/',
'persisted' => false,
'delay_plugin_installed' => false,
'delay_strategy' => null,
]);

$this->assertTrue($container->hasDefinition($serviceId));
Expand All @@ -91,7 +91,7 @@ public function testShouldCreateConnectionFactory()
'pass' => 'guest',
'vhost' => '/',
'persisted' => false,
'delay_plugin_installed' => false,
'delay_strategy' => null,
]], $factory->getArguments());
}

Expand All @@ -108,7 +108,7 @@ public function testShouldCreateContext()
'pass' => 'guest',
'vhost' => '/',
'persisted' => false,
'delay_plugin_installed' => false,
'delay_strategy' => null,
]);

$this->assertEquals('enqueue.transport.rabbitmq_amqp_bunny.context', $serviceId);
Expand Down
21 changes: 18 additions & 3 deletions pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Enqueue\AmqpExt\Symfony;

use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait;
use Enqueue\Client\Amqp\RabbitMqDriver;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand All @@ -10,6 +11,8 @@

class RabbitMqAmqpTransportFactory extends AmqpTransportFactory
{
use DelayStrategyTransportFactoryTrait;

/**
* @param string $name
*/
Expand All @@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder)

$builder
->children()
->booleanNode('delay_plugin_installed')
->defaultFalse()
->info('The option tells whether RabbitMQ broker has delay plugin installed or not')
->scalarNode('delay_strategy')
->defaultValue('dlx')
->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id')
->end()
;
}

/**
* {@inheritdoc}
*/
public function createConnectionFactory(ContainerBuilder $container, array $config)
{
$factoryId = parent::createConnectionFactory($container, $config);

$this->registerDelayStrategy($container, $config, $factoryId, $this->getName());

return $factoryId;
}

/**
* {@inheritdoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public function testShouldAllowAddConfiguration()
'pass' => 'guest',
'vhost' => '/',
'persisted' => false,
'delay_plugin_installed' => false,
'delay_strategy' => 'dlx',
'lazy' => true,
'receive_method' => 'basic_get',
], $config);
Expand All @@ -78,7 +78,7 @@ public function testShouldCreateConnectionFactory()
'pass' => 'guest',
'vhost' => '/',
'persisted' => false,
'delay_plugin_installed' => false,
'delay_strategy' => null,
]);

$this->assertTrue($container->hasDefinition($serviceId));
Expand All @@ -91,7 +91,7 @@ public function testShouldCreateConnectionFactory()
'pass' => 'guest',
'vhost' => '/',
'persisted' => false,
'delay_plugin_installed' => false,
'delay_strategy' => null,
]], $factory->getArguments());
}

Expand All @@ -108,7 +108,7 @@ public function testShouldCreateContext()
'pass' => 'guest',
'vhost' => '/',
'persisted' => false,
'delay_plugin_installed' => false,
'delay_strategy' => null,
]);

$this->assertEquals('enqueue.transport.rabbitmq_amqp.context', $serviceId);
Expand Down
21 changes: 18 additions & 3 deletions pkg/amqp-lib/Symfony/RabbitMqAmqpLibTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Enqueue\AmqpLib\Symfony;

use Enqueue\AmqpTools\DelayStrategyTransportFactoryTrait;
use Enqueue\Client\Amqp\RabbitMqDriver;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand All @@ -10,6 +11,8 @@

class RabbitMqAmqpLibTransportFactory extends AmqpLibTransportFactory
{
use DelayStrategyTransportFactoryTrait;

/**
* @param string $name
*/
Expand All @@ -27,13 +30,25 @@ public function addConfiguration(ArrayNodeDefinition $builder)

$builder
->children()
->booleanNode('delay_plugin_installed')
->defaultFalse()
->info('The option tells whether RabbitMQ broker has delay plugin installed or not')
->scalarNode('delay_strategy')
->defaultValue('dlx')
->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id')
->end()
;
}

/**
* {@inheritdoc}
*/
public function createConnectionFactory(ContainerBuilder $container, array $config)
{
$factoryId = parent::createConnectionFactory($container, $config);

$this->registerDelayStrategy($container, $config, $factoryId, $this->getName());

return $factoryId;
}

/**
* {@inheritdoc}
*/
Expand Down
4 changes: 4 additions & 0 deletions pkg/amqp-lib/Tests/AmqpConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public function testShouldReturnMessageOnReceiveNoWait()
$amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body');
$amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag';
$amqpMessage->delivery_info['redelivered'] = true;
$amqpMessage->delivery_info['routing_key'] = 'routing-key';

$channel = $this->createChannelMock();
$channel
Expand All @@ -120,13 +121,15 @@ public function testShouldReturnMessageOnReceiveNoWait()
$this->assertInstanceOf(AmqpMessage::class, $message);
$this->assertSame('body', $message->getBody());
$this->assertSame('delivery-tag', $message->getDeliveryTag());
$this->assertSame('routing-key', $message->getRoutingKey());
$this->assertTrue($message->isRedelivered());
}

public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet()
{
$amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body');
$amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag';
$amqpMessage->delivery_info['routing_key'] = 'routing-key';
$amqpMessage->delivery_info['redelivered'] = true;

$channel = $this->createChannelMock();
Expand All @@ -146,6 +149,7 @@ public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet()
$this->assertInstanceOf(AmqpMessage::class, $message);
$this->assertSame('body', $message->getBody());
$this->assertSame('delivery-tag', $message->getDeliveryTag());
$this->assertSame('routing-key', $message->getRoutingKey());
$this->assertTrue($message->isRedelivered());
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/amqp-lib/Tests/AmqpContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Interop\Amqp\Impl\AmqpTopic;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Wire\AMQPTable;
use PHPUnit\Framework\TestCase;

class AmqpContextTest extends TestCase
Expand All @@ -26,7 +27,7 @@ public function testShouldDeclareTopic()
$this->isTrue(),
$this->isTrue(),
$this->isTrue(),
$this->identicalTo(['key' => 'value']),
$this->isInstanceOf(AMQPTable::class),
$this->isNull()
)
;
Expand Down Expand Up @@ -94,7 +95,7 @@ public function testShouldDeclareQueue()
$this->isTrue(),
$this->isTrue(),
$this->isTrue(),
$this->identicalTo(['key' => 'value']),
$this->isInstanceOf(AMQPTable::class),
$this->isNull()
)
;
Expand Down
23 changes: 16 additions & 7 deletions pkg/amqp-lib/Tests/AmqpProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Enqueue\AmqpLib\Tests;

use Enqueue\AmqpLib\AmqpContext;
use Enqueue\AmqpLib\AmqpProducer;
use Enqueue\Test\ClassExtensionTrait;
use Interop\Amqp\Impl\AmqpMessage;
Expand All @@ -23,7 +24,7 @@ class AmqpProducerTest extends TestCase

public function testCouldBeConstructedWithRequiredArguments()
{
new AmqpProducer($this->createAmqpChannelMock());
new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock());
}

public function testShouldImplementPsrProducerInterface()
Expand All @@ -33,7 +34,7 @@ public function testShouldImplementPsrProducerInterface()

public function testShouldThrowExceptionWhenDestinationTypeIsInvalid()
{
$producer = new AmqpProducer($this->createAmqpChannelMock());
$producer = new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock());

$this->expectException(InvalidDestinationException::class);
$this->expectExceptionMessage('The destination must be an instance of Interop\Amqp\AmqpQueue but got');
Expand All @@ -43,7 +44,7 @@ public function testShouldThrowExceptionWhenDestinationTypeIsInvalid()

public function testShouldThrowExceptionWhenMessageTypeIsInvalid()
{
$producer = new AmqpProducer($this->createAmqpChannelMock());
$producer = new AmqpProducer($this->createAmqpChannelMock(), $this->createContextMock());

$this->expectException(InvalidMessageException::class);
$this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but it is');
Expand All @@ -70,7 +71,7 @@ public function testShouldPublishMessageToTopic()
$message = new AmqpMessage('body');
$message->setRoutingKey('routing-key');

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send($topic, $message);

$this->assertEquals('body', $amqpMessage->getBody());
Expand All @@ -92,7 +93,7 @@ public function testShouldPublishMessageToQueue()

$queue = new AmqpQueue('queue');

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send($queue, new AmqpMessage('body'));

$this->assertEquals('body', $amqpMessage->getBody());
Expand All @@ -111,7 +112,7 @@ public function testShouldSetMessageHeaders()
}))
;

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['content_type' => 'text/plain']));

$this->assertEquals(['content_type' => 'text/plain'], $amqpMessage->get_properties());
Expand All @@ -130,7 +131,7 @@ public function testShouldSetMessageProperties()
}))
;

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send(new AmqpTopic('name'), new AmqpMessage('body', ['key' => 'value']));

$properties = $amqpMessage->get_properties();
Expand Down Expand Up @@ -163,4 +164,12 @@ private function createAmqpChannelMock()
{
return $this->createMock(AMQPChannel::class);
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext
*/
private function createContextMock()
{
return $this->createMock(AmqpContext::class);
}
}
Loading