Skip to content

Redis transport. #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/test
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ function waitForService()

waitForService rabbitmq 5672 50
waitForService mysql 3306 50
waitForService redis 6379 50

php pkg/job-queue/Tests/Functional/app/console doctrine:database:create
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
Expand Down
11 changes: 8 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
"enqueue/enqueue": "*@dev",
"enqueue/stomp": "*@dev",
"enqueue/amqp-ext": "*@dev",
"enqueue/redis": "*@dev",
"enqueue/fs": "*@dev",
"enqueue/enqueue-bundle": "*@dev",
"enqueue/job-queue": "*@dev",
"enqueue/test": "*@dev"
},
"require-dev": {
"enqueue/test": "*@dev",

"phpunit/phpunit": "^5",
"doctrine/doctrine-bundle": "~1.2",
"predis/predis": "^1.1",
"symfony/monolog-bundle": "^2.8|^3",
"symfony/browser-kit": "^2.8|^3",
"symfony/expression-language": "^2.8|^3",
Expand Down Expand Up @@ -46,6 +47,10 @@
"type": "path",
"url": "pkg/amqp-ext"
},
{
"type": "path",
"url": "pkg/redis"
},
{
"type": "path",
"url": "pkg/enqueue-bundle"
Expand Down
10 changes: 9 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ services:
depends_on:
- rabbitmq
- mysql
- redis
volumes:
- ./:/mqdev
- './:/mqdev'
environment:
- SYMFONY__RABBITMQ__HOST=rabbitmq
- SYMFONY__RABBITMQ__USER=guest
Expand All @@ -21,6 +22,8 @@ services:
- SYMFONY__DB__NAME=mqdev
- SYMFONY__DB__USER=root
- SYMFONY__DB__PASSWORD=rootpass
- SYMFONY__REDIS__HOST=redis
- SYMFONY__REDIS__PORT=6379

rabbitmq:
image: enqueue/rabbitmq:latest
Expand All @@ -29,6 +32,11 @@ services:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
- RABBITMQ_DEFAULT_VHOST=mqdev
redis:
image: 'redis:3'
ports:
- "6379:6379"

mysql:
image: mariadb:10
volumes:
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM ubuntu:16.04
RUN set -x && \
apt-get update && \
apt-get install -y --no-install-recommends wget curl openssl ca-certificates nano netcat && \
apt-get install -y --no-install-recommends php php-mysql php-curl php-intl php-mbstring php-zip php-mcrypt php-xdebug php-bcmath php-xml php-amqp
apt-get install -y --no-install-recommends php php-mysql php-redis php-curl php-intl php-mbstring php-zip php-mcrypt php-xdebug php-bcmath php-xml php-amqp

## confis

Expand Down
5 changes: 1 addition & 4 deletions docs/filesystem_transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
Use files on local filesystem as queues.
It creates a file per queue\topic.
A message is a line inside the file.
**Limitations** It works only in auto ack mode. Local by nature therefor messages are not visible on other servers.
**Limitations** It works only in auto ack mode hence If consumer crashes the message is lost. Local by nature therefor messages are not visible on other servers.

* [Installation](#installation)
* [Create context](#create-context)
* [Declare topic](#declare-topic)
* [Declare queue](#decalre-queue)
* [Bind queue to topic](#bind-queue-to-topic)
* [Send message to topic](#send-message-to-topic)
* [Send message to queue](#send-message-to-queue)
* [Consume message](#consume-message)
Expand Down
5 changes: 3 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

* [Quick tour](quick_tour.md)
* Transports
- [Amqp](amqp_transport.md)
- [Stomp](stomp_transport.md)
- [Amqp (RabbitMQ, ActiveMQ)](amqp_transport.md)
- [Stomp (RabbitMQ, ActiveMQ)](stomp_transport.md)
- [Redis](redis_transport.md)
- [Filesystem](filesystem_transport.md)
- [Null](null_transport.md)
* Consumption
Expand Down
125 changes: 125 additions & 0 deletions docs/redis_transport.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Redis transport

The transport uses [Redis](https://redis.io/) as a message broker.
It creates a collection (a queue or topic) there. Pushes messages to the tail of the collection and pops from the head.
The transport works with [phpredis](https://github.com/phpredis/phpredis) php extension or [predis](https://github.com/nrk/predis) library.
Make sure you installed either of them

**Limitations** It works only in auto ack mode hence If consumer crashes the message is lost.

* [Installation](#installation)
* [Create context](#create-context)
* [Send message to topic](#send-message-to-topic)
* [Send message to queue](#send-message-to-queue)
* [Consume message](#consume-message)
* [Delete queue (purge messages)](#delete-queue-purge-messages)
* [Delete topic (purge messages)](#delete-topic-purge-messages)

## Installation

* With php redis extension:

```bash
$ apt-get install php-redis
$ composer require enqueue/redis
```

* With predis library:

```bash
$ composer require enqueue/redis predis/predis:^1
```

## Create context

* With php redis extension:

```php
<?php
use Enqueue\Redis\RedisConnectionFactory;

$connectionFactory = new RedisConnectionFactory([
'host' => 'localhost',
'port' => 6379,
'vendor' => 'phpredis',
]);

$psrContext = $connectionFactory->createContext();
```

* With predis library:

```php
<?php
use Enqueue\Redis\RedisConnectionFactory;

$connectionFactory = new RedisConnectionFactory([
'host' => 'localhost',
'port' => 6379,
'vendor' => 'predis',
]);

$psrContext = $connectionFactory->createContext();
```

## Send message to topic

```php
<?php
/** @var \Enqueue\Redis\RedisContext $psrContext */

$fooTopic = $psrContext->createTopic('aTopic');
$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooTopic, $message);
```

## Send message to queue

```php
<?php
/** @var \Enqueue\Redis\RedisContext $psrContext */

$fooQueue = $psrContext->createQueue('aQueue');
$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooQueue, $message);
```

## Consume message:

```php
<?php
/** @var \Enqueue\Redis\RedisContext $psrContext */

$fooQueue = $psrContext->createQueue('aQueue');
$consumer = $psrContext->createConsumer($fooQueue);

$message = $consumer->receive();

// process a message
```

## Delete queue (purge messages):

```php
<?php
/** @var \Enqueue\Redis\RedisContext $psrContext */

$fooQueue = $psrContext->createQueue('aQueue');

$psrContext->deleteQueue($fooQueue);
```

## Delete topic (purge messages):

```php
<?php
/** @var \Enqueue\Redis\RedisContext $psrContext */

$fooTopic = $psrContext->createTopic('aTopic');

$psrContext->deleteTopic($fooTopic);
```

[back to index](index.md)
6 changes: 5 additions & 1 deletion phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@
<directory>pkg/amqp-ext/Tests</directory>
</testsuite>

<testsuite name="fs">
<testsuite name="fs transport">
<directory>pkg/fs/Tests</directory>
</testsuite>

<testsuite name="redis transport">
<directory>pkg/redis/Tests</directory>
</testsuite>

<testsuite name="enqueue-bundle">
<directory>pkg/enqueue-bundle/Tests</directory>
</testsuite>
Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class AmqpConnectionFactory implements PsrConnectionFactory
* 'read_timeout' => Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
* 'write_timeout' => Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
* 'connect_timeout' => Connection timeout. Note: 0 or greater seconds. May be fractional.
* 'persisted' => bool
* 'persisted' => bool, Whether it use single persisted connection or open a new one for every context
* 'lazy' => the connection will be performed as later as possible, if the option set to true
* ].
*
* @param $config
Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Psr\PsrConnectionFactory;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

class AmqpConnectionFactoryTest extends \PHPUnit_Framework_TestCase
class AmqpConnectionFactoryTest extends TestCase
{
use ClassExtensionTrait;

Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/AmqpConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
use Enqueue\AmqpExt\Buffer;
use Enqueue\Psr\PsrConsumer;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

class AmqpConsumerTest extends \PHPUnit_Framework_TestCase
class AmqpConsumerTest extends TestCase
{
use ClassExtensionTrait;

Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/AmqpContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
use Enqueue\Test\ClassExtensionTrait;
use Enqueue\Transport\Null\NullQueue;
use Enqueue\Transport\Null\NullTopic;
use PHPUnit\Framework\TestCase;

class AmqpContextTest extends \PHPUnit_Framework_TestCase
class AmqpContextTest extends TestCase
{
use ClassExtensionTrait;

Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/AmqpMessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
use Enqueue\AmqpExt\AmqpMessage;
use Enqueue\Psr\PsrMessage;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

class AmqpMessageTest extends \PHPUnit_Framework_TestCase
class AmqpMessageTest extends TestCase
{
use ClassExtensionTrait;

Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/AmqpProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
use Enqueue\AmqpExt\AmqpProducer;
use Enqueue\Psr\PsrProducer;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

class AmqpProducerTest extends \PHPUnit_Framework_TestCase
class AmqpProducerTest extends TestCase
{
use ClassExtensionTrait;

Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/AmqpQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
use Enqueue\AmqpExt\AmqpQueue;
use Enqueue\Psr\PsrQueue;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

class AmqpQueueTest extends \PHPUnit_Framework_TestCase
class AmqpQueueTest extends TestCase
{
use ClassExtensionTrait;

Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/AmqpTopicTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
use Enqueue\AmqpExt\AmqpTopic;
use Enqueue\Psr\PsrTopic;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

class AmqpTopicTest extends \PHPUnit_Framework_TestCase
class AmqpTopicTest extends TestCase
{
use ClassExtensionTrait;

Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/BufferTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

use Enqueue\AmqpExt\AmqpMessage;
use Enqueue\AmqpExt\Buffer;
use PHPUnit\Framework\TestCase;

class BufferTest extends \PHPUnit_Framework_TestCase
class BufferTest extends TestCase
{
public function testCouldBeConstructedWithoutAnyArguments()
{
Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/Client/AmqpDriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\Psr\PsrProducer;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

class AmqpDriverTest extends \PHPUnit_Framework_TestCase
class AmqpDriverTest extends TestCase
{
use ClassExtensionTrait;

Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/Client/RabbitMqDriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\Psr\PsrProducer;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

class RabbitMqDriverTest extends \PHPUnit_Framework_TestCase
class RabbitMqDriverTest extends TestCase
{
use ClassExtensionTrait;

Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
use Enqueue\AmqpExt\AmqpMessage;
use Enqueue\Test\RabbitmqAmqpExtension;
use Enqueue\Test\RabbitmqManagmentExtensionTrait;
use PHPUnit\Framework\TestCase;

/**
* @group functional
*/
class AmqpCommonUseCasesTest extends \PHPUnit_Framework_TestCase
class AmqpCommonUseCasesTest extends TestCase
{
use RabbitmqAmqpExtension;
use RabbitmqManagmentExtensionTrait;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
use Enqueue\Psr\PsrProcessor;
use Enqueue\Test\RabbitmqAmqpExtension;
use Enqueue\Test\RabbitmqManagmentExtensionTrait;
use PHPUnit\Framework\TestCase;

/**
* @group functional
*/
class AmqpConsumptionUseCasesTest extends \PHPUnit_Framework_TestCase
class AmqpConsumptionUseCasesTest extends TestCase
{
use RabbitmqAmqpExtension;
use RabbitmqManagmentExtensionTrait;
Expand Down
Loading