Skip to content

Commit 7aa1c68

Browse files
authored
Merge pull request #54 from php-enqueue/dbal-transport
DBAL Transport
2 parents dd989b3 + abcf56c commit 7aa1c68

32 files changed

+3049
-1
lines changed

bin/subtree-split

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ remote stomp git@github.com:php-enqueue/stomp.git
4949
remote amqp-ext git@github.com:php-enqueue/amqp-ext.git
5050
remote fs git@github.com:php-enqueue/fs.git
5151
remote redis git@github.com:php-enqueue/redis.git
52+
remote dbal git@github.com:php-enqueue/dbal.git
5253
remote enqueue-bundle git@github.com:php-enqueue/enqueue-bundle.git
5354
remote job-queue git@github.com:php-enqueue/job-queue.git
5455
remote test git@github.com:php-enqueue/test.git
@@ -59,6 +60,7 @@ split 'pkg/stomp' stomp
5960
split 'pkg/amqp-ext' amqp-ext
6061
split 'pkg/fs' fs
6162
split 'pkg/redis' redis
63+
split 'pkg/dbal' dbal
6264
split 'pkg/enqueue-bundle' enqueue-bundle
6365
split 'pkg/job-queue' job-queue
6466
split 'pkg/test' test

composer.json

+5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
"enqueue/amqp-ext": "*@dev",
1111
"enqueue/redis": "*@dev",
1212
"enqueue/fs": "*@dev",
13+
"enqueue/dbal": "*@dev",
1314
"enqueue/enqueue-bundle": "*@dev",
1415
"enqueue/job-queue": "*@dev",
1516
"enqueue/test": "*@dev",
@@ -62,6 +63,10 @@
6263
{
6364
"type": "path",
6465
"url": "pkg/fs"
66+
},
67+
{
68+
"type": "path",
69+
"url": "pkg/dbal"
6570
}
6671
]
6772
}

docs/dbal_transport.md

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Doctrine DBAL transport
2+
3+
The transport uses [Doctrine DBAL](http://docs.doctrine-project.org/projects/doctrine-dbal/en/latest/) library and SQL like server as a broker.
4+
It creates a table there. Pushes and pops messages to\from that table.
5+
6+
**Limitations** It works only in auto ack mode hence If consumer crashes the message is lost.
7+
8+
* [Installation](#installation)
9+
* [Init database](#init-database)
10+
* [Create context](#create-context)
11+
* [Send message to topic](#send-message-to-topic)
12+
* [Send message to queue](#send-message-to-queue)
13+
* [Consume message](#consume-message)
14+
15+
## Installation
16+
17+
```bash
18+
$ composer require enqueue/dbal
19+
```
20+
21+
## Create context
22+
23+
* With config (a connection is created internally):
24+
25+
```php
26+
<?php
27+
use Enqueue\Dbal\DbalConnectionFactory;
28+
29+
$factory = new DbalConnectionFactory([
30+
'connection' => [
31+
'dbname' => 'mqdev',
32+
'user' => 'user',
33+
'password' => 'pass',
34+
'host' => 'localhost',
35+
'port' => 3306,
36+
'driver' => 'pdo_mysql',
37+
],
38+
'table_name' => 'enqueue',
39+
]);
40+
41+
$psrContext = $factory->createContext();
42+
```
43+
44+
* With existing connection:
45+
46+
```php
47+
<?php
48+
use Enqueue\Dbal\ManagerRegistryConnectionFactory;
49+
use Doctrine\Common\Persistence\ManagerRegistry;
50+
51+
/** @var ManagerRegistry $registry */
52+
53+
$factory = new ManagerRegistryConnectionFactory($registry, [
54+
'connection_name' => 'default',
55+
]);
56+
57+
$psrContext = $factory->createContext();
58+
```
59+
60+
## Init database
61+
62+
At first time you have to create a table where your message will live. There is a handy methods for this `createDataBaseTable` on the context.
63+
Please pay attention to that the database has to be created manually.
64+
65+
```php
66+
<?php
67+
/** @var \Enqueue\Dbal\DbalContext $psrContext */
68+
69+
$psrContext->createDataBaseTable();
70+
```
71+
72+
## Send message to topic
73+
74+
```php
75+
<?php
76+
/** @var \Enqueue\Dbal\DbalContext $psrContext */
77+
78+
$fooTopic = $psrContext->createTopic('aTopic');
79+
$message = $psrContext->createMessage('Hello world!');
80+
81+
$psrContext->createProducer()->send($fooTopic, $message);
82+
```
83+
84+
## Send message to queue
85+
86+
```php
87+
<?php
88+
/** @var \Enqueue\Dbal\DbalContext $psrContext */
89+
90+
$fooQueue = $psrContext->createQueue('aQueue');
91+
$message = $psrContext->createMessage('Hello world!');
92+
93+
$psrContext->createProducer()->send($fooQueue, $message);
94+
```
95+
96+
## Consume message:
97+
98+
```php
99+
<?php
100+
/** @var \Enqueue\Dbal\DbalContext $psrContext */
101+
102+
$fooQueue = $psrContext->createQueue('aQueue');
103+
$consumer = $psrContext->createConsumer($fooQueue);
104+
105+
$message = $consumer->receive();
106+
107+
// process a message
108+
```
109+
110+
[back to index](index.md)

phpunit.xml.dist

+4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@
3737
<directory>pkg/redis/Tests</directory>
3838
</testsuite>
3939

40+
<testsuite name="dbal transport">
41+
<directory>pkg/dbal/Tests</directory>
42+
</testsuite>
43+
4044
<testsuite name="enqueue-bundle">
4145
<directory>pkg/enqueue-bundle/Tests</directory>
4246
</testsuite>

pkg/dbal/.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

pkg/dbal/.travis.yml

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 1
5+
6+
language: php
7+
8+
php:
9+
- '5.6'
10+
- '7.0'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional

pkg/dbal/Client/DbalDriver.php

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
<?php
2+
namespace Enqueue\Dbal\Client;
3+
4+
use Enqueue\Client\Config;
5+
use Enqueue\Client\DriverInterface;
6+
use Enqueue\Client\Message;
7+
use Enqueue\Client\MessagePriority;
8+
use Enqueue\Dbal\DbalContext;
9+
use Enqueue\Dbal\DbalMessage;
10+
use Enqueue\Psr\PsrMessage;
11+
use Psr\Log\LoggerInterface;
12+
use Psr\Log\NullLogger;
13+
14+
class DbalDriver implements DriverInterface
15+
{
16+
/**
17+
* @var DbalContext
18+
*/
19+
private $context;
20+
21+
/**
22+
* @var Config
23+
*/
24+
private $config;
25+
26+
/**
27+
* @param DbalContext $context
28+
* @param Config $config
29+
*/
30+
public function __construct(DbalContext $context, Config $config)
31+
{
32+
$this->context = $context;
33+
$this->config = $config;
34+
}
35+
36+
/**
37+
* {@inheritdoc}
38+
*
39+
* @return DbalMessage
40+
*/
41+
public function createTransportMessage(Message $message)
42+
{
43+
$properties = $message->getProperties();
44+
45+
$headers = $message->getHeaders();
46+
$headers['content_type'] = $message->getContentType();
47+
48+
$transportMessage = $this->context->createMessage();
49+
$transportMessage->setBody($message->getBody());
50+
$transportMessage->setHeaders($headers);
51+
$transportMessage->setProperties($properties);
52+
$transportMessage->setMessageId($message->getMessageId());
53+
$transportMessage->setTimestamp($message->getTimestamp());
54+
$transportMessage->setDelay($message->getDelay());
55+
$transportMessage->setReplyTo($message->getReplyTo());
56+
$transportMessage->setCorrelationId($message->getCorrelationId());
57+
58+
return $transportMessage;
59+
}
60+
61+
/**
62+
* @param DbalMessage $message
63+
*
64+
* {@inheritdoc}
65+
*/
66+
public function createClientMessage(PsrMessage $message)
67+
{
68+
$clientMessage = new Message();
69+
70+
$clientMessage->setBody($message->getBody());
71+
$clientMessage->setHeaders($message->getHeaders());
72+
$clientMessage->setProperties($message->getProperties());
73+
74+
$clientMessage->setContentType($message->getHeader('content_type'));
75+
$clientMessage->setMessageId($message->getMessageId());
76+
$clientMessage->setTimestamp($message->getTimestamp());
77+
$clientMessage->setPriority(MessagePriority::NORMAL);
78+
$clientMessage->setDelay($message->getDelay());
79+
$clientMessage->setReplyTo($message->getReplyTo());
80+
$clientMessage->setCorrelationId($message->getCorrelationId());
81+
82+
return $clientMessage;
83+
}
84+
85+
/**
86+
* {@inheritdoc}
87+
*/
88+
public function sendToRouter(Message $message)
89+
{
90+
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
91+
throw new \LogicException('Topic name parameter is required but is not set');
92+
}
93+
94+
$queue = $this->createQueue($this->config->getRouterQueueName());
95+
$transportMessage = $this->createTransportMessage($message);
96+
97+
$this->context->createProducer()->send($queue, $transportMessage);
98+
}
99+
100+
/**
101+
* {@inheritdoc}
102+
*/
103+
public function sendToProcessor(Message $message)
104+
{
105+
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
106+
throw new \LogicException('Processor name parameter is required but is not set');
107+
}
108+
109+
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
110+
throw new \LogicException('Queue name parameter is required but is not set');
111+
}
112+
113+
$transportMessage = $this->createTransportMessage($message);
114+
$destination = $this->createQueue($queueName);
115+
116+
$this->context->createProducer()->send($destination, $transportMessage);
117+
}
118+
119+
/**
120+
* {@inheritdoc}
121+
*/
122+
public function createQueue($queueName)
123+
{
124+
return $this->context->createQueue($this->config->createTransportQueueName($queueName));
125+
}
126+
127+
/**
128+
* {@inheritdoc}
129+
*/
130+
public function setupBroker(LoggerInterface $logger = null)
131+
{
132+
$logger = $logger ?: new NullLogger();
133+
$log = function ($text, ...$args) use ($logger) {
134+
$logger->debug(sprintf('[DbalDriver] '.$text, ...$args));
135+
};
136+
137+
$log('Creating database table: "%s"', $this->context->getTableName());
138+
$this->context->createDataBaseTable();
139+
}
140+
141+
/**
142+
* {@inheritdoc}
143+
*/
144+
public function getConfig()
145+
{
146+
return $this->config;
147+
}
148+
}

0 commit comments

Comments
 (0)