Skip to content

[redis] add dsn support for redis transport. #204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions docs/transport/redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,26 @@ $ composer require enqueue/redis predis/predis:^1
<?php
use Enqueue\Redis\RedisConnectionFactory;

$connectionFactory = new RedisConnectionFactory([
'host' => 'localhost',
'port' => 6379,
// connects to localhost
$factory = new RedisConnectionFactory();

// same as above
$factory = new RedisConnectionFactory('redis:');

// same as above
$factory = new RedisConnectionFactory([]);

// connect to Redis at example.com port 1000 using phpredis extension
$factory = new RedisConnectionFactory([
'host' => 'example.com',
'port' => 1000,
'vendor' => 'phpredis',
]);

$psrContext = $connectionFactory->createContext();
// same as above but given as DSN string
$factory = new RedisConnectionFactory('redis://example.com:1000?vendor=phpredis');

$psrContext = $factory->createContext();
```

* With predis library:
Expand Down
9 changes: 6 additions & 3 deletions docs/transport/sqs.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ $ composer require enqueue/sqs
```php
<?php
use Enqueue\Sqs\SqsConnectionFactory;

$connectionFactory = new SqsConnectionFactory([
$factory = new SqsConnectionFactory([
'key' => 'aKey',
'secret' => 'aSecret',
'region' => 'aRegion',
]);

$psrContext = $connectionFactory->createContext();
// same as above but given as DSN string
$factory = new SqsConnectionFactory('sqs:?key=aKey&secret=aSecret&region=aRegion');

$psrContext = $factory->createContext();
```

## Declare queue.
Expand Down
25 changes: 18 additions & 7 deletions docs/transport/stomp.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,26 @@ $ composer require enqueue/stomp
<?php
use Enqueue\Stomp\StompConnectionFactory;

$connectionFactory = new StompConnectionFactory([
'host' => '127.0.0.1',
'port' => 61613,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/',
// connects to localhost
$factory = new StompConnectionFactory();

// same as above
$factory = new StompConnectionFactory('stomp:');

// same as above
$factory = new StompConnectionFactory([]);

// connect to stomp broker at example.com port 1000 using
$factory = new StompConnectionFactory([
'host' => 'example.com',
'port' => 1000,
'login' => 'theLogin',
]);

$psrContext = $connectionFactory->createContext();
// same as above but given as DSN string
$factory = new StompConnectionFactory('stomp://example.com:1000?login=theLogin');

$psrContext = $factory->createContext();
```

## Send message to topic
Expand Down
7 changes: 4 additions & 3 deletions pkg/amqp-bunny/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrate
*/
public function __construct($config = 'amqp://')
{
if (is_string($config) && 0 === strpos($config, 'amqp+bunny://')) {
$config = str_replace('amqp+bunny://', 'amqp://', $config);
if (is_string($config) && 0 === strpos($config, 'amqp+bunny:')) {
$config = str_replace('amqp+bunny:', 'amqp:', $config);
}

if (empty($config) || 'amqp://' === $config) {
// third argument is deprecated will be removed in 0.8
if (empty($config) || 'amqp:' === $config || 'amqp://' === $config) {
$config = [];
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
Expand Down
17 changes: 17 additions & 0 deletions pkg/amqp-bunny/Tests/AmqpConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,23 @@ public static function provideConfigs()

// some examples from Appendix A: Examples (https://www.rabbitmq.com/uri-spec.html)

yield [
'amqp+bunny:',
[
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'user' => 'guest',
'pass' => 'guest',
'receive_method' => 'basic_get',
'heartbeat' => 0,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
'lazy' => true,
],
];

yield [
'amqp+bunny://user:pass@host:10000/vhost',
[
Expand Down
7 changes: 4 additions & 3 deletions pkg/amqp-ext/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrate
*/
public function __construct($config = 'amqp://')
{
if (is_string($config) && 0 === strpos($config, 'amqp+ext://')) {
$config = str_replace('amqp+ext://', 'amqp://', $config);
if (is_string($config) && 0 === strpos($config, 'amqp+ext:')) {
$config = str_replace('amqp+ext:', 'amqp:', $config);
}

if (empty($config) || 'amqp://' === $config) {
// third argument is deprecated will be removed in 0.8
if (empty($config) || 'amqp:' === $config || 'amqp://' === $config) {
$config = [];
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
Expand Down
19 changes: 19 additions & 0 deletions pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,25 @@ public static function provideConfigs()

// some examples from Appendix A: Examples (https://www.rabbitmq.com/uri-spec.html)

yield [
'amqp+ext:',
[
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'user' => 'guest',
'pass' => 'guest',
'read_timeout' => null,
'write_timeout' => null,
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
'receive_method' => 'basic_get',
],
];

yield [
'amqp+ext://user:pass@host:10000/vhost',
[
Expand Down
7 changes: 4 additions & 3 deletions pkg/amqp-lib/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrate
*/
public function __construct($config = 'amqp://')
{
if (is_string($config) && 0 === strpos($config, 'amqp+lib://')) {
$config = str_replace('amqp+lib://', 'amqp://', $config);
if (is_string($config) && 0 === strpos($config, 'amqp+lib:')) {
$config = str_replace('amqp+lib:', 'amqp:', $config);
}

if (empty($config) || 'amqp://' === $config) {
// third argument is deprecated will be removed in 0.8
if (empty($config) || 'amqp:' === $config || 'amqp://' === $config) {
$config = [];
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
Expand Down
27 changes: 27 additions & 0 deletions pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,33 @@ public static function provideConfigs()

// some examples from Appendix A: Examples (https://www.rabbitmq.com/uri-spec.html)

yield [
'amqp+lib:',
[
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'user' => 'guest',
'pass' => 'guest',
'read_timeout' => 3,
'write_timeout' => 3,
'lazy' => true,
'receive_method' => 'basic_get',
'stream' => true,
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'keepalive' => false,
'heartbeat' => 0,
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'qos_prefetch_size' => 0,
'qos_prefetch_count' => 1,
'qos_global' => false,
],
];

yield [
'amqp+lib://user:pass@host:10000/vhost',
[
Expand Down
52 changes: 44 additions & 8 deletions pkg/enqueue/Symfony/DefaultTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,26 @@

namespace Enqueue\Symfony;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\AmqpBunny\AmqpConnectionFactory as AmqpBunnyConnectionFactory;
use Enqueue\AmqpBunny\Symfony\AmqpBunnyTransportFactory;
use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory;
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory;
use Enqueue\AmqpLib\Symfony\AmqpLibTransportFactory;
use Enqueue\Dbal\DbalConnectionFactory;
use Enqueue\Dbal\Symfony\DbalTransportFactory;
use Enqueue\Fs\FsConnectionFactory;
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Gps\Symfony\GpsTransportFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Null\Symfony\NullTransportFactory;
use Enqueue\Redis\RedisConnectionFactory;
use Enqueue\Redis\Symfony\RedisTransportFactory;
use Enqueue\Sqs\SqsConnectionFactory;
use Enqueue\Sqs\Symfony\SqsTransportFactory;
use Enqueue\Stomp\StompConnectionFactory;
use Enqueue\Stomp\Symfony\StompTransportFactory;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use function Enqueue\dsn_to_connection_factory;
Expand Down Expand Up @@ -167,27 +179,51 @@ private function resolveDSN(ContainerBuilder $container, $dsn)
*/
private function findFactory($dsn)
{
$connectionFactory = dsn_to_connection_factory($dsn);
$factory = dsn_to_connection_factory($dsn);

if ($connectionFactory instanceof AmqpConnectionFactory) {
return new AmqpTransportFactory('default_amqp');
if ($factory instanceof AmqpExtConnectionFactory) {
return new AmqpTransportFactory('default_amqp_ext');
}

if ($connectionFactory instanceof FsConnectionFactory) {
if ($factory instanceof AmqpLibConnectionFactory) {
return new AmqpLibTransportFactory('default_amqp_lib');
}

if ($factory instanceof AmqpBunnyConnectionFactory) {
return new AmqpBunnyTransportFactory('default_amqp_bunny');
}

if ($factory instanceof FsConnectionFactory) {
return new FsTransportFactory('default_fs');
}

if ($connectionFactory instanceof DbalConnectionFactory) {
if ($factory instanceof DbalConnectionFactory) {
return new DbalTransportFactory('default_dbal');
}

if ($connectionFactory instanceof NullConnectionFactory) {
if ($factory instanceof NullConnectionFactory) {
return new NullTransportFactory('default_null');
}

if ($factory instanceof GpsConnectionFactory) {
return new GpsTransportFactory('default_gps');
}

if ($factory instanceof RedisConnectionFactory) {
return new RedisTransportFactory('default_redis');
}

if ($factory instanceof SqsConnectionFactory) {
return new SqsTransportFactory('default_sqs');
}

if ($factory instanceof StompConnectionFactory) {
return new StompTransportFactory('default_stomp');
}

throw new \LogicException(sprintf(
'There is no supported transport factory for the connection factory "%s" created from DSN "%s"',
get_class($connectionFactory),
get_class($factory),
$dsn
));
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
use Enqueue\Dbal\DbalConnectionFactory;
use Enqueue\Fs\FsConnectionFactory;
use Enqueue\Gearman\GearmanConnectionFactory;
use Enqueue\Gps\GpsConnectionFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Enqueue\Redis\RedisConnectionFactory;
use Enqueue\Sqs\SqsConnectionFactory;
use Enqueue\Stomp\StompConnectionFactory;
use PHPUnit\Framework\TestCase;

class DsnToConnectionFactoryFunctionTest extends TestCase
Expand Down Expand Up @@ -71,5 +75,13 @@ public static function provideDSNs()
// yield ['gearman://', GearmanConnectionFactory::class];

yield ['rdkafka://', RdKafkaConnectionFactory::class];

yield ['redis:', RedisConnectionFactory::class];

yield ['stomp:', StompConnectionFactory::class];

yield ['sqs:', SqsConnectionFactory::class];

yield ['gps:', GpsConnectionFactory::class];
}
}
12 changes: 12 additions & 0 deletions pkg/enqueue/Tests/Functions/DsnToContextFunctionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Fs\FsContext;
use Enqueue\Gps\GpsContext;
use Enqueue\Null\NullContext;
use Enqueue\Redis\RedisContext;
use Enqueue\Sqs\SqsContext;
use Enqueue\Stomp\StompContext;
use PHPUnit\Framework\TestCase;

class DsnToContextFunctionTest extends TestCase
Expand Down Expand Up @@ -57,5 +61,13 @@ public static function provideDSNs()
yield ['file://'.sys_get_temp_dir(), FsContext::class];

yield ['null://', NullContext::class];

yield ['redis:', RedisContext::class];

yield ['stomp:', StompContext::class];

yield ['sqs:', SqsContext::class];

yield ['gps:', GpsContext::class];
}
}
14 changes: 13 additions & 1 deletion pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,11 @@ public function testShouldCreateDriverFromDsn($dsn, $expectedName)

public static function provideDSNs()
{
yield ['amqp://', 'default_amqp'];
yield ['amqp+ext://', 'default_amqp_ext'];

yield ['amqp+lib:', 'default_amqp_lib'];

yield ['amqp+bunny://', 'default_amqp_bunny'];

yield ['null://', 'default_null'];

Expand All @@ -259,5 +263,13 @@ public static function provideDSNs()
yield ['mysql://', 'default_dbal'];

yield ['pgsql://', 'default_dbal'];

yield ['gps:', 'default_gps'];

yield ['sqs:', 'default_sqs'];

yield ['redis:', 'default_redis'];

yield ['stomp:', 'default_stomp'];
}
}
Loading