diff --git a/bin/subtree-split b/bin/subtree-split
index 63e6168a3..51bc06926 100755
--- a/bin/subtree-split
+++ b/bin/subtree-split
@@ -49,6 +49,7 @@ remote stomp git@github.com:php-enqueue/stomp.git
remote amqp-ext git@github.com:php-enqueue/amqp-ext.git
remote fs git@github.com:php-enqueue/fs.git
remote redis git@github.com:php-enqueue/redis.git
+remote dbal git@github.com:php-enqueue/dbal.git
remote enqueue-bundle git@github.com:php-enqueue/enqueue-bundle.git
remote job-queue git@github.com:php-enqueue/job-queue.git
remote test git@github.com:php-enqueue/test.git
@@ -59,6 +60,7 @@ split 'pkg/stomp' stomp
split 'pkg/amqp-ext' amqp-ext
split 'pkg/fs' fs
split 'pkg/redis' redis
+split 'pkg/dbal' dbal
split 'pkg/enqueue-bundle' enqueue-bundle
split 'pkg/job-queue' job-queue
split 'pkg/test' test
diff --git a/composer.json b/composer.json
index 92011f09b..8f16ac430 100644
--- a/composer.json
+++ b/composer.json
@@ -10,6 +10,7 @@
"enqueue/amqp-ext": "*@dev",
"enqueue/redis": "*@dev",
"enqueue/fs": "*@dev",
+ "enqueue/dbal": "*@dev",
"enqueue/enqueue-bundle": "*@dev",
"enqueue/job-queue": "*@dev",
"enqueue/test": "*@dev",
@@ -62,6 +63,10 @@
{
"type": "path",
"url": "pkg/fs"
+ },
+ {
+ "type": "path",
+ "url": "pkg/dbal"
}
]
}
diff --git a/docs/dbal_transport.md b/docs/dbal_transport.md
new file mode 100644
index 000000000..976e877db
--- /dev/null
+++ b/docs/dbal_transport.md
@@ -0,0 +1,110 @@
+# Doctrine DBAL transport
+
+The transport uses [Doctrine DBAL](http://docs.doctrine-project.org/projects/doctrine-dbal/en/latest/) library and SQL like server as a broker.
+It creates a table there. Pushes and pops messages to\from that table.
+
+**Limitations** It works only in auto ack mode hence If consumer crashes the message is lost.
+
+* [Installation](#installation)
+* [Init database](#init-database)
+* [Create context](#create-context)
+* [Send message to topic](#send-message-to-topic)
+* [Send message to queue](#send-message-to-queue)
+* [Consume message](#consume-message)
+
+## Installation
+
+```bash
+$ composer require enqueue/dbal
+```
+
+## Create context
+
+* With config (a connection is created internally):
+
+```php
+ [
+ 'dbname' => 'mqdev',
+ 'user' => 'user',
+ 'password' => 'pass',
+ 'host' => 'localhost',
+ 'port' => 3306,
+ 'driver' => 'pdo_mysql',
+ ],
+ 'table_name' => 'enqueue',
+]);
+
+$psrContext = $factory->createContext();
+```
+
+* With existing connection:
+
+```php
+ 'default',
+]);
+
+$psrContext = $factory->createContext();
+```
+
+## Init database
+
+At first time you have to create a table where your message will live. There is a handy methods for this `createDataBaseTable` on the context.
+Please pay attention to that the database has to be created manually.
+
+```php
+createDataBaseTable();
+```
+
+## Send message to topic
+
+```php
+createTopic('aTopic');
+$message = $psrContext->createMessage('Hello world!');
+
+$psrContext->createProducer()->send($fooTopic, $message);
+```
+
+## Send message to queue
+
+```php
+createQueue('aQueue');
+$message = $psrContext->createMessage('Hello world!');
+
+$psrContext->createProducer()->send($fooQueue, $message);
+```
+
+## Consume message:
+
+```php
+createQueue('aQueue');
+$consumer = $psrContext->createConsumer($fooQueue);
+
+$message = $consumer->receive();
+
+// process a message
+```
+
+[back to index](index.md)
\ No newline at end of file
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index 37c8aa90d..541bb8151 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -37,6 +37,10 @@
pkg/redis/Tests
+
+ pkg/dbal/Tests
+
+
pkg/enqueue-bundle/Tests
diff --git a/pkg/dbal/.gitignore b/pkg/dbal/.gitignore
new file mode 100644
index 000000000..a770439e5
--- /dev/null
+++ b/pkg/dbal/.gitignore
@@ -0,0 +1,6 @@
+*~
+/composer.lock
+/composer.phar
+/phpunit.xml
+/vendor/
+/.idea/
diff --git a/pkg/dbal/.travis.yml b/pkg/dbal/.travis.yml
new file mode 100644
index 000000000..42374ddc7
--- /dev/null
+++ b/pkg/dbal/.travis.yml
@@ -0,0 +1,21 @@
+sudo: false
+
+git:
+ depth: 1
+
+language: php
+
+php:
+ - '5.6'
+ - '7.0'
+
+cache:
+ directories:
+ - $HOME/.composer/cache
+
+install:
+ - composer self-update
+ - composer install --prefer-source
+
+script:
+ - vendor/bin/phpunit --exclude-group=functional
diff --git a/pkg/dbal/Client/DbalDriver.php b/pkg/dbal/Client/DbalDriver.php
new file mode 100644
index 000000000..d3d7dc6d2
--- /dev/null
+++ b/pkg/dbal/Client/DbalDriver.php
@@ -0,0 +1,148 @@
+context = $context;
+ $this->config = $config;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalMessage
+ */
+ public function createTransportMessage(Message $message)
+ {
+ $properties = $message->getProperties();
+
+ $headers = $message->getHeaders();
+ $headers['content_type'] = $message->getContentType();
+
+ $transportMessage = $this->context->createMessage();
+ $transportMessage->setBody($message->getBody());
+ $transportMessage->setHeaders($headers);
+ $transportMessage->setProperties($properties);
+ $transportMessage->setMessageId($message->getMessageId());
+ $transportMessage->setTimestamp($message->getTimestamp());
+ $transportMessage->setDelay($message->getDelay());
+ $transportMessage->setReplyTo($message->getReplyTo());
+ $transportMessage->setCorrelationId($message->getCorrelationId());
+
+ return $transportMessage;
+ }
+
+ /**
+ * @param DbalMessage $message
+ *
+ * {@inheritdoc}
+ */
+ public function createClientMessage(PsrMessage $message)
+ {
+ $clientMessage = new Message();
+
+ $clientMessage->setBody($message->getBody());
+ $clientMessage->setHeaders($message->getHeaders());
+ $clientMessage->setProperties($message->getProperties());
+
+ $clientMessage->setContentType($message->getHeader('content_type'));
+ $clientMessage->setMessageId($message->getMessageId());
+ $clientMessage->setTimestamp($message->getTimestamp());
+ $clientMessage->setPriority(MessagePriority::NORMAL);
+ $clientMessage->setDelay($message->getDelay());
+ $clientMessage->setReplyTo($message->getReplyTo());
+ $clientMessage->setCorrelationId($message->getCorrelationId());
+
+ return $clientMessage;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function sendToRouter(Message $message)
+ {
+ if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
+ throw new \LogicException('Topic name parameter is required but is not set');
+ }
+
+ $queue = $this->createQueue($this->config->getRouterQueueName());
+ $transportMessage = $this->createTransportMessage($message);
+
+ $this->context->createProducer()->send($queue, $transportMessage);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function sendToProcessor(Message $message)
+ {
+ if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
+ throw new \LogicException('Processor name parameter is required but is not set');
+ }
+
+ if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
+ throw new \LogicException('Queue name parameter is required but is not set');
+ }
+
+ $transportMessage = $this->createTransportMessage($message);
+ $destination = $this->createQueue($queueName);
+
+ $this->context->createProducer()->send($destination, $transportMessage);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createQueue($queueName)
+ {
+ return $this->context->createQueue($this->config->createTransportQueueName($queueName));
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setupBroker(LoggerInterface $logger = null)
+ {
+ $logger = $logger ?: new NullLogger();
+ $log = function ($text, ...$args) use ($logger) {
+ $logger->debug(sprintf('[DbalDriver] '.$text, ...$args));
+ };
+
+ $log('Creating database table: "%s"', $this->context->getTableName());
+ $this->context->createDataBaseTable();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getConfig()
+ {
+ return $this->config;
+ }
+}
diff --git a/pkg/dbal/DbalConnectionFactory.php b/pkg/dbal/DbalConnectionFactory.php
new file mode 100644
index 000000000..80e77e29c
--- /dev/null
+++ b/pkg/dbal/DbalConnectionFactory.php
@@ -0,0 +1,76 @@
+ [] - dbal connection options. see http://docs.doctrine-project.org/projects/doctrine-dbal/en/latest/reference/configuration.html
+ * 'table_name' => 'enqueue', - database table name.
+ * 'polling_interval' => '1000', - How often query for new messages (milliseconds)
+ * 'lazy' => true, - Use lazy database connection (boolean)
+ * ]
+ *
+ * @param $config
+ */
+ public function __construct(array $config = [])
+ {
+ $this->config = array_replace([
+ 'connection' => [],
+ 'lazy' => true,
+ ], $config);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalContext
+ */
+ public function createContext()
+ {
+ if ($this->config['lazy']) {
+ return new DbalContext(function () {
+ return $this->establishConnection();
+ }, $this->config);
+ }
+
+ return new DbalContext($this->establishConnection(), $this->config);
+ }
+
+ /**
+ * @return Connection
+ */
+ private function establishConnection()
+ {
+ if (false == $this->connection) {
+ $this->connection = DriverManager::getConnection($this->config['connection']);
+ $this->connection->connect();
+ }
+
+ return $this->connection;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function close()
+ {
+ if ($this->connection) {
+ $this->connection->close();
+ }
+ }
+}
diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php
new file mode 100644
index 000000000..24757504d
--- /dev/null
+++ b/pkg/dbal/DbalConsumer.php
@@ -0,0 +1,238 @@
+context = $context;
+ $this->queue = $queue;
+ $this->dbal = $this->context->getDbalConnection();
+ }
+
+ /**
+ * Set polling interval in milliseconds
+ *
+ * @param int $msec
+ */
+ public function setPollingInterval($msec)
+ {
+ $this->pollingInterval = $msec * 1000;
+ }
+
+ /**
+ * Get polling interval in milliseconds
+ *
+ * @return int
+ */
+ public function getPollingInterval()
+ {
+ return (int) $this->pollingInterval / 1000;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalDestination
+ */
+ public function getQueue()
+ {
+ return $this->queue;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalMessage|null
+ */
+ public function receive($timeout = 0)
+ {
+ $timeout /= 1000;
+ $startAt = microtime(true);
+
+ while (true) {
+ $message = $this->receiveMessage();
+
+ if ($message) {
+ return $message;
+ }
+
+ if ($timeout && (microtime(true) - $startAt) >= $timeout) {
+ return;
+ }
+
+ usleep($this->pollingInterval);
+
+ if ($timeout && (microtime(true) - $startAt) >= $timeout) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalMessage|null
+ */
+ public function receiveNoWait()
+ {
+ return $this->receiveMessage();
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param DbalMessage $message
+ */
+ public function acknowledge(PsrMessage $message)
+ {
+ // does nothing
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param DbalMessage $message
+ */
+ public function reject(PsrMessage $message, $requeue = false)
+ {
+ InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class);
+
+ if (false == $requeue) {
+ return;
+ }
+
+ $dbalMessage = [
+ 'body' => $message->getBody(),
+ 'headers' => JSON::encode($message->getHeaders()),
+ 'properties' => JSON::encode($message->getProperties()),
+ 'priority' => $message->getPriority(),
+ 'queue' => $this->queue->getQueueName(),
+ 'redelivered' => true,
+ ];
+
+ $affectedRows = $this->dbal->insert($this->context->getTableName(), $dbalMessage, [
+ 'body' => Type::TEXT,
+ 'headers' => Type::TEXT,
+ 'properties' => Type::TEXT,
+ 'priority' => Type::SMALLINT,
+ 'queue' => Type::STRING,
+ 'redelivered' => Type::BOOLEAN,
+ ]);
+
+ if (1 !== $affectedRows) {
+ throw new \LogicException(sprintf(
+ 'Expected record was inserted but it is not. message: "%s"',
+ JSON::encode($dbalMessage)
+ ));
+ }
+ }
+
+ /**
+ * @return DbalMessage|null
+ */
+ protected function receiveMessage()
+ {
+ $this->dbal->beginTransaction();
+ try {
+ $now = time();
+
+ $sql = sprintf(
+ 'SELECT * FROM %s WHERE queue=:queue AND ' .
+ '(delayed_until IS NULL OR delayed_until<=:delayedUntil) ' .
+ 'ORDER BY priority DESC, id ASC LIMIT 1 FOR UPDATE',
+ $this->context->getTableName()
+ );
+
+ $dbalMessage = $this->dbal->executeQuery(
+ $sql,
+ [
+ 'queue' => $this->queue->getQueueName(),
+ 'delayedUntil' => $now,
+ ],
+ [
+ 'queue' => Type::STRING,
+ 'delayedUntil' => Type::INTEGER,
+ ]
+ )->fetch();
+
+ if (false == $dbalMessage) {
+ $this->dbal->commit();
+
+ return;
+ }
+
+ // remove message
+ $affectedRows = $this->dbal->delete($this->context->getTableName(), ['id' => $dbalMessage['id']], [
+ 'id' => Type::INTEGER,
+ ]);
+
+ if (1 !== $affectedRows) {
+ throw new \LogicException(sprintf('Expected record was removed but it is not. id: "%s"', $dbalMessage['id']));
+ }
+
+ $this->dbal->commit();
+
+ return $this->convertMessage($dbalMessage);
+
+ } catch (\Exception $e) {
+ $this->dbal->rollBack();
+ throw $e;
+ }
+ }
+
+ /**
+ * @param array $dbalMessage
+ *
+ * @return DbalMessage
+ */
+ protected function convertMessage(array $dbalMessage)
+ {
+ $message = $this->context->createMessage();
+
+ $message->setBody($dbalMessage['body']);
+ $message->setPriority((int) $dbalMessage['priority']);
+ $message->setRedelivered((bool) $dbalMessage['redelivered']);
+
+ if ($dbalMessage['headers']) {
+ $message->setHeaders(JSON::decode($dbalMessage['headers']));
+ }
+
+ if ($dbalMessage['properties']) {
+ $message->setProperties(JSON::decode($dbalMessage['properties']));
+ }
+
+ return $message;
+ }
+}
diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php
new file mode 100644
index 000000000..a14d156bb
--- /dev/null
+++ b/pkg/dbal/DbalContext.php
@@ -0,0 +1,185 @@
+config = array_replace([
+ 'table_name' => 'enqueue',
+ 'polling_interval' => null,
+ ], $config);
+
+ if ($connection instanceof Connection) {
+ $this->connection = $connection;
+ } elseif (is_callable($connection)) {
+ $this->connectionFactory = $connection;
+ } else {
+ throw new \InvalidArgumentException('The connection argument must be either Doctrine\DBAL\Connection or callable that returns Doctrine\DBAL\Connection.');
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalMessage
+ */
+ public function createMessage($body = null, array $properties = [], array $headers = [])
+ {
+ $message = new DbalMessage();
+ $message->setBody($body);
+ $message->setProperties($properties);
+ $message->setHeaders($headers);
+
+ return $message;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalDestination
+ */
+ public function createQueue($name)
+ {
+ return new DbalDestination($name);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalDestination
+ */
+ public function createTopic($name)
+ {
+ return new DbalDestination($name);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createTemporaryQueue()
+ {
+ throw new \BadMethodCallException('Dbal transport does not support temporary queues');
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalProducer
+ */
+ public function createProducer()
+ {
+ return new DbalProducer($this);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalConsumer
+ */
+ public function createConsumer(PsrDestination $destination)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, DbalDestination::class);
+
+ $consumer = new DbalConsumer($this, $destination);
+
+ if (isset($this->config['pollingInterval'])) {
+ $consumer->setPollingInterval($this->config['pollingInterval']);
+ }
+
+ return $consumer;
+ }
+
+ public function close()
+ {
+ }
+
+ /**
+ * @return string
+ */
+ public function getTableName()
+ {
+ return $this->config['table_name'];
+ }
+
+ /**
+ * @return array
+ */
+ public function getConfig()
+ {
+ return $this->config;
+ }
+
+ /**
+ * @return Connection
+ */
+ public function getDbalConnection()
+ {
+ if (false == $this->connection) {
+ $connection = call_user_func($this->connectionFactory);
+ if (false == $connection instanceof Connection) {
+ throw new \LogicException(sprintf(
+ 'The factory must return instance of Doctrine\DBAL\Connection. It returns %s',
+ is_object($connection) ? get_class($connection) : gettype($connection)
+ ));
+ }
+
+ $this->connection = $connection;
+ }
+
+ return $this->connection;
+ }
+
+ public function createDataBaseTable()
+ {
+ $sm = $this->getDbalConnection()->getSchemaManager();
+
+ if ($sm->tablesExist([$this->getTableName()])) {
+ return;
+ }
+
+ $table = new Table($this->getTableName());
+ $table->addColumn('id', 'integer', ['unsigned' => true, 'autoincrement' => true,]);
+ $table->addColumn('body', 'text', ['notnull' => false,]);
+ $table->addColumn('headers', 'text', ['notnull' => false,]);
+ $table->addColumn('properties', 'text', ['notnull' => false,]);
+ $table->addColumn('redelivered', 'boolean', ['notnull' => false,]);
+ $table->addColumn('queue', 'string');
+ $table->addColumn('priority', 'smallint');
+ $table->addColumn('delayed_until', 'integer', ['notnull' => false,]);
+
+ $table->setPrimaryKey(['id']);
+ $table->addIndex(['queue']);
+ $table->addIndex(['priority']);
+ $table->addIndex(['delayed_until']);
+
+ $sm->createTable($table);
+ }
+}
diff --git a/pkg/dbal/DbalDestination.php b/pkg/dbal/DbalDestination.php
new file mode 100644
index 000000000..1138871af
--- /dev/null
+++ b/pkg/dbal/DbalDestination.php
@@ -0,0 +1,37 @@
+destinationName = $name;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getQueueName()
+ {
+ return $this->destinationName;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTopicName()
+ {
+ return $this->destinationName;
+ }
+}
diff --git a/pkg/dbal/DbalMessage.php b/pkg/dbal/DbalMessage.php
new file mode 100644
index 000000000..620fa5e19
--- /dev/null
+++ b/pkg/dbal/DbalMessage.php
@@ -0,0 +1,246 @@
+body = $body;
+ $this->properties = $properties;
+ $this->headers = $headers;
+ $this->redelivered = false;
+ $this->priority = 0;
+ $this->delay = null;
+ }
+
+ /**
+ * @param string $body
+ */
+ public function setBody($body)
+ {
+ $this->body = $body;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getBody()
+ {
+ return $this->body;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setProperties(array $properties)
+ {
+ $this->properties = $properties;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setProperty($name, $value)
+ {
+ $this->properties[$name] = $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getProperties()
+ {
+ return $this->properties;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getProperty($name, $default = null)
+ {
+ return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setHeader($name, $value)
+ {
+ $this->headers[$name] = $value;
+ }
+
+ /**
+ * @param array $headers
+ */
+ public function setHeaders(array $headers)
+ {
+ $this->headers = $headers;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHeaders()
+ {
+ return $this->headers;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHeader($name, $default = null)
+ {
+ return array_key_exists($name, $this->headers) ?$this->headers[$name] : $default;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function isRedelivered()
+ {
+ return $this->redelivered;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setRedelivered($redelivered)
+ {
+ $this->redelivered = $redelivered;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setReplyTo($replyTo)
+ {
+ $this->setHeader('reply_to', $replyTo);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getReplyTo()
+ {
+ return $this->getHeader('reply_to');
+ }
+
+ /**
+ * @return int
+ */
+ public function getPriority()
+ {
+ return $this->priority;
+ }
+
+ /**
+ * @param int $priority
+ */
+ public function setPriority($priority)
+ {
+ $this->priority = $priority;
+ }
+
+ /**
+ * @return int
+ */
+ public function getDelay()
+ {
+ return $this->delay;
+ }
+
+ /**
+ * Set delay in seconds
+ *
+ * @param int $delay
+ */
+ public function setDelay($delay)
+ {
+ $this->delay = $delay;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setCorrelationId($correlationId)
+ {
+ $this->setHeader('correlation_id', $correlationId);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getCorrelationId()
+ {
+ return $this->getHeader('correlation_id', '');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setMessageId($messageId)
+ {
+ $this->setHeader('message_id', $messageId);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getMessageId()
+ {
+ return $this->getHeader('message_id', '');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTimestamp()
+ {
+ return $this->getHeader('timestamp');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setTimestamp($timestamp)
+ {
+ $this->setHeader('timestamp', (int) $timestamp);
+ }
+}
diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php
new file mode 100644
index 000000000..3e57b0395
--- /dev/null
+++ b/pkg/dbal/DbalProducer.php
@@ -0,0 +1,87 @@
+context = $context;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param PsrDestination $destination
+ * @param PsrMessage $message
+ *
+ * @throws Exception
+ */
+ public function send(PsrDestination $destination, PsrMessage $message)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, DbalDestination::class);
+
+ $body = $message->getBody();
+ if (is_scalar($body) || is_null($body)) {
+ $body = (string)$body;
+ } else {
+ throw new InvalidMessageException(sprintf(
+ 'The message body must be a scalar or null. Got: %s',
+ is_object($body) ? get_class($body) : gettype($body)
+ ));
+ }
+
+ $dbalMessage = [
+ 'body' => $body,
+ 'headers' => JSON::encode($message->getHeaders()),
+ 'properties' => JSON::encode($message->getProperties()),
+ 'priority' => $message->getPriority(),
+ 'queue' => $destination->getQueueName(),
+ ];
+
+ $delay = $message->getDelay();
+ if ($delay) {
+ if (! is_int($delay)) {
+ throw new \LogicException(sprintf(
+ 'Delay must be integer but got: "%s"',
+ is_object($delay) ? get_class($delay) : gettype($delay)
+ ));
+ }
+
+ if ($delay <= 0) {
+ throw new \LogicException(sprintf('Delay must be positive integer but got: "%s"', $delay));
+ }
+
+ $dbalMessage['delayed_until'] = time() + $delay;
+ }
+
+ try {
+ $this->context->getDbalConnection()->insert($this->context->getTableName(), $dbalMessage, [
+ 'body' => Type::TEXT,
+ 'headers' => Type::TEXT,
+ 'properties' => Type::TEXT,
+ 'priority' => Type::SMALLINT,
+ 'queue' => Type::STRING,
+ 'delayed_until' => Type::INTEGER,
+ ]);
+ } catch (\Exception $e) {
+ throw new Exception('The transport fails to send the message due to some internal error.', null, $e);
+ }
+ }
+}
diff --git a/pkg/dbal/LICENSE b/pkg/dbal/LICENSE
new file mode 100644
index 000000000..4c99b4a95
--- /dev/null
+++ b/pkg/dbal/LICENSE
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+Copyright (c) 2013 Oro, Inc
+Copyright (c) 2016 Kotliar Maksym
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/pkg/dbal/ManagerRegistryConnectionFactory.php b/pkg/dbal/ManagerRegistryConnectionFactory.php
new file mode 100644
index 000000000..94281db9a
--- /dev/null
+++ b/pkg/dbal/ManagerRegistryConnectionFactory.php
@@ -0,0 +1,74 @@
+ null, - doctrine dbal connection name
+ * 'table_name' => 'enqueue', - database table name.
+ * 'polling_interval' => 1000, - How often query for new messages (milliseconds)
+ * 'lazy' => true, - Use lazy database connection (boolean)
+ * ]
+ *
+ * @param ManagerRegistry $registry
+ * @param array $config
+ */
+ public function __construct(ManagerRegistry $registry, array $config = [])
+ {
+ $this->config = array_replace([
+ 'connection_name' => null,
+ 'lazy' => true,
+ ], $config);
+
+ $this->registry = $registry;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return DbalContext
+ */
+ public function createContext()
+ {
+ if ($this->config['lazy']) {
+ return new DbalContext(function () {
+ return $this->establishConnection();
+ }, $this->config);
+ }
+
+ return new DbalContext($this->establishConnection(), $this->config);
+ }
+
+ /**
+ * @return Connection
+ */
+ private function establishConnection()
+ {
+ $connection = $this->registry->getConnection($this->config['connection_name']);
+ $connection->connect();
+
+ return $connection;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function close()
+ {
+ }
+}
diff --git a/pkg/dbal/Symfony/DbalTransportFactory.php b/pkg/dbal/Symfony/DbalTransportFactory.php
new file mode 100644
index 000000000..972743521
--- /dev/null
+++ b/pkg/dbal/Symfony/DbalTransportFactory.php
@@ -0,0 +1,121 @@
+name = $name;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function addConfiguration(ArrayNodeDefinition $builder)
+ {
+ $builder
+ ->children()
+ ->variableNode('connection')
+ ->treatNullLike([])
+ ->info('Doctrine DBAL connection options. See http://docs.doctrine-project.org/projects/doctrine-dbal/en/latest/reference/configuration.html')
+ ->end()
+ ->scalarNode('dbal_connection_name')
+ ->defaultNull()
+ ->info('Doctrine dbal connection name.')
+ ->end()
+ ->scalarNode('table_name')
+ ->defaultValue('enqueue')
+ ->cannotBeEmpty()
+ ->info('Database table name.')
+ ->end()
+ ->integerNode('polling_interval')
+ ->defaultValue(1000)
+ ->min(100)
+ ->info('How often query for new messages.')
+ ->end()
+ ->booleanNode('lazy')
+ ->defaultTrue()
+ ->end()
+ ;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createConnectionFactory(ContainerBuilder $container, array $config)
+ {
+ if (false == empty($config['dbal_connection_name'])) {
+ $factory = new Definition(ManagerRegistryConnectionFactory::class);
+ $factory->setArguments([new Reference('doctrine'), $config]);
+ } elseif (false == empty($config['connection'])) {
+ $factory = new Definition(DbalConnectionFactory::class);
+ $factory->setArguments([$config]);
+ } else {
+ throw new \LogicException('Set "dbal_connection_name" options when you want ot use doctrine registry, or use "connection" options to setup direct dbal connection.');
+ }
+
+ $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
+ $container->setDefinition($factoryId, $factory);
+
+ return $factoryId;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createContext(ContainerBuilder $container, array $config)
+ {
+ $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
+
+ $context = new Definition(DbalContext::class);
+ $context->setFactory([new Reference($factoryId), 'createContext']);
+
+ $contextId = sprintf('enqueue.transport.%s.context', $this->getName());
+ $container->setDefinition($contextId, $context);
+
+ return $contextId;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createDriver(ContainerBuilder $container, array $config)
+ {
+ $driver = new Definition(DbalDriver::class);
+ $driver->setArguments([
+ new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
+ new Reference('enqueue.client.config'),
+ ]);
+
+ $driverId = sprintf('enqueue.client.%s.driver', $this->getName());
+ $container->setDefinition($driverId, $driver);
+
+ return $driverId;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getName()
+ {
+ return $this->name;
+ }
+}
diff --git a/pkg/dbal/Tests/Client/DbalDriverTest.php b/pkg/dbal/Tests/Client/DbalDriverTest.php
new file mode 100644
index 000000000..b6a2b25e5
--- /dev/null
+++ b/pkg/dbal/Tests/Client/DbalDriverTest.php
@@ -0,0 +1,320 @@
+assertClassImplements(DriverInterface::class, DbalDriver::class);
+ }
+
+ public function testCouldBeConstructedWithRequiredArguments()
+ {
+ new DbalDriver(
+ $this->createPsrContextMock(),
+ Config::create()
+ );
+ }
+
+ public function testShouldReturnConfigObject()
+ {
+ $config = Config::create();
+
+ $driver = new DbalDriver($this->createPsrContextMock(), $config);
+
+ $this->assertSame($config, $driver->getConfig());
+ }
+
+ public function testShouldCreateAndReturnQueueInstance()
+ {
+ $expectedQueue = new DbalDestination('queue-name');
+
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createQueue')
+ ->with('name')
+ ->will($this->returnValue($expectedQueue))
+ ;
+
+ $driver = new DbalDriver($context, Config::create());
+
+ $queue = $driver->createQueue('name');
+
+ $this->assertSame($expectedQueue, $queue);
+ $this->assertSame('queue-name', $queue->getQueueName());
+ }
+
+ public function testShouldConvertTransportMessageToClientMessage()
+ {
+ $transportMessage = new DbalMessage();
+ $transportMessage->setBody('body');
+ $transportMessage->setHeaders(['hkey' => 'hval']);
+ $transportMessage->setProperties(['key' => 'val']);
+ $transportMessage->setHeader('content_type', 'ContentType');
+ $transportMessage->setMessageId('MessageId');
+ $transportMessage->setTimestamp(1000);
+ $transportMessage->setDelay(12345);
+
+ $driver = new DbalDriver(
+ $this->createPsrContextMock(),
+ Config::create()
+ );
+
+ $clientMessage = $driver->createClientMessage($transportMessage);
+
+ $this->assertInstanceOf(Message::class, $clientMessage);
+ $this->assertSame('body', $clientMessage->getBody());
+ $this->assertSame([
+ 'hkey' => 'hval',
+ 'content_type' => 'ContentType',
+ 'message_id' => 'MessageId',
+ 'timestamp' => 1000,
+ ], $clientMessage->getHeaders());
+ $this->assertSame([
+ 'key' => 'val',
+ ], $clientMessage->getProperties());
+ $this->assertSame('MessageId', $clientMessage->getMessageId());
+ $this->assertSame('ContentType', $clientMessage->getContentType());
+ $this->assertSame(1000, $clientMessage->getTimestamp());
+ $this->assertSame(12345, $clientMessage->getDelay());
+
+ $this->assertNull($clientMessage->getExpire());
+ $this->assertSame(MessagePriority::NORMAL, $clientMessage->getPriority());
+ }
+
+ public function testShouldConvertClientMessageToTransportMessage()
+ {
+ $clientMessage = new Message();
+ $clientMessage->setBody('body');
+ $clientMessage->setHeaders(['hkey' => 'hval']);
+ $clientMessage->setProperties(['key' => 'val']);
+ $clientMessage->setContentType('ContentType');
+ $clientMessage->setExpire(123);
+ $clientMessage->setPriority(MessagePriority::VERY_HIGH);
+ $clientMessage->setMessageId('MessageId');
+ $clientMessage->setTimestamp(1000);
+
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createMessage')
+ ->willReturn(new DbalMessage())
+ ;
+
+ $driver = new DbalDriver(
+ $context,
+ Config::create()
+ );
+
+ $transportMessage = $driver->createTransportMessage($clientMessage);
+
+ $this->assertInstanceOf(DbalMessage::class, $transportMessage);
+ $this->assertSame('body', $transportMessage->getBody());
+ $this->assertSame([
+ 'hkey' => 'hval',
+ 'content_type' => 'ContentType',
+ 'message_id' => 'MessageId',
+ 'timestamp' => 1000,
+ 'reply_to' => null,
+ 'correlation_id' => null
+ ], $transportMessage->getHeaders());
+ $this->assertSame([
+ 'key' => 'val',
+ ], $transportMessage->getProperties());
+ $this->assertSame('MessageId', $transportMessage->getMessageId());
+ $this->assertSame(1000, $transportMessage->getTimestamp());
+ }
+
+ public function testShouldSendMessageToRouter()
+ {
+ $topic = new DbalDestination('queue-name');
+ $transportMessage = new DbalMessage();
+ $config = $this->createConfigMock();
+
+ $config
+ ->expects($this->once())
+ ->method('getRouterQueueName')
+ ->willReturn('topicName');
+
+ $config
+ ->expects($this->once())
+ ->method('createTransportQueueName')
+ ->with('topicName')
+ ->willReturn('app.topicName');
+
+ $producer = $this->createPsrProducerMock();
+ $producer
+ ->expects($this->once())
+ ->method('send')
+ ->with($this->identicalTo($topic), $this->identicalTo($transportMessage))
+ ;
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createQueue')
+ ->with('app.topicName')
+ ->willReturn($topic)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createProducer')
+ ->willReturn($producer)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createMessage')
+ ->willReturn($transportMessage)
+ ;
+
+ $driver = new DbalDriver(
+ $context,
+ $config
+ );
+
+ $message = new Message();
+ $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic');
+
+ $driver->sendToRouter($message);
+ }
+
+ public function testShouldThrowExceptionIfTopicParameterIsNotSet()
+ {
+ $driver = new DbalDriver(
+ $this->createPsrContextMock(),
+ Config::create()
+ );
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Topic name parameter is required but is not set');
+
+ $driver->sendToRouter(new Message());
+ }
+
+ public function testShouldSendMessageToProcessor()
+ {
+ $queue = new DbalDestination('queue-name');
+ $transportMessage = new DbalMessage();
+
+ $producer = $this->createPsrProducerMock();
+ $producer
+ ->expects($this->once())
+ ->method('send')
+ ->with($this->identicalTo($queue), $this->identicalTo($transportMessage))
+ ;
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('createQueue')
+ ->willReturn($queue)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createProducer')
+ ->willReturn($producer)
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createMessage')
+ ->willReturn($transportMessage)
+ ;
+
+ $driver = new DbalDriver(
+ $context,
+ Config::create()
+ );
+
+ $message = new Message();
+ $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor');
+ $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'queue');
+
+ $driver->sendToProcessor($message);
+ }
+
+ public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet()
+ {
+ $driver = new DbalDriver(
+ $this->createPsrContextMock(),
+ Config::create()
+ );
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Processor name parameter is required but is not set');
+
+ $driver->sendToProcessor(new Message());
+ }
+
+ public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet()
+ {
+ $driver = new DbalDriver(
+ $this->createPsrContextMock(),
+ Config::create()
+ );
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Queue name parameter is required but is not set');
+
+ $message = new Message();
+ $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor');
+
+ $driver->sendToProcessor($message);
+ }
+
+ public function testShouldSetupBroker()
+ {
+ $context = $this->createPsrContextMock();
+ $context
+ ->expects($this->once())
+ ->method('getTableName')
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createDataBaseTable')
+ ;
+
+ $driver = new DbalDriver(
+ $context,
+ Config::create()
+ );
+
+ $driver->setupBroker();
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|DbalContext
+ */
+ private function createPsrContextMock()
+ {
+ return $this->createMock(DbalContext::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|PsrProducer
+ */
+ private function createPsrProducerMock()
+ {
+ return $this->createMock(PsrProducer::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Config
+ */
+ private function createConfigMock()
+ {
+ return $this->createMock(Config::class);
+ }
+}
diff --git a/pkg/dbal/Tests/DbalConnectionFactoryTest.php b/pkg/dbal/Tests/DbalConnectionFactoryTest.php
new file mode 100644
index 000000000..e10babd9f
--- /dev/null
+++ b/pkg/dbal/Tests/DbalConnectionFactoryTest.php
@@ -0,0 +1,57 @@
+assertClassImplements(PsrConnectionFactory::class, DbalConnectionFactory::class);
+ }
+
+ public function testCouldBeConstructedWithEmptyConfiguration()
+ {
+ $factory = new DbalConnectionFactory();
+
+ $this->assertAttributeEquals([
+ 'lazy' => true,
+ 'connection' => [],
+ ], 'config', $factory);
+ }
+
+ public function testCouldBeConstructedWithCustomConfiguration()
+ {
+ $factory = new DbalConnectionFactory([
+ 'connection' => [
+ 'dbname' => 'theDbName',
+ ],
+ 'lazy' => false,
+ ]);
+
+ $this->assertAttributeEquals([
+ 'lazy' => false,
+ 'connection' => [
+ 'dbname' => 'theDbName',
+ ],
+ ], 'config', $factory);
+ }
+
+ public function testShouldCreateLazyContext()
+ {
+ $factory = new DbalConnectionFactory(['lazy' => true]);
+
+ $context = $factory->createContext();
+
+ $this->assertInstanceOf(DbalContext::class, $context);
+
+ $this->assertAttributeEquals(null, 'connection', $context);
+ $this->assertAttributeInternalType('callable', 'connectionFactory', $context);
+ }
+}
diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php
new file mode 100644
index 000000000..d9aafb3db
--- /dev/null
+++ b/pkg/dbal/Tests/DbalConsumerTest.php
@@ -0,0 +1,344 @@
+assertClassImplements(PsrConsumer::class, DbalConsumer::class);
+ }
+
+ public function testCouldBeConstructedWithRequiredArguments()
+ {
+ new DbalConsumer($this->createContextMock(), new DbalDestination('queue'));
+ }
+
+ public function testShouldReturnInstanceOfDestination()
+ {
+ $destination = new DbalDestination('queue');
+
+ $consumer = new DbalConsumer($this->createContextMock(), $destination);
+
+ $this->assertSame($destination, $consumer->getQueue());
+ }
+
+ public function testCouldCallAcknowledgedMethod()
+ {
+ $consumer = new DbalConsumer($this->createContextMock(), new DbalDestination('queue'));
+ $consumer->acknowledge(new DbalMessage());
+ }
+
+ public function testCouldSetAndGetPollingInterval()
+ {
+ $destination = new DbalDestination('queue');
+
+ $consumer = new DbalConsumer($this->createContextMock(), $destination);
+ $consumer->setPollingInterval(123456);
+
+ $this->assertEquals(123456, $consumer->getPollingInterval());
+ }
+
+ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid()
+ {
+ $this->expectException(InvalidMessageException::class);
+ $this->expectExceptionMessage(
+ 'The message must be an instance of '.
+ 'Enqueue\Dbal\DbalMessage '.
+ 'but it is Enqueue\Dbal\Tests\InvalidMessage.'
+ );
+
+ $consumer = new DbalConsumer($this->createContextMock(), new DbalDestination('queue'));
+ $consumer->reject(new InvalidMessage());
+ }
+
+ public function testRejectShouldInsertNewMessageOnRequeue()
+ {
+ $expectedMessage = [
+ 'body' => 'theBody',
+ 'headers' => '[]',
+ 'properties' => '[]',
+ 'priority' => 0,
+ 'queue' => 'queue',
+ 'redelivered' => true,
+ ];
+
+ $dbal = $this->createConnectionMock();
+ $dbal
+ ->expects($this->once())
+ ->method('insert')
+ ->with('tableName', $this->equalTo($expectedMessage))
+ ->will($this->returnValue(1))
+ ;
+
+ $context = $this->createContextMock();
+ $context
+ ->expects($this->once())
+ ->method('getDbalConnection')
+ ->will($this->returnValue($dbal))
+ ;
+ $context
+ ->expects($this->once())
+ ->method('getTableName')
+ ->will($this->returnValue('tableName'))
+ ;
+
+ $message = new DbalMessage();
+ $message->setBody('theBody');
+
+ $consumer = new DbalConsumer($context, new DbalDestination('queue'));
+ $consumer->reject($message, true);
+ }
+
+ public function testRejectShouldThrowIfMessageWasNotInserted()
+ {
+ $dbal = $this->createConnectionMock();
+ $dbal
+ ->expects($this->once())
+ ->method('insert')
+ ->willReturn(0)
+ ;
+
+ $context = $this->createContextMock();
+ $context
+ ->expects($this->once())
+ ->method('getDbalConnection')
+ ->will($this->returnValue($dbal))
+ ;
+
+ $message = new DbalMessage();
+ $message->setBody('theBody');
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Expected record was inserted but it is not. message:');
+
+ $consumer = new DbalConsumer($context, new DbalDestination('queue'));
+ $consumer->reject($message, true);
+ }
+
+ public function testShouldReceiveMessage()
+ {
+ $dbalMessage = [
+ 'id' => 'id',
+ 'body' => 'body',
+ 'headers' => '{"hkey":"hvalue"}',
+ 'properties' => '{"pkey":"pvalue"}',
+ 'priority' => 5,
+ 'queue' => 'queue',
+ 'redelivered' => true,
+ ];
+
+ $statement = $this->createStatementMock();
+ $statement
+ ->expects($this->once())
+ ->method('fetch')
+ ->will($this->returnValue($dbalMessage))
+ ;
+
+ $dbal = $this->createConnectionMock();
+ $dbal
+ ->expects($this->once())
+ ->method('executeQuery')
+ ->willReturn($statement)
+ ;
+ $dbal
+ ->expects($this->once())
+ ->method('delete')
+ ->willReturn(1)
+ ;
+ $dbal
+ ->expects($this->once())
+ ->method('commit')
+ ;
+
+ $context = $this->createContextMock();
+ $context
+ ->expects($this->once())
+ ->method('getDbalConnection')
+ ->will($this->returnValue($dbal))
+ ;
+ $context
+ ->expects($this->atLeastOnce())
+ ->method('getTableName')
+ ->will($this->returnValue('tableName'))
+ ;
+ $context
+ ->expects($this->once())
+ ->method('createMessage')
+ ->willReturn(new DbalMessage())
+ ;
+
+ $consumer = new DbalConsumer($context, new DbalDestination('queue'));
+ $result = $consumer->receiveNoWait();
+
+ $this->assertInstanceOf(DbalMessage::class, $result);
+ $this->assertEquals('body', $result->getBody());
+ $this->assertEquals(['hkey' => 'hvalue'], $result->getHeaders());
+ $this->assertEquals(['pkey' => 'pvalue'], $result->getProperties());
+ $this->assertTrue($result->isRedelivered());
+ $this->assertEquals(5, $result->getPriority());
+ }
+
+ public function testShouldReturnNullIfThereIsNoNewMessage()
+ {
+ $statement = $this->createStatementMock();
+ $statement
+ ->expects($this->once())
+ ->method('fetch')
+ ->will($this->returnValue(null))
+ ;
+
+ $dbal = $this->createConnectionMock();
+ $dbal
+ ->expects($this->once())
+ ->method('executeQuery')
+ ->willReturn($statement)
+ ;
+ $dbal
+ ->expects($this->never())
+ ->method('delete')
+ ->willReturn(1)
+ ;
+ $dbal
+ ->expects($this->once())
+ ->method('commit')
+ ;
+
+ $context = $this->createContextMock();
+ $context
+ ->expects($this->once())
+ ->method('getDbalConnection')
+ ->will($this->returnValue($dbal))
+ ;
+ $context
+ ->expects($this->atLeastOnce())
+ ->method('getTableName')
+ ->will($this->returnValue('tableName'))
+ ;
+ $context
+ ->expects($this->never())
+ ->method('createMessage')
+ ->willReturn(new DbalMessage())
+ ;
+
+ $consumer = new DbalConsumer($context, new DbalDestination('queue'));
+ $consumer->setPollingInterval(1000);
+ $result = $consumer->receive(.000001);
+
+ $this->assertEmpty($result);
+ }
+
+ public function testShouldThrowIfMessageWasNotRemoved()
+ {
+ $statement = $this->createStatementMock();
+ $statement
+ ->expects($this->once())
+ ->method('fetch')
+ ->will($this->returnValue(['id' => '2134']))
+ ;
+
+ $dbal = $this->createConnectionMock();
+ $dbal
+ ->expects($this->once())
+ ->method('executeQuery')
+ ->willReturn($statement)
+ ;
+ $dbal
+ ->expects($this->once())
+ ->method('delete')
+ ->willReturn(0)
+ ;
+ $dbal
+ ->expects($this->never())
+ ->method('commit')
+ ;
+ $dbal
+ ->expects($this->once())
+ ->method('rollBack')
+ ;
+
+ $context = $this->createContextMock();
+ $context
+ ->expects($this->once())
+ ->method('getDbalConnection')
+ ->will($this->returnValue($dbal))
+ ;
+ $context
+ ->expects($this->atLeastOnce())
+ ->method('getTableName')
+ ->will($this->returnValue('tableName'))
+ ;
+ $context
+ ->expects($this->never())
+ ->method('createMessage')
+ ->willReturn(new DbalMessage())
+ ;
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Expected record was removed but it is not. id: "2134"');
+
+ $consumer = new DbalConsumer($context, new DbalDestination('queue'));
+ $consumer->setPollingInterval(1000);
+ $consumer->receive(.000001);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Connection
+ */
+ private function createConnectionMock()
+ {
+ return $this->createMock(Connection::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Statement
+ */
+ private function createStatementMock()
+ {
+ return $this->createMock(Statement::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|DbalContext
+ */
+ private function createContextMock()
+ {
+ return $this->createMock(DbalContext::class);
+ }
+}
+
+class InvalidMessage implements PsrMessage
+{
+ public function getBody(){}
+ public function setBody($body){}
+ public function setProperties(array $properties){}
+ public function getProperties(){}
+ public function setProperty($name, $value){}
+ public function getProperty($name, $default = null){}
+ public function setHeaders(array $headers){}
+ public function getHeaders(){}
+ public function setHeader($name, $value){}
+ public function getHeader($name, $default = null){}
+ public function setRedelivered($redelivered){}
+ public function isRedelivered(){}
+ public function setCorrelationId($correlationId){}
+ public function getCorrelationId(){}
+ public function setMessageId($messageId){}
+ public function getMessageId(){}
+ public function getTimestamp(){}
+ public function setTimestamp($timestamp){}
+ public function setReplyTo($replyTo){}
+ public function getReplyTo(){}
+}
diff --git a/pkg/dbal/Tests/DbalContextTest.php b/pkg/dbal/Tests/DbalContextTest.php
new file mode 100644
index 000000000..34fd92c6d
--- /dev/null
+++ b/pkg/dbal/Tests/DbalContextTest.php
@@ -0,0 +1,158 @@
+assertClassImplements(PsrContext::class, DbalContext::class);
+ }
+
+ public function testCouldBeConstructedWithRequiredArguments()
+ {
+ new DbalContext($this->createConnectionMock());
+ }
+
+ public function testCouldBeConstructedWithEmptyConfiguration()
+ {
+ $factory = new DbalContext($this->createConnectionMock(), []);
+
+ $this->assertAttributeEquals([
+ 'table_name' => 'enqueue',
+ 'polling_interval' => null,
+ ], 'config', $factory);
+ }
+
+ public function testCouldBeConstructedWithCustomConfiguration()
+ {
+ $factory = new DbalContext($this->createConnectionMock(), [
+ 'table_name' => 'theTableName',
+ 'polling_interval' => 12345,
+ ]);
+
+ $this->assertAttributeEquals([
+ 'table_name' => 'theTableName',
+ 'polling_interval' => 12345,
+ ], 'config', $factory);
+ }
+
+ public function testShouldCreateMessage()
+ {
+ $context = new DbalContext($this->createConnectionMock());
+ $message = $context->createMessage('body', ['pkey' => 'pval'], ['hkey' => 'hval']);
+
+ $this->assertInstanceOf(DbalMessage::class, $message);
+ $this->assertEquals('body', $message->getBody());
+ $this->assertEquals(['pkey' => 'pval'], $message->getProperties());
+ $this->assertEquals(['hkey' => 'hval'], $message->getHeaders());
+ $this->assertSame(0, $message->getPriority());
+ $this->assertFalse($message->isRedelivered());
+ }
+
+ public function testShouldCreateTopic()
+ {
+ $context = new DbalContext($this->createConnectionMock());
+ $topic = $context->createTopic('topic');
+
+ $this->assertInstanceOf(DbalDestination::class, $topic);
+ $this->assertEquals('topic', $topic->getTopicName());
+ }
+
+ public function testShouldCreateQueue()
+ {
+ $context = new DbalContext($this->createConnectionMock());
+ $queue = $context->createQueue('queue');
+
+ $this->assertInstanceOf(DbalDestination::class, $queue);
+ $this->assertEquals('queue', $queue->getQueueName());
+ }
+
+ public function testShouldCreateProducer()
+ {
+ $context = new DbalContext($this->createConnectionMock());
+
+ $this->assertInstanceOf(DbalProducer::class, $context->createProducer());
+ }
+
+ public function testShouldCreateConsumer()
+ {
+ $context = new DbalContext($this->createConnectionMock());
+
+ $this->assertInstanceOf(DbalConsumer::class, $context->createConsumer(new DbalDestination('')));
+ }
+
+ public function testShouldCreateMessageConsumerAndSetPollingInterval()
+ {
+ $context = new DbalContext($this->createConnectionMock(), [
+ 'pollingInterval' => 123456,
+ ]);
+
+ $consumer = $context->createConsumer(new DbalDestination(''));
+
+ $this->assertInstanceOf(DbalConsumer::class, $consumer);
+ $this->assertEquals(123456, $consumer->getPollingInterval());
+ }
+
+ public function testShouldThrowIfDestinationIsInvalidInstanceType()
+ {
+ $this->expectException(InvalidDestinationException::class);
+ $this->expectExceptionMessage(
+ 'The destination must be an instance of '.
+ 'Enqueue\Dbal\DbalDestination but got '.
+ 'Enqueue\Dbal\Tests\NotSupportedDestination2.'
+ );
+
+ $context = new DbalContext($this->createConnectionMock());
+
+ $this->assertInstanceOf(DbalConsumer::class, $context->createConsumer(new NotSupportedDestination2()));
+ }
+
+ public function testShouldReturnInstanceOfConnection()
+ {
+ $context = new DbalContext($connection = $this->createConnectionMock());
+
+ $this->assertSame($connection, $context->getDbalConnection());
+ }
+
+ public function testShouldReturnConfig()
+ {
+ $context = new DbalContext($connection = $this->createConnectionMock());
+
+ $this->assertSame($connection, $context->getDbalConnection());
+ }
+
+ public function testShouldThrowBadMethodCallExceptiOnOncreateTemporaryQueueCall()
+ {
+ $context = new DbalContext($connection = $this->createConnectionMock());
+
+ $this->expectException(\BadMethodCallException::class);
+ $this->expectExceptionMessage('Dbal transport does not support temporary queues');
+
+ $context->createTemporaryQueue();
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Connection
+ */
+ private function createConnectionMock()
+ {
+ return $this->createMock(Connection::class);
+ }
+}
+
+class NotSupportedDestination2 implements PsrDestination
+{
+}
diff --git a/pkg/dbal/Tests/DbalDestinationTest.php b/pkg/dbal/Tests/DbalDestinationTest.php
new file mode 100644
index 000000000..ff8bc8ace
--- /dev/null
+++ b/pkg/dbal/Tests/DbalDestinationTest.php
@@ -0,0 +1,36 @@
+assertClassImplements(PsrDestination::class, DbalDestination::class);
+ }
+
+ public function testShouldImplementTopicInterface()
+ {
+ $this->assertClassImplements(PsrTopic::class, DbalDestination::class);
+ }
+
+ public function testShouldImplementQueueInterface()
+ {
+ $this->assertClassImplements(PsrQueue::class, DbalDestination::class);
+ }
+
+ public function testShouldReturnTopicAndQueuePreviouslySetInConstructor()
+ {
+ $destination = new DbalDestination('topic-or-queue-name');
+
+ $this->assertSame('topic-or-queue-name', $destination->getQueueName());
+ $this->assertSame('topic-or-queue-name', $destination->getTopicName());
+ }
+}
diff --git a/pkg/dbal/Tests/DbalMessageTest.php b/pkg/dbal/Tests/DbalMessageTest.php
new file mode 100644
index 000000000..a2f50df5d
--- /dev/null
+++ b/pkg/dbal/Tests/DbalMessageTest.php
@@ -0,0 +1,169 @@
+assertClassImplements(PsrMessage::class, DbalMessage::class);
+ }
+
+ public function testCouldBeConstructedWithoutArguments()
+ {
+ $message = new DbalMessage();
+
+ $this->assertNull($message->getBody());
+ $this->assertSame([], $message->getProperties());
+ $this->assertSame([], $message->getHeaders());
+ }
+
+ public function testCouldBeConstructedWithOptionalArguments()
+ {
+ $message = new DbalMessage('theBody', ['barProp' => 'barPropVal'], ['fooHeader' => 'fooHeaderVal']);
+
+ $this->assertSame('theBody', $message->getBody());
+ $this->assertSame(['barProp' => 'barPropVal'], $message->getProperties());
+ $this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders());
+ }
+
+ public function testShouldSetRedeliveredToFalseInConstructor()
+ {
+ $message = new DbalMessage();
+
+ $this->assertSame(false, $message->isRedelivered());
+ }
+
+ public function testShouldSetPriorityToZeroInConstructor()
+ {
+ $message = new DbalMessage();
+
+ $this->assertSame(0, $message->getPriority());
+ }
+
+ public function testShouldSetDelayToNullInConstructor()
+ {
+ $message = new DbalMessage();
+
+ $this->assertNull($message->getDelay());
+ }
+
+ public function testShouldReturnPreviouslySetBody()
+ {
+ $message = new DbalMessage();
+
+ $message->setBody('theBody');
+
+ $this->assertSame('theBody', $message->getBody());
+ }
+
+ public function testShouldReturnPreviouslySetProperties()
+ {
+ $message = new DbalMessage();
+
+ $message->setProperties(['foo' => 'fooVal', 'bar' => 'barVal']);
+
+ $this->assertSame(['foo' => 'fooVal', 'bar' => 'barVal'], $message->getProperties());
+ }
+
+ public function testShouldReturnPreviouslySetProperty()
+ {
+ $message = new DbalMessage(null, ['foo' => 'fooVal']);
+
+ $message->setProperty('bar', 'barVal');
+
+ $this->assertSame(['foo' => 'fooVal', 'bar' => 'barVal'], $message->getProperties());
+ }
+
+ public function testShouldReturnSinglePreviouslySetProperty()
+ {
+ $message = new DbalMessage();
+
+ $this->assertSame(null, $message->getProperty('bar'));
+ $this->assertSame('default', $message->getProperty('bar', 'default'));
+
+ $message->setProperty('bar', 'barVal');
+ $this->assertSame('barVal', $message->getProperty('bar'));
+ }
+
+ public function testShouldReturnPreviouslySetHeaders()
+ {
+ $message = new DbalMessage();
+
+ $message->setHeaders(['foo' => 'fooVal', 'bar' => 'barVal']);
+
+ $this->assertSame(['foo' => 'fooVal', 'bar' => 'barVal'], $message->getHeaders());
+ }
+
+ public function testShouldReturnPreviouslySetHeader()
+ {
+ $message = new DbalMessage(null, [], ['foo' => 'fooVal']);
+
+ $message->setHeader('bar', 'barVal');
+
+ $this->assertSame(['foo' => 'fooVal', 'bar' => 'barVal'], $message->getHeaders());
+ }
+
+ public function testShouldReturnSinglePreviouslySetHeader()
+ {
+ $message = new DbalMessage();
+
+ $this->assertSame(null, $message->getHeader('bar'));
+ $this->assertSame('default', $message->getHeader('bar', 'default'));
+
+ $message->setHeader('bar', 'barVal');
+ $this->assertSame('barVal', $message->getHeader('bar'));
+ }
+
+ public function testShouldReturnPreviouslySetRedelivered()
+ {
+ $message = new DbalMessage();
+
+ $message->setRedelivered(true);
+ $this->assertSame(true, $message->isRedelivered());
+
+ $message->setRedelivered(false);
+ $this->assertSame(false, $message->isRedelivered());
+ }
+
+ public function testShouldReturnPreviouslySetCorrelationId()
+ {
+ $message = new DbalMessage();
+ $message->setCorrelationId('theCorrelationId');
+
+ $this->assertSame('theCorrelationId', $message->getCorrelationId());
+ $this->assertSame(['correlation_id' => 'theCorrelationId'], $message->getHeaders());
+ }
+
+ public function testShouldReturnPreviouslySetMessageId()
+ {
+ $message = new DbalMessage();
+ $message->setMessageId('theMessageId');
+
+ $this->assertSame('theMessageId', $message->getMessageId());
+ $this->assertSame(['message_id' => 'theMessageId'], $message->getHeaders());
+ }
+
+ public function testShouldReturnPreviouslySetTimestamp()
+ {
+ $message = new DbalMessage();
+ $message->setTimestamp(12345);
+
+ $this->assertSame(12345, $message->getTimestamp());
+ $this->assertSame(['timestamp' => 12345], $message->getHeaders());
+ }
+
+ public function testShouldReturnPreviouslySetReplyTo()
+ {
+ $message = new DbalMessage();
+ $message->setReplyTo('theReply');
+
+ $this->assertSame('theReply', $message->getReplyTo());
+ $this->assertSame(['reply_to' => 'theReply'], $message->getHeaders());
+ }
+}
diff --git a/pkg/dbal/Tests/DbalProducerTest.php b/pkg/dbal/Tests/DbalProducerTest.php
new file mode 100644
index 000000000..ef039ca04
--- /dev/null
+++ b/pkg/dbal/Tests/DbalProducerTest.php
@@ -0,0 +1,141 @@
+assertClassImplements(PsrProducer::class, DbalProducer::class);
+ }
+
+ public function testCouldBeConstructedWithRequiredArguments()
+ {
+ new DbalProducer($this->createContextMock());
+ }
+
+ public function testShouldThrowIfBodyOfInvalidType()
+ {
+ $this->expectException(InvalidMessageException::class);
+ $this->expectExceptionMessage('The message body must be a scalar or null. Got: stdClass');
+
+ $producer = new DbalProducer($this->createContextMock());
+
+ $message = new DbalMessage(new \stdClass());
+
+ $producer->send(new DbalDestination(''), $message);
+ }
+
+ public function testShouldThrowIfDestinationOfInvalidType()
+ {
+ $this->expectException(InvalidDestinationException::class);
+ $this->expectExceptionMessage(
+ 'The destination must be an instance of '.
+ 'Enqueue\Dbal\DbalDestination but got '.
+ 'Enqueue\Dbal\Tests\NotSupportedDestination1.'
+ );
+
+ $producer = new DbalProducer($this->createContextMock());
+
+ $producer->send(new NotSupportedDestination1(''), new DbalMessage());
+ }
+
+ public function testShouldThrowIfInsertMessageFailed()
+ {
+ $dbal = $this->createConnectionMock();
+ $dbal
+ ->expects($this->once())
+ ->method('insert')
+ ->will($this->throwException(new \Exception('error message')))
+ ;
+
+ $context = $this->createContextMock();
+ $context
+ ->expects($this->once())
+ ->method('getDbalConnection')
+ ->will($this->returnValue($dbal))
+ ;
+
+ $destination = new DbalDestination('queue-name');
+ $message = new DbalMessage();
+
+ $this->expectException(Exception::class);
+ $this->expectExceptionMessage('The transport fails to send the message due to some internal error.');
+
+ $producer = new DbalProducer($context);
+ $producer->send($destination, $message);
+ }
+
+ public function testShouldSendMessage()
+ {
+ $expectedMessage = [
+ 'body' => 'body',
+ 'headers' => '{"hkey":"hvalue"}',
+ 'properties' => '{"pkey":"pvalue"}',
+ 'priority' => 123,
+ 'queue' => 'queue-name',
+ ];
+
+ $dbal = $this->createConnectionMock();
+ $dbal
+ ->expects($this->once())
+ ->method('insert')
+ ->with('tableName', $expectedMessage)
+ ;
+
+ $context = $this->createContextMock();
+ $context
+ ->expects($this->once())
+ ->method('getDbalConnection')
+ ->will($this->returnValue($dbal))
+ ;
+ $context
+ ->expects($this->once())
+ ->method('getTableName')
+ ->will($this->returnValue('tableName'))
+ ;
+
+ $destination = new DbalDestination('queue-name');
+ $message = new DbalMessage();
+ $message->setBody('body');
+ $message->setHeaders(['hkey' => 'hvalue']);
+ $message->setProperties(['pkey' => 'pvalue']);
+ $message->setPriority(123);
+
+ $producer = new DbalProducer($context);
+ $producer->send($destination, $message);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|DbalContext
+ */
+ private function createContextMock()
+ {
+ return $this->createMock(DbalContext::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Connection
+ */
+ private function createConnectionMock()
+ {
+ return $this->createMock(Connection::class);
+ }
+}
+
+class NotSupportedDestination1 implements PsrDestination
+{
+}
diff --git a/pkg/dbal/Tests/ManagerRegistryConnectionFactoryTest.php b/pkg/dbal/Tests/ManagerRegistryConnectionFactoryTest.php
new file mode 100644
index 000000000..1baefc737
--- /dev/null
+++ b/pkg/dbal/Tests/ManagerRegistryConnectionFactoryTest.php
@@ -0,0 +1,89 @@
+assertClassImplements(PsrConnectionFactory::class, ManagerRegistryConnectionFactory::class);
+ }
+
+ public function testCouldBeConstructedWithEmptyConfiguration()
+ {
+ $factory = new ManagerRegistryConnectionFactory($this->createManagerRegistryMock());
+
+ $this->assertAttributeEquals([
+ 'lazy' => true,
+ 'connection_name' => null,
+ ], 'config', $factory);
+ }
+
+ public function testCouldBeConstructedWithCustomConfiguration()
+ {
+ $factory = new ManagerRegistryConnectionFactory($this->createManagerRegistryMock(), [
+ 'connection_name' => 'theConnectionName',
+ 'lazy' => false,
+ ]);
+
+ $this->assertAttributeEquals([
+ 'lazy' => false,
+ 'connection_name' => 'theConnectionName',
+ ], 'config', $factory);
+ }
+
+ public function testShouldCreateContext()
+ {
+ $registry = $this->createManagerRegistryMock();
+ $registry
+ ->expects($this->once())
+ ->method('getConnection')
+ ->willReturn($this->createConnectionMock())
+ ;
+
+ $factory = new ManagerRegistryConnectionFactory($registry, ['lazy' => false,]);
+
+ $context = $factory->createContext();
+
+ $this->assertInstanceOf(DbalContext::class, $context);
+
+ $this->assertAttributeInstanceOf(Connection::class, 'connection', $context);
+ $this->assertAttributeSame(null, 'connectionFactory', $context);
+ }
+
+ public function testShouldCreateLazyContext()
+ {
+ $factory = new ManagerRegistryConnectionFactory($this->createManagerRegistryMock(), ['lazy' => true]);
+
+ $context = $factory->createContext();
+
+ $this->assertInstanceOf(DbalContext::class, $context);
+
+ $this->assertAttributeEquals(null, 'connection', $context);
+ $this->assertAttributeInternalType('callable', 'connectionFactory', $context);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|ManagerRegistry
+ */
+ private function createManagerRegistryMock()
+ {
+ return $this->createMock(ManagerRegistry::class);
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Connection
+ */
+ private function createConnectionMock()
+ {
+ return $this->createMock(Connection::class);
+ }
+}
diff --git a/pkg/dbal/Tests/Symfony/DbalTransportFactoryTest.php b/pkg/dbal/Tests/Symfony/DbalTransportFactoryTest.php
new file mode 100644
index 000000000..365e9db83
--- /dev/null
+++ b/pkg/dbal/Tests/Symfony/DbalTransportFactoryTest.php
@@ -0,0 +1,154 @@
+assertClassImplements(TransportFactoryInterface::class, DbalTransportFactory::class);
+ }
+
+ public function testCouldBeConstructedWithDefaultName()
+ {
+ $transport = new DbalTransportFactory();
+
+ $this->assertEquals('dbal', $transport->getName());
+ }
+
+ public function testCouldBeConstructedWithCustomName()
+ {
+ $transport = new DbalTransportFactory('theCustomName');
+
+ $this->assertEquals('theCustomName', $transport->getName());
+ }
+
+ public function testShouldAllowAddConfiguration()
+ {
+ $transport = new DbalTransportFactory();
+ $tb = new TreeBuilder();
+ $rootNode = $tb->root('foo');
+
+ $transport->addConfiguration($rootNode);
+ $processor = new Processor();
+ $config = $processor->process($tb->buildTree(),[[
+ 'connection' => [
+ 'key' => 'value'
+ ],
+ ]]);
+
+ $this->assertEquals([
+ 'connection' => [
+ 'key' => 'value',
+ ],
+ 'lazy' => true,
+ 'table_name' => 'enqueue',
+ 'polling_interval' => 1000,
+ 'dbal_connection_name' => null,
+ ], $config);
+ }
+
+ public function testShouldCreateDbalConnectionFactory()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new DbalTransportFactory();
+
+ $serviceId = $transport->createConnectionFactory($container, [
+ 'connection' => [
+ 'dbname' => 'theDbName',
+ ],
+ 'lazy' => true,
+ 'table_name' => 'enqueue',
+ 'polling_interval' => 1000,
+ ]);
+
+ $this->assertTrue($container->hasDefinition($serviceId));
+ $factory = $container->getDefinition($serviceId);
+ $this->assertEquals(DbalConnectionFactory::class, $factory->getClass());
+ $this->assertSame([
+ 'connection' => [
+ 'dbname' => 'theDbName',
+ ],
+ 'lazy' => true,
+ 'table_name' => 'enqueue',
+ 'polling_interval' => 1000,
+ ], $factory->getArgument(0));
+ }
+
+ public function testShouldCreateManagerRegistryConnectionFactory()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new DbalTransportFactory();
+
+ $serviceId = $transport->createConnectionFactory($container, [
+ 'dbal_connection_name' => 'default',
+ 'lazy' => true,
+ 'table_name' => 'enqueue',
+ 'polling_interval' => 1000,
+ ]);
+
+ $this->assertTrue($container->hasDefinition($serviceId));
+ $factory = $container->getDefinition($serviceId);
+ $this->assertEquals(ManagerRegistryConnectionFactory::class, $factory->getClass());
+ $this->assertInstanceOf(Reference::class, $factory->getArgument(0));
+ $this->assertSame('doctrine', (string) $factory->getArgument(0));
+ $this->assertSame([
+ 'dbal_connection_name' => 'default',
+ 'lazy' => true,
+ 'table_name' => 'enqueue',
+ 'polling_interval' => 1000,
+ ], $factory->getArgument(1));
+ }
+
+ public function testShouldCreateContext()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new DbalTransportFactory();
+
+ $serviceId = $transport->createContext($container, []);
+
+ $this->assertEquals('enqueue.transport.dbal.context', $serviceId);
+ $this->assertTrue($container->hasDefinition($serviceId));
+
+ $context = $container->getDefinition('enqueue.transport.dbal.context');
+ $this->assertInstanceOf(Reference::class, $context->getFactory()[0]);
+ $this->assertEquals('enqueue.transport.dbal.connection_factory', (string) $context->getFactory()[0]);
+ $this->assertEquals('createContext', $context->getFactory()[1]);
+ }
+
+ public function testShouldCreateDriver()
+ {
+ $container = new ContainerBuilder();
+
+ $transport = new DbalTransportFactory();
+
+ $serviceId = $transport->createDriver($container, []);
+
+ $this->assertEquals('enqueue.client.dbal.driver', $serviceId);
+ $this->assertTrue($container->hasDefinition($serviceId));
+
+ $driver = $container->getDefinition($serviceId);
+ $this->assertSame(DbalDriver::class, $driver->getClass());
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(0));
+ $this->assertEquals('enqueue.transport.dbal.context', (string) $driver->getArgument(0));
+
+ $this->assertInstanceOf(Reference::class, $driver->getArgument(1));
+ $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1));
+ }
+}
diff --git a/pkg/dbal/composer.json b/pkg/dbal/composer.json
new file mode 100644
index 000000000..ff4e62d5c
--- /dev/null
+++ b/pkg/dbal/composer.json
@@ -0,0 +1,41 @@
+{
+ "name": "enqueue/dbal",
+ "type": "library",
+ "description": "Message Queue Doctrine DBAL Transport",
+ "keywords": ["messaging", "queue", "doctrine", "dbal"],
+ "license": "MIT",
+ "repositories": [
+ {
+ "type": "vcs",
+ "url": "git@github.com:php-enqueue/test.git"
+ }
+ ],
+ "require": {
+ "php": ">=5.6",
+ "enqueue/psr-queue": "^0.3",
+ "doctrine/dbal": "~2.5",
+ "psr/log": "^1"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "~5.4.0",
+ "enqueue/test": "^0.3",
+ "enqueue/enqueue": "^0.3",
+ "symfony/dependency-injection": "^2.8|^3",
+ "symfony/config": "^2.8|^3"
+ },
+ "autoload": {
+ "psr-4": { "Enqueue\\Dbal\\": "" },
+ "exclude-from-classmap": [
+ "/Tests/"
+ ]
+ },
+ "suggest": {
+ "enqueue/enqueue": "If you'd like to use advanced features like Client abstract layer or Symfony integration features"
+ },
+ "minimum-stability": "dev",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "0.3.x-dev"
+ }
+ }
+}
diff --git a/pkg/dbal/examples/consume.php b/pkg/dbal/examples/consume.php
new file mode 100644
index 000000000..e120329fd
--- /dev/null
+++ b/pkg/dbal/examples/consume.php
@@ -0,0 +1,46 @@
+ [
+ 'dbname' => getenv('SYMFONY__DB__NAME'),
+ 'user' => getenv('SYMFONY__DB__USER'),
+ 'password' => getenv('SYMFONY__DB__PASSWORD'),
+ 'host' => getenv('SYMFONY__DB__HOST'),
+ 'port' => getenv('SYMFONY__DB__PORT'),
+ 'driver' => getenv('SYMFONY__DB__DRIVER'),
+ ],
+);
+
+$factory = new DbalConnectionFactory($config);
+$context = $factory->createContext();
+$context->createDataBaseTable();
+
+$destination = $context->createTopic('destination');
+
+$consumer = $context->createConsumer($destination);
+
+while (true) {
+ if ($m = $consumer->receive(1000)) {
+ $consumer->acknowledge($m);
+ echo 'Received message: '.$m->getBody().PHP_EOL;
+ }
+}
+
+echo 'Done'."\n";
diff --git a/pkg/dbal/examples/produce.php b/pkg/dbal/examples/produce.php
new file mode 100644
index 000000000..a791ee667
--- /dev/null
+++ b/pkg/dbal/examples/produce.php
@@ -0,0 +1,45 @@
+ [
+ 'dbname' => getenv('SYMFONY__DB__NAME'),
+ 'user' => getenv('SYMFONY__DB__USER'),
+ 'password' => getenv('SYMFONY__DB__PASSWORD'),
+ 'host' => getenv('SYMFONY__DB__HOST'),
+ 'port' => getenv('SYMFONY__DB__PORT'),
+ 'driver' => getenv('SYMFONY__DB__DRIVER'),
+ ],
+);
+
+$factory = new DbalConnectionFactory($config);
+$context = $factory->createContext();
+$context->createDataBaseTable();
+
+$destination = $context->createTopic('destination');
+
+$message = $context->createMessage('Hello Bar!');
+
+while (true) {
+ $context->createProducer()->send($destination, $message);
+ echo 'Sent message: ' . $message->getBody() . PHP_EOL;
+ sleep(1);
+}
+
+echo 'Done'."\n";
diff --git a/pkg/dbal/phpunit.xml.dist b/pkg/dbal/phpunit.xml.dist
new file mode 100644
index 000000000..451d24a00
--- /dev/null
+++ b/pkg/dbal/phpunit.xml.dist
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+ ./Tests
+
+
+
+
+
+ .
+
+ ./vendor
+ ./Tests
+
+
+
+
diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php
index fba89c5f7..531383e04 100644
--- a/pkg/enqueue-bundle/EnqueueBundle.php
+++ b/pkg/enqueue-bundle/EnqueueBundle.php
@@ -11,6 +11,8 @@
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
+use Enqueue\Dbal\DbalContext;
+use Enqueue\Dbal\Symfony\DbalTransportFactory;
use Enqueue\Fs\FsContext;
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Redis\RedisContext;
@@ -58,5 +60,9 @@ public function build(ContainerBuilder $container)
if (class_exists(RedisContext::class)) {
$extension->addTransportFactory(new RedisTransportFactory());
}
+
+ if (class_exists(DbalContext::class)) {
+ $extension->addTransportFactory(new DbalTransportFactory());
+ }
}
}
diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
index e626158d4..063cf05a3 100644
--- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
+++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
@@ -72,7 +72,20 @@ public function provideEnqueueConfigs()
'store_dir' => sys_get_temp_dir(),
]
]
- ]]
+ ]],
+ ['dbal' => [
+ 'transport' => [
+ 'default' => 'dbal',
+ 'dbal' => [
+ 'dbname' => getenv('SYMFONY__DB__NAME'),
+ 'user' => getenv('SYMFONY__DB__USER'),
+ 'password' => getenv('SYMFONY__DB__PASSWORD'),
+ 'host' => getenv('SYMFONY__DB__HOST'),
+ 'port' => getenv('SYMFONY__DB__PORT'),
+ 'driver' => getenv('SYMFONY__DB__DRIVER'),
+ ]
+ ]
+ ]],
];
}
diff --git a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php
index 58ed78fcb..6a63fde9d 100644
--- a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php
+++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php
@@ -11,6 +11,7 @@
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
use Enqueue\Bundle\EnqueueBundle;
+use Enqueue\Dbal\Symfony\DbalTransportFactory;
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Redis\Symfony\RedisTransportFactory;
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
@@ -176,6 +177,23 @@ public function testShouldRegisterRedisTransportFactory()
$bundle->build($container);
}
+ public function testShouldRegisterDbalTransportFactory()
+ {
+ $extensionMock = $this->createEnqueueExtensionMock();
+
+ $container = new ContainerBuilder();
+ $container->registerExtension($extensionMock);
+
+ $extensionMock
+ ->expects($this->at(8))
+ ->method('addTransportFactory')
+ ->with($this->isInstanceOf(DbalTransportFactory::class))
+ ;
+
+ $bundle = new EnqueueBundle();
+ $bundle->build($container);
+ }
+
/**
* @return \PHPUnit_Framework_MockObject_MockObject|EnqueueExtension
*/