From 76f65831b5de786760f39f0c2eb263582849baa6 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Sat, 3 Nov 2018 15:43:47 +0200 Subject: [PATCH 1/7] [dbal] Use concurrent fetch message approach (no transaction, no pessimistic lock) --- pkg/dbal/DbalConsumerHelperTrait.php | 120 ++++++++++++++++----------- 1 file changed, 71 insertions(+), 49 deletions(-) diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 2dc29db26..b2bb08cb2 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -11,6 +11,10 @@ trait DbalConsumerHelperTrait { + private $redeliverMessagesLastExecutedAt; + + private $removeExpiredMessagesLastExecutedAt; + abstract protected function getContext(): DbalContext; abstract protected function getConnection(): Connection; @@ -20,66 +24,72 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array $now = time(); $deliveryId = (string) Uuid::uuid1(); - $this->getConnection()->beginTransaction(); - - try { - $query = $this->getConnection()->createQueryBuilder() - ->select('*') - ->from($this->getContext()->getTableName()) - ->andWhere('delivery_id IS NULL') - ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') - ->andWhere('queue IN (:queues)') - ->addOrderBy('priority', 'desc') - ->addOrderBy('published_at', 'asc') - ->setMaxResults(1); - - // select for update - $message = $this->getConnection()->executeQuery( - $query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(), - ['delayedUntil' => $now, 'queues' => array_values($queues)], - ['delayedUntil' => ParameterType::INTEGER, 'queues' => Connection::PARAM_STR_ARRAY] - )->fetch(); - - if (!$message) { - $this->getConnection()->commit(); + $endAt = microtime(true) + 0.2; // add 200ms + + $select = $this->getConnection()->createQueryBuilder() + ->select('id') + ->from($this->getContext()->getTableName()) + ->andWhere('delivery_id IS NULL') + ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') + ->andWhere('queue IN (:queues)') + ->addOrderBy('priority', 'desc') + ->addOrderBy('published_at', 'asc') + ->setParameter('delayedUntil', $now, ParameterType::INTEGER) + ->setParameter('queues', array_values($queues), Connection::PARAM_STR_ARRAY) + ->setMaxResults(1); + + $update = $this->getConnection()->createQueryBuilder() + ->update($this->getContext()->getTableName()) + ->set('delivery_id', ':deliveryId') + ->set('redeliver_after', ':redeliverAfter') + ->andWhere('id = :messageId') + ->andWhere('delivery_id IS NULL') + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) + ; + while (microtime() < $endAt) { + $result = $select->execute()->fetch(); + if (empty($result)) { return null; } - // mark message as delivered to consumer - $this->getConnection()->createQueryBuilder() - ->andWhere('id = :id') - ->update($this->getContext()->getTableName()) - ->set('delivery_id', ':deliveryId') - ->set('redeliver_after', ':redeliverAfter') - ->setParameter('id', $message['id'], Type::GUID) - ->setParameter('deliveryId', $deliveryId, Type::STRING) - ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) - ->execute() + $update + ->setParameter('messageId', $result['id'], Type::GUID) ; - $this->getConnection()->commit(); - - $deliveredMessage = $this->getConnection()->createQueryBuilder() - ->select('*') - ->from($this->getContext()->getTableName()) - ->andWhere('delivery_id = :deliveryId') - ->setParameter('deliveryId', $deliveryId, Type::STRING) - ->setMaxResults(1) - ->execute() - ->fetch() - ; - - return $deliveredMessage ?: null; - } catch (\Exception $e) { - $this->getConnection()->rollBack(); - - throw $e; + if ($update->execute()) { + $deliveredMessage = $this->getConnection()->createQueryBuilder() + ->select('*') + ->from($this->getContext()->getTableName()) + ->andWhere('delivery_id = :deliveryId') + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setMaxResults(1) + ->execute() + ->fetch() + ; + + if (false == $deliveredMessage) { + throw new \LogicException('There must be a message at all times at this stage but there is no a message.'); + } + + return $deliveredMessage; + } } + + return null; } protected function redeliverMessages(): void { + if (null === $this->redeliverMessagesLastExecutedAt) { + $this->redeliverMessagesLastExecutedAt = microtime(true); + } + + if ((microtime(true) - $this->redeliverMessagesLastExecutedAt) < 1) { + return; + } + $this->getConnection()->createQueryBuilder() ->update($this->getContext()->getTableName()) ->set('delivery_id', ':deliveryId') @@ -91,10 +101,20 @@ protected function redeliverMessages(): void ->setParameter('redelivered', true, Type::BOOLEAN) ->execute() ; + + $this->redeliverMessagesLastExecutedAt = microtime(true); } protected function removeExpiredMessages(): void { + if (null === $this->removeExpiredMessagesLastExecutedAt) { + $this->removeExpiredMessagesLastExecutedAt = microtime(true); + } + + if ((microtime(true) - $this->removeExpiredMessagesLastExecutedAt) < 1) { + return; + } + $this->getConnection()->createQueryBuilder() ->delete($this->getContext()->getTableName()) ->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)') @@ -102,5 +122,7 @@ protected function removeExpiredMessages(): void ->setParameter('redelivered', false, Type::BOOLEAN) ->execute() ; + + $this->removeExpiredMessagesLastExecutedAt = microtime(true); } } From 1b0c325e92657b740239499d4f7233fda754ebd2 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 5 Nov 2018 10:47:57 +0200 Subject: [PATCH 2/7] [dbal] fix remove expired messages. --- pkg/dbal/DbalConsumerHelperTrait.php | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index b2bb08cb2..2d32eb23f 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -84,9 +84,7 @@ protected function redeliverMessages(): void { if (null === $this->redeliverMessagesLastExecutedAt) { $this->redeliverMessagesLastExecutedAt = microtime(true); - } - - if ((microtime(true) - $this->redeliverMessagesLastExecutedAt) < 1) { + } elseif ((microtime(true) - $this->redeliverMessagesLastExecutedAt) < 1) { return; } @@ -109,15 +107,14 @@ protected function removeExpiredMessages(): void { if (null === $this->removeExpiredMessagesLastExecutedAt) { $this->removeExpiredMessagesLastExecutedAt = microtime(true); - } - - if ((microtime(true) - $this->removeExpiredMessagesLastExecutedAt) < 1) { + } elseif ((microtime(true) - $this->removeExpiredMessagesLastExecutedAt) < 1) { return; } $this->getConnection()->createQueryBuilder() ->delete($this->getContext()->getTableName()) ->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)') + ->andWhere('(redelivered = false OR delivery_id IS NULL)') ->setParameter(':now', (int) time(), Type::BIGINT) ->setParameter('redelivered', false, Type::BOOLEAN) ->execute() From 47472569c684ac8182d82b2121fa8fc58e5ef998 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 5 Nov 2018 12:09:36 +0200 Subject: [PATCH 3/7] [dbal] Optimize query so an index is used, not filesort. --- pkg/dbal/DbalConsumerHelperTrait.php | 2 +- pkg/dbal/DbalContext.php | 18 +++++++++++------- pkg/dbal/DbalProducer.php | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 2d32eb23f..0c0f7e1b1 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -32,7 +32,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array ->andWhere('delivery_id IS NULL') ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') ->andWhere('queue IN (:queues)') - ->addOrderBy('priority', 'desc') + ->addOrderBy('priority', 'asc') ->addOrderBy('published_at', 'asc') ->setParameter('delayedUntil', $now, ParameterType::INTEGER) ->setParameter('queues', array_values($queues), Connection::PARAM_STR_ARRAY) diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 0cac7ac45..99b2add40 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -154,7 +154,7 @@ public function convertMessage(array $dbalMessage): DbalMessage $dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']); } if (isset($dbalMessage['priority'])) { - $dbalMessageObj->setPriority((int) $dbalMessage['priority']); + $dbalMessageObj->setPriority((int) (-1 * $dbalMessage['priority'])); } if (isset($dbalMessage['published_at'])) { $dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']); @@ -233,14 +233,18 @@ public function createDataBaseTable(): void $table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]); $table->setPrimaryKey(['id']); - $table->addIndex(['published_at']); - $table->addIndex(['queue']); - $table->addIndex(['priority']); - $table->addIndex(['delayed_until']); - $table->addIndex(['priority', 'published_at']); - $table->addIndex(['redeliver_after']); $table->addUniqueIndex(['delivery_id']); + // try to select a message index + $table->addIndex(['delivery_id, delayed_until, queue']); + $table->addIndex(['priority', 'published_at']); + + // redeliver failed messages + $table->addIndex(['delivery_id', 'redeliver_after']); + + // remove expired messages + $table->addIndex(['time_to_live', 'delivery_id']); + $sm->createTable($table); } } diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index 38ad33414..64bec53ea 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -85,7 +85,7 @@ public function send(Destination $destination, Message $message): void 'body' => $body, 'headers' => JSON::encode($message->getHeaders()), 'properties' => JSON::encode($message->getProperties()), - 'priority' => $message->getPriority(), + 'priority' => -1 * $message->getPriority(), 'queue' => $destination->getQueueName(), 'redelivered' => false, 'delivery_id' => null, From 287b0b6137dcd87f604f094f885cd6e999650eff Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 5 Nov 2018 17:11:40 +0200 Subject: [PATCH 4/7] fixes --- pkg/dbal/DbalConsumerHelperTrait.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 0c0f7e1b1..ba1d23877 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -48,7 +48,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) ; - while (microtime() < $endAt) { + while (microtime(true) < $endAt) { $result = $select->execute()->fetch(); if (empty($result)) { return null; @@ -116,7 +116,6 @@ protected function removeExpiredMessages(): void ->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)') ->andWhere('(redelivered = false OR delivery_id IS NULL)') ->setParameter(':now', (int) time(), Type::BIGINT) - ->setParameter('redelivered', false, Type::BOOLEAN) ->execute() ; From 5316a9785c427f224b39fc0663cf9116bfcb9144 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 5 Nov 2018 23:25:09 +0200 Subject: [PATCH 5/7] make delivery_id field binary, review indexes. --- pkg/dbal/DbalConsumer.php | 5 +++-- pkg/dbal/DbalConsumerHelperTrait.php | 32 +++++++++++++++------------ pkg/dbal/DbalContext.php | 16 +++++--------- pkg/dbal/DbalSubscriptionConsumer.php | 2 +- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index ede0cfa40..eae6a2752 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -11,6 +11,7 @@ use Interop\Queue\Impl\ConsumerPollingTrait; use Interop\Queue\Message; use Interop\Queue\Queue; +use Ramsey\Uuid\Uuid; class DbalConsumer implements Consumer { @@ -138,8 +139,8 @@ private function deleteMessage(string $deliveryId): void $this->getConnection()->delete( $this->getContext()->getTableName(), - ['delivery_id' => $deliveryId], - ['delivery_id' => Type::STRING] + ['delivery_id' => Uuid::fromString($deliveryId)->getBytes()], + ['delivery_id' => Type::BINARY] ); } } diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index ba1d23877..de0b854d1 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -22,16 +22,16 @@ abstract protected function getConnection(): Connection; protected function fetchMessage(array $queues, int $redeliveryDelay): ?array { $now = time(); - $deliveryId = (string) Uuid::uuid1(); + $deliveryId = Uuid::uuid1(); $endAt = microtime(true) + 0.2; // add 200ms $select = $this->getConnection()->createQueryBuilder() ->select('id') ->from($this->getContext()->getTableName()) - ->andWhere('delivery_id IS NULL') - ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') ->andWhere('queue IN (:queues)') + ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') + ->andWhere('delivery_id IS NULL') ->addOrderBy('priority', 'asc') ->addOrderBy('published_at', 'asc') ->setParameter('delayedUntil', $now, ParameterType::INTEGER) @@ -44,11 +44,11 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array ->set('redeliver_after', ':redeliverAfter') ->andWhere('id = :messageId') ->andWhere('delivery_id IS NULL') - ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setParameter('deliveryId', $deliveryId->getBytes(), Type::BINARY) ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) ; - while (microtime(true) < $endAt) { + while (microtime(true) >= $endAt) { $result = $select->execute()->fetch(); if (empty($result)) { return null; @@ -63,7 +63,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array ->select('*') ->from($this->getContext()->getTableName()) ->andWhere('delivery_id = :deliveryId') - ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setParameter('deliveryId', $deliveryId->getBytes(), Type::BINARY) ->setMaxResults(1) ->execute() ->fetch() @@ -88,18 +88,19 @@ protected function redeliverMessages(): void return; } - $this->getConnection()->createQueryBuilder() + $update = $this->getConnection()->createQueryBuilder() ->update($this->getContext()->getTableName()) ->set('delivery_id', ':deliveryId') ->set('redelivered', ':redelivered') - ->andWhere('delivery_id IS NOT NULL') ->andWhere('redeliver_after < :now') - ->setParameter(':now', (int) time(), Type::BIGINT) - ->setParameter('deliveryId', null, Type::STRING) + ->andWhere('delivery_id IS NOT NULL') + ->setParameter(':now', time(), Type::BIGINT) + ->setParameter('deliveryId', null, Type::BINARY) ->setParameter('redelivered', true, Type::BOOLEAN) - ->execute() ; + $update->execute(); + $this->redeliverMessagesLastExecutedAt = microtime(true); } @@ -111,14 +112,17 @@ protected function removeExpiredMessages(): void return; } - $this->getConnection()->createQueryBuilder() + $update = $this->getConnection()->createQueryBuilder() ->delete($this->getContext()->getTableName()) ->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)') - ->andWhere('(redelivered = false OR delivery_id IS NULL)') + ->andWhere('delivery_id IS NULL') + ->andWhere('redelivered = false') + ->setParameter(':now', (int) time(), Type::BIGINT) - ->execute() ; + $update->execute(); + $this->removeExpiredMessagesLastExecutedAt = microtime(true); } } diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 99b2add40..7d0e00914 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -17,6 +17,7 @@ use Interop\Queue\Queue; use Interop\Queue\SubscriptionConsumer; use Interop\Queue\Topic; +use Ramsey\Uuid\Uuid; class DbalContext implements Context { @@ -160,7 +161,7 @@ public function convertMessage(array $dbalMessage): DbalMessage $dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']); } if (isset($dbalMessage['delivery_id'])) { - $dbalMessageObj->setDeliveryId((string) $dbalMessage['delivery_id']); + $dbalMessageObj->setDeliveryId(Uuid::fromBytes($dbalMessage['delivery_id'])->toString()); } if (isset($dbalMessage['redeliver_after'])) { $dbalMessageObj->setRedeliverAfter((int) $dbalMessage['redeliver_after']); @@ -229,20 +230,13 @@ public function createDataBaseTable(): void $table->addColumn('priority', Type::SMALLINT, ['notnull' => false]); $table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]); $table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]); - $table->addColumn('delivery_id', Type::STRING, ['notnull' => false]); + $table->addColumn('delivery_id', Type::BINARY, ['length' => 16, 'fixed' => true, 'notnull' => false]); $table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]); $table->setPrimaryKey(['id']); - $table->addUniqueIndex(['delivery_id']); + $table->addIndex(['priority', 'published_at', 'queue', 'delivery_id', 'delayed_until', 'id']); - // try to select a message index - $table->addIndex(['delivery_id, delayed_until, queue']); - $table->addIndex(['priority', 'published_at']); - - // redeliver failed messages - $table->addIndex(['delivery_id', 'redeliver_after']); - - // remove expired messages + $table->addIndex(['redeliver_after', 'delivery_id']); $table->addIndex(['time_to_live', 'delivery_id']); $sm->createTable($table); diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 12c1cef37..be0bf18c1 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -75,6 +75,7 @@ public function consume(int $timeout = 0): void } $timeout /= 1000; + $now = time(); $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds $currentQueueNames = []; @@ -83,7 +84,6 @@ public function consume(int $timeout = 0): void $currentQueueNames = $queueNames; } - $now = time(); $this->removeExpiredMessages(); $this->redeliverMessages(); From 09e148ce09fa313c90daa10b68254be2badd6311 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 6 Nov 2018 11:16:01 +0200 Subject: [PATCH 6/7] [dbal] fix tests. --- pkg/dbal/DbalConsumer.php | 24 +--------- pkg/dbal/DbalConsumerHelperTrait.php | 39 +++++++++++----- pkg/dbal/DbalContext.php | 45 ++++++++++--------- pkg/dbal/DbalMessage.php | 15 +++++++ pkg/dbal/DbalProducer.php | 1 - pkg/dbal/DbalSubscriptionConsumer.php | 8 ++-- pkg/dbal/Tests/DbalConsumerTest.php | 17 ++++--- .../Tests/Functional/DbalConsumerTest.php | 28 ++++++------ 8 files changed, 98 insertions(+), 79 deletions(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index eae6a2752..e95a8f513 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -5,13 +5,11 @@ namespace Enqueue\Dbal; use Doctrine\DBAL\Connection; -use Doctrine\DBAL\Types\Type; use Interop\Queue\Consumer; use Interop\Queue\Exception\InvalidMessageException; use Interop\Queue\Impl\ConsumerPollingTrait; use Interop\Queue\Message; use Interop\Queue\Queue; -use Ramsey\Uuid\Uuid; class DbalConsumer implements Consumer { @@ -82,14 +80,7 @@ public function receiveNoWait(): ?Message $this->removeExpiredMessages(); $this->redeliverMessages(); - // get top message from the queue - if ($message = $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay)) { - if ($message['redelivered'] || empty($message['time_to_live']) || $message['time_to_live'] > time()) { - return $this->getContext()->convertMessage($message); - } - } - - return null; + return $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay); } /** @@ -130,17 +121,4 @@ protected function getConnection(): Connection { return $this->dbal; } - - private function deleteMessage(string $deliveryId): void - { - if (empty($deliveryId)) { - throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId)); - } - - $this->getConnection()->delete( - $this->getContext()->getTableName(), - ['delivery_id' => Uuid::fromString($deliveryId)->getBytes()], - ['delivery_id' => Type::BINARY] - ); - } } diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index de0b854d1..871b437d6 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -19,8 +19,12 @@ abstract protected function getContext(): DbalContext; abstract protected function getConnection(): Connection; - protected function fetchMessage(array $queues, int $redeliveryDelay): ?array + protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessage { + if (empty($queues)) { + throw new \LogicException('Queues must not be empty.'); + } + $now = time(); $deliveryId = Uuid::uuid1(); @@ -34,8 +38,8 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array ->andWhere('delivery_id IS NULL') ->addOrderBy('priority', 'asc') ->addOrderBy('published_at', 'asc') + ->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY) ->setParameter('delayedUntil', $now, ParameterType::INTEGER) - ->setParameter('queues', array_values($queues), Connection::PARAM_STR_ARRAY) ->setMaxResults(1); $update = $this->getConnection()->createQueryBuilder() @@ -44,11 +48,11 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array ->set('redeliver_after', ':redeliverAfter') ->andWhere('id = :messageId') ->andWhere('delivery_id IS NULL') - ->setParameter('deliveryId', $deliveryId->getBytes(), Type::BINARY) + ->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID) ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) ; - while (microtime(true) >= $endAt) { + while (microtime(true) < $endAt) { $result = $select->execute()->fetch(); if (empty($result)) { return null; @@ -63,7 +67,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array ->select('*') ->from($this->getContext()->getTableName()) ->andWhere('delivery_id = :deliveryId') - ->setParameter('deliveryId', $deliveryId->getBytes(), Type::BINARY) + ->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID) ->setMaxResults(1) ->execute() ->fetch() @@ -73,7 +77,9 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array throw new \LogicException('There must be a message at all times at this stage but there is no a message.'); } - return $deliveredMessage; + if ($deliveredMessage['redelivered'] || empty($deliveredMessage['time_to_live']) || $deliveredMessage['time_to_live'] > time()) { + return $this->getContext()->convertMessage($deliveredMessage); + } } } @@ -95,7 +101,7 @@ protected function redeliverMessages(): void ->andWhere('redeliver_after < :now') ->andWhere('delivery_id IS NOT NULL') ->setParameter(':now', time(), Type::BIGINT) - ->setParameter('deliveryId', null, Type::BINARY) + ->setParameter('deliveryId', null, Type::GUID) ->setParameter('redelivered', true, Type::BOOLEAN) ; @@ -112,17 +118,30 @@ protected function removeExpiredMessages(): void return; } - $update = $this->getConnection()->createQueryBuilder() + $delete = $this->getConnection()->createQueryBuilder() ->delete($this->getContext()->getTableName()) ->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)') ->andWhere('delivery_id IS NULL') ->andWhere('redelivered = false') - ->setParameter(':now', (int) time(), Type::BIGINT) + ->setParameter(':now', time(), Type::BIGINT) ; - $update->execute(); + $delete->execute(); $this->removeExpiredMessagesLastExecutedAt = microtime(true); } + + private function deleteMessage(string $deliveryId): void + { + if (empty($deliveryId)) { + throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId)); + } + + $this->getConnection()->delete( + $this->getContext()->getTableName(), + ['delivery_id' => Uuid::fromString($deliveryId)->getBytes()], + ['delivery_id' => Type::GUID] + ); + } } diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 7d0e00914..63ce0b1da 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -142,32 +142,38 @@ public function createSubscriptionConsumer(): SubscriptionConsumer /** * @internal It must be used here and in the consumer only */ - public function convertMessage(array $dbalMessage): DbalMessage + public function convertMessage(array $arrayMessage): DbalMessage { - /** @var DbalMessage $dbalMessageObj */ - $dbalMessageObj = $this->createMessage( - $dbalMessage['body'], - $dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [], - $dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : [] + /** @var DbalMessage $message */ + $message = $this->createMessage( + $arrayMessage['body'], + $arrayMessage['properties'] ? JSON::decode($arrayMessage['properties']) : [], + $arrayMessage['headers'] ? JSON::decode($arrayMessage['headers']) : [] ); - if (isset($dbalMessage['redelivered'])) { - $dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']); + if (isset($arrayMessage['id'])) { + $message->setMessageId(Uuid::fromBytes($arrayMessage['id'])->toString()); } - if (isset($dbalMessage['priority'])) { - $dbalMessageObj->setPriority((int) (-1 * $dbalMessage['priority'])); + if (isset($arrayMessage['queue'])) { + $message->setQueue($arrayMessage['queue']); } - if (isset($dbalMessage['published_at'])) { - $dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']); + if (isset($arrayMessage['redelivered'])) { + $message->setRedelivered((bool) $arrayMessage['redelivered']); } - if (isset($dbalMessage['delivery_id'])) { - $dbalMessageObj->setDeliveryId(Uuid::fromBytes($dbalMessage['delivery_id'])->toString()); + if (isset($arrayMessage['priority'])) { + $message->setPriority((int) (-1 * $arrayMessage['priority'])); } - if (isset($dbalMessage['redeliver_after'])) { - $dbalMessageObj->setRedeliverAfter((int) $dbalMessage['redeliver_after']); + if (isset($arrayMessage['published_at'])) { + $message->setPublishedAt((int) $arrayMessage['published_at']); + } + if (isset($arrayMessage['delivery_id'])) { + $message->setDeliveryId(Uuid::fromBytes($arrayMessage['delivery_id'])->toString()); + } + if (isset($arrayMessage['redeliver_after'])) { + $message->setRedeliverAfter((int) $arrayMessage['redeliver_after']); } - return $dbalMessageObj; + return $message; } /** @@ -219,8 +225,7 @@ public function createDataBaseTable(): void $table = new Table($this->getTableName()); - $table->addColumn('id', Type::BINARY, ['length' => 16, 'fixed' => true]); - $table->addColumn('human_id', Type::STRING, ['length' => 36]); + $table->addColumn('id', Type::GUID, ['length' => 16, 'fixed' => true]); $table->addColumn('published_at', Type::BIGINT); $table->addColumn('body', Type::TEXT, ['notnull' => false]); $table->addColumn('headers', Type::TEXT, ['notnull' => false]); @@ -230,7 +235,7 @@ public function createDataBaseTable(): void $table->addColumn('priority', Type::SMALLINT, ['notnull' => false]); $table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]); $table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]); - $table->addColumn('delivery_id', Type::BINARY, ['length' => 16, 'fixed' => true, 'notnull' => false]); + $table->addColumn('delivery_id', Type::GUID, ['length' => 16, 'fixed' => true, 'notnull' => false]); $table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]); $table->setPrimaryKey(['id']); diff --git a/pkg/dbal/DbalMessage.php b/pkg/dbal/DbalMessage.php index 8464d56e3..af62c1079 100644 --- a/pkg/dbal/DbalMessage.php +++ b/pkg/dbal/DbalMessage.php @@ -53,6 +53,11 @@ class DbalMessage implements Message */ private $deliveryId; + /** + * @var string|null + */ + private $queue; + /** * Milliseconds, for example 15186054527288. * @@ -249,4 +254,14 @@ public function setPublishedAt(int $publishedAt = null): void { $this->publishedAt = $publishedAt; } + + public function getQueue(): ?string + { + return $this->queue; + } + + public function setQueue(?string $queue): void + { + $this->queue = $queue; + } } diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index 64bec53ea..f5e3c842a 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -80,7 +80,6 @@ public function send(Destination $destination, Message $message): void $dbalMessage = [ 'id' => $this->uuidCodec->encodeBinary($uuid), - 'human_id' => $uuid->toString(), 'published_at' => $publishedAt, 'body' => $body, 'headers' => JSON::encode($message->getHeaders()), diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index be0bf18c1..60d30cc7e 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -88,19 +88,17 @@ public function consume(int $timeout = 0): void $this->redeliverMessages(); if ($message = $this->fetchMessage($currentQueueNames, $redeliveryDelay)) { - $dbalMessage = $this->getContext()->convertMessage($message); - /** * @var DbalConsumer * @var callable $callback */ - list($consumer, $callback) = $this->subscribers[$message['queue']]; + list($consumer, $callback) = $this->subscribers[$message->getQueue()]; - if (false === call_user_func($callback, $dbalMessage, $consumer)) { + if (false === call_user_func($callback, $message, $consumer)) { return; } - unset($currentQueueNames[$message['queue']]); + unset($currentQueueNames[$message->getQueue()]); } else { $currentQueueNames = []; diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index 4eeb70a75..8c70c4c5a 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -16,6 +16,7 @@ use Interop\Queue\Exception\InvalidMessageException; use Interop\Queue\Message; use PHPUnit\Framework\TestCase; +use Ramsey\Uuid\Uuid; class DbalConsumerTest extends TestCase { @@ -55,11 +56,13 @@ public function testAcknowledgeShouldThrowIfInstanceOfMessageIsInvalid() public function testShouldDeleteMessageOnAcknowledge() { + $deliveryId = Uuid::uuid1(); + $queue = new DbalDestination('queue'); $message = new DbalMessage(); $message->setBody('theBody'); - $message->setDeliveryId('foo-delivery-id'); + $message->setDeliveryId($deliveryId->toString()); $dbal = $this->createConectionMock(); $dbal @@ -67,8 +70,8 @@ public function testShouldDeleteMessageOnAcknowledge() ->method('delete') ->with( 'some-table-name', - ['delivery_id' => $message->getDeliveryId()], - ['delivery_id' => Type::STRING] + ['delivery_id' => $deliveryId->getBytes()], + ['delivery_id' => Type::GUID] ) ; @@ -124,11 +127,13 @@ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() public function testShouldDeleteMessageFromQueueOnReject() { + $deliveryId = Uuid::uuid1(); + $queue = new DbalDestination('queue'); $message = new DbalMessage(); $message->setBody('theBody'); - $message->setDeliveryId('foo-delivery-id'); + $message->setDeliveryId($deliveryId->toString()); $dbal = $this->createConectionMock(); $dbal @@ -136,8 +141,8 @@ public function testShouldDeleteMessageFromQueueOnReject() ->method('delete') ->with( 'some-table-name', - ['delivery_id' => $message->getDeliveryId()], - ['delivery_id' => Type::STRING] + ['delivery_id' => $deliveryId->getBytes()], + ['delivery_id' => Type::GUID] ) ; diff --git a/pkg/dbal/Tests/Functional/DbalConsumerTest.php b/pkg/dbal/Tests/Functional/DbalConsumerTest.php index ecee1bc71..01e18fe94 100644 --- a/pkg/dbal/Tests/Functional/DbalConsumerTest.php +++ b/pkg/dbal/Tests/Functional/DbalConsumerTest.php @@ -43,7 +43,7 @@ public function testShouldSetPublishedAtDateToReceivedMessage() $consumer = $context->createConsumer($queue); // guard - $this->assertNull($consumer->receiveNoWait()); + $this->assertSame(0, $this->getQuerySize()); $time = (int) (microtime(true) * 10000); @@ -51,11 +51,12 @@ public function testShouldSetPublishedAtDateToReceivedMessage() $producer = $context->createProducer(); + /** @var DbalMessage $message */ $message = $context->createMessage($expectedBody); $message->setPublishedAt($time); $producer->send($queue, $message); - $message = $consumer->receive(8000); // 8 sec + $message = $consumer->receive(100); // 100ms $this->assertInstanceOf(DbalMessage::class, $message); $consumer->acknowledge($message); @@ -71,7 +72,7 @@ public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate() $consumer = $context->createConsumer($queue); // guard - $this->assertNull($consumer->receiveNoWait()); + $this->assertSame(0, $this->getQuerySize()); $time = (int) (microtime(true) * 10000); $olderTime = $time - 10000; @@ -97,7 +98,7 @@ public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate() $consumer->acknowledge($message); $this->assertSame($expectedPriority5BodyOlderTime, $message->getBody()); - $message = $consumer->receive(8000); // 8 sec + $message = $consumer->receive(100); // 8 sec $this->assertInstanceOf(DbalMessage::class, $message); $consumer->acknowledge($message); @@ -109,17 +110,14 @@ public function testShouldDeleteExpiredMessage() $context = $this->context; $queue = $context->createQueue(__METHOD__); - $consumer = $context->createConsumer($queue); - // guard - $this->assertNull($consumer->receiveNoWait()); + $this->assertSame(0, $this->getQuerySize()); $producer = $context->createProducer(); $this->context->getDbalConnection()->insert( $this->context->getTableName(), [ 'id' => 'id', - 'human_id' => 'id', 'published_at' => '123', 'body' => 'expiredMessage', 'headers' => json_encode([]), @@ -133,20 +131,22 @@ public function testShouldDeleteExpiredMessage() $message->setRedelivered(false); $producer->send($queue, $message); - $this->assertSame('2', $this->getQuerySize()); + $this->assertSame(2, $this->getQuerySize()); - $message = $consumer->receive(8000); + // we need a new consumer to workaround redeliver + $consumer = $context->createConsumer($queue); + $message = $consumer->receive(100); - $this->assertSame('1', $this->getQuerySize()); + $this->assertSame(1, $this->getQuerySize()); $consumer->acknowledge($message); - $this->assertSame('0', $this->getQuerySize()); + $this->assertSame(0, $this->getQuerySize()); } - private function getQuerySize(): string + private function getQuerySize(): int { - return $this->context->getDbalConnection() + return (int) $this->context->getDbalConnection() ->executeQuery('SELECT count(*) FROM '.$this->context->getTableName()) ->fetchColumn(0) ; From 2445ef2bed312b6bc01a4efffd1ae6eca4bd4730 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 6 Nov 2018 20:40:35 +0200 Subject: [PATCH 7/7] use uuid v4 to prevent collisions --- pkg/dbal/DbalConsumerHelperTrait.php | 2 +- pkg/dbal/DbalProducer.php | 2 +- pkg/dbal/Tests/DbalConsumerTest.php | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 871b437d6..e9c5252db 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -26,7 +26,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa } $now = time(); - $deliveryId = Uuid::uuid1(); + $deliveryId = Uuid::uuid4(); $endAt = microtime(true) + 0.2; // add 200ms diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index f5e3c842a..014c7775c 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -71,7 +71,7 @@ public function send(Destination $destination, Message $message): void } $body = $message->getBody(); - $uuid = Uuid::uuid1(); + $uuid = Uuid::uuid4(); $publishedAt = null !== $message->getPublishedAt() ? $message->getPublishedAt() : diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index 8c70c4c5a..1478fe89a 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -56,7 +56,7 @@ public function testAcknowledgeShouldThrowIfInstanceOfMessageIsInvalid() public function testShouldDeleteMessageOnAcknowledge() { - $deliveryId = Uuid::uuid1(); + $deliveryId = Uuid::uuid4(); $queue = new DbalDestination('queue'); @@ -127,7 +127,7 @@ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() public function testShouldDeleteMessageFromQueueOnReject() { - $deliveryId = Uuid::uuid1(); + $deliveryId = Uuid::uuid4(); $queue = new DbalDestination('queue');