From 02659a52fc81c58f9d292e29903c678d58099000 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Wed, 17 Oct 2018 17:27:44 +0300 Subject: [PATCH 1/6] Dbal Subscription Consumer feature --- pkg/dbal/DbalConsumer.php | 23 +- pkg/dbal/DbalContext.php | 3 +- pkg/dbal/DbalMessage.php | 21 ++ pkg/dbal/DbalSubscriptionConsumer.php | 196 ++++++++++++++++++ pkg/dbal/Tests/DbalMessageTest.php | 16 ++ .../Tests/DbalSubscriptionConsumerTest.php | 176 ++++++++++++++++ ...umerConsumeFromAllSubscribedQueuesTest.php | 41 ++++ ...onConsumerConsumeUntilUnsubscribedTest.php | 41 ++++ ...balSubscriptionConsumerStopOnFalseTest.php | 41 ++++ 9 files changed, 534 insertions(+), 24 deletions(-) create mode 100644 pkg/dbal/DbalSubscriptionConsumer.php create mode 100644 pkg/dbal/Tests/DbalSubscriptionConsumerTest.php create mode 100644 pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php create mode 100644 pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeUntilUnsubscribedTest.php create mode 100644 pkg/dbal/Tests/Spec/DbalSubscriptionConsumerStopOnFalseTest.php diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 2f0976625..430ba3d4c 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -142,7 +142,7 @@ protected function receiveMessage(): ?DbalMessage $this->dbal->commit(); if (empty($dbalMessage['time_to_live']) || ($dbalMessage['time_to_live'] / 1000) > microtime(true)) { - return $this->convertMessage($dbalMessage); + return DbalMessage::fromArray($dbalMessage); } return null; @@ -153,27 +153,6 @@ protected function receiveMessage(): ?DbalMessage } } - protected function convertMessage(array $dbalMessage): DbalMessage - { - /** @var DbalMessage $message */ - $message = $this->context->createMessage(); - - $message->setBody($dbalMessage['body']); - $message->setPriority((int) $dbalMessage['priority']); - $message->setRedelivered((bool) $dbalMessage['redelivered']); - $message->setPublishedAt((int) $dbalMessage['published_at']); - - if ($dbalMessage['headers']) { - $message->setHeaders(JSON::decode($dbalMessage['headers'])); - } - - if ($dbalMessage['properties']) { - $message->setProperties(JSON::decode($dbalMessage['properties'])); - } - - return $message; - } - private function fetchPrioritizedMessage(int $now): ?array { $query = $this->dbal->createQueryBuilder(); diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 91fa667ff..3c954749e 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -11,7 +11,6 @@ use Interop\Queue\Context; use Interop\Queue\Destination; use Interop\Queue\Exception\InvalidDestinationException; -use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException; use Interop\Queue\Exception\TemporaryQueueNotSupportedException; use Interop\Queue\Message; use Interop\Queue\Producer; @@ -126,7 +125,7 @@ public function close(): void public function createSubscriptionConsumer(): SubscriptionConsumer { - throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt(); + return new DbalSubscriptionConsumer($this); } /** diff --git a/pkg/dbal/DbalMessage.php b/pkg/dbal/DbalMessage.php index 88b49c588..ec988e2f9 100644 --- a/pkg/dbal/DbalMessage.php +++ b/pkg/dbal/DbalMessage.php @@ -67,6 +67,27 @@ public function __construct(string $body = '', array $properties = [], array $he $this->deliveryDelay = null; } + public static function fromArray(array $dbalMessage): self + { + $dbalMessageObj = new self( + $dbalMessage['body'], + $dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [], + $dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : [] + ); + + if (isset($dbalMessage['redelivered'])) { + $dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']); + } + if (isset($dbalMessage['priority'])) { + $dbalMessageObj->setPriority((int) $dbalMessage['priority']); + } + if (isset($dbalMessage['published_at'])) { + $dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']); + } + + return $dbalMessageObj; + } + public function setBody(string $body): void { $this->body = $body; diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php new file mode 100644 index 000000000..d169e8f54 --- /dev/null +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -0,0 +1,196 @@ +context = $context; + $this->dbal = $this->context->getDbalConnection(); + $this->subscribers = []; + } + + public function consume(int $timeout = 0): void + { + if (empty($this->subscribers)) { + throw new \LogicException('No subscribers'); + } + + $timeout = (int) ceil($timeout / 1000); + $endAt = time() + $timeout; + + $queueNames = []; + foreach (array_keys($this->subscribers) as $queueName) { + $queueNames[$queueName] = $queueName; + } + + $currentQueueNames = []; + while (true) { + if (empty($currentQueueNames)) { + $currentQueueNames = $queueNames; + } + + $message = $this->fetchPrioritizedMessage($currentQueueNames) ?: $this->fetchMessage($currentQueueNames); + + if ($message) { + $this->dbal->delete($this->context->getTableName(), ['id' => $message['id']], ['id' => Type::GUID]); + + $dbalMessage = DbalMessage::fromArray($message); + + /** + * @var DbalConsumer + * @var callable $callback + */ + list($consumer, $callback) = $this->subscribers[$message['queue']]; + + if (false === call_user_func($callback, $dbalMessage, $consumer)) { + return; + } + + unset($currentQueueNames[$message['queue']]); + } else { + $currentQueueNames = []; + } + + if ($timeout && microtime(true) >= $endAt) { + return; + } + } + } + + /** + * @param DbalConsumer $consumer + */ + public function subscribe(Consumer $consumer, callable $callback): void + { + if (false == $consumer instanceof DbalConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', DbalConsumer::class, get_class($consumer))); + } + + $queueName = $consumer->getQueue()->getQueueName(); + if (array_key_exists($queueName, $this->subscribers)) { + if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) { + return; + } + + throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName)); + } + + $this->subscribers[$queueName] = [$consumer, $callback]; + } + + /** + * @param DbalConsumer $consumer + */ + public function unsubscribe(Consumer $consumer): void + { + if (false == $consumer instanceof DbalConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', DbalConsumer::class, get_class($consumer))); + } + + $queueName = $consumer->getQueue()->getQueueName(); + + if (false == array_key_exists($queueName, $this->subscribers)) { + return; + } + + if ($this->subscribers[$queueName][0] !== $consumer) { + return; + } + + unset($this->subscribers[$queueName]); + } + + public function unsubscribeAll(): void + { + $this->subscribers = []; + } + + private function fetchMessage(array $queues): ?array + { + $query = $this->dbal->createQueryBuilder(); + $query + ->select('*') + ->from($this->context->getTableName()) + ->andWhere('queue IN (:queues)') + ->andWhere('priority IS NULL') + ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') + ->addOrderBy('published_at', 'asc') + ->setMaxResults(1) + ; + + $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); + + $result = $this->dbal->executeQuery( + $sql, + [ + 'queues' => array_keys($queues), + 'delayedUntil' => time(), + ], + [ + 'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY, + 'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER, + ] + )->fetch(); + + return $result ?: null; + } + + private function fetchPrioritizedMessage(array $queues): ?array + { + $query = $this->dbal->createQueryBuilder(); + $query + ->select('*') + ->from($this->context->getTableName()) + ->andWhere('queue IN (:queues)') + ->andWhere('priority IS NOT NULL') + ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') + ->addOrderBy('published_at', 'asc') + ->addOrderBy('priority', 'desc') + ->setMaxResults(1) + ; + + $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); + + $result = $this->dbal->executeQuery( + $sql, + [ + 'queues' => array_keys($queues), + 'delayedUntil' => time(), + ], + [ + 'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY, + 'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER, + ] + )->fetch(); + + return $result ?: null; + } +} diff --git a/pkg/dbal/Tests/DbalMessageTest.php b/pkg/dbal/Tests/DbalMessageTest.php index 05f4c89fb..d6c0f8a8f 100644 --- a/pkg/dbal/Tests/DbalMessageTest.php +++ b/pkg/dbal/Tests/DbalMessageTest.php @@ -1,5 +1,7 @@ assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders()); } + public function testCouldBeCreatedFromArray() + { + $arrayData = [ + 'body' => 'theBody', + 'properties' => json_encode(['barProp' => 'barPropVal']), + 'headers' => json_encode(['fooHeader' => 'fooHeaderVal']), + ]; + $message = DbalMessage::fromArray($arrayData); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['barProp' => 'barPropVal'], $message->getProperties()); + $this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders()); + } + public function testShouldSetPriorityToNullInConstructor() { $message = new DbalMessage(); diff --git a/pkg/dbal/Tests/DbalSubscriptionConsumerTest.php b/pkg/dbal/Tests/DbalSubscriptionConsumerTest.php new file mode 100644 index 000000000..2d3a29d79 --- /dev/null +++ b/pkg/dbal/Tests/DbalSubscriptionConsumerTest.php @@ -0,0 +1,176 @@ +assertTrue($rc->implementsInterface(SubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithDbalContextAsFirstArgument() + { + new DbalSubscriptionConsumer($this->createDbalContextMock()); + } + + public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + + $this->assertAttributeSame([ + 'foo_queue' => [$fooConsumer, $fooCallback], + 'bar_queue' => [$barConsumer, $barCallback], + ], 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('There is a consumer subscribed to queue: "foo_queue"'); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + } + + public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + } + + public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $fooConsumer = $this->createConsumerStub('foo_queue'); + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, function () {}); + $subscriptionConsumer->subscribe($barConsumer, function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($fooConsumer); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('bar_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('foo_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + $subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribeAll(); + + $this->assertAttributeCount(0, 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTryConsumeWithoutSubscribers() + { + $subscriptionConsumer = new DbalSubscriptionConsumer($this->createDbalContextMock()); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('No subscribers'); + + $subscriptionConsumer->consume(); + } + + /** + * @return DbalContext|\PHPUnit_Framework_MockObject_MockObject + */ + private function createDbalContextMock() + { + return $this->createMock(DbalContext::class); + } + + /** + * @param null|mixed $queueName + * + * @return Consumer|\PHPUnit_Framework_MockObject_MockObject + */ + private function createConsumerStub($queueName = null) + { + $queueMock = $this->createMock(Queue::class); + $queueMock + ->expects($this->any()) + ->method('getQueueName') + ->willReturn($queueName); + + $consumerMock = $this->createMock(DbalConsumer::class); + $consumerMock + ->expects($this->any()) + ->method('getQueue') + ->willReturn($queueMock); + + return $consumerMock; + } +} diff --git a/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php new file mode 100644 index 000000000..4f31958d3 --- /dev/null +++ b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php @@ -0,0 +1,41 @@ +createDbalContext(); + } + + /** + * @param DbalContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeUntilUnsubscribedTest.php b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeUntilUnsubscribedTest.php new file mode 100644 index 000000000..23590144e --- /dev/null +++ b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerConsumeUntilUnsubscribedTest.php @@ -0,0 +1,41 @@ +createDbalContext(); + } + + /** + * @param DbalContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerStopOnFalseTest.php b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerStopOnFalseTest.php new file mode 100644 index 000000000..c303b0c29 --- /dev/null +++ b/pkg/dbal/Tests/Spec/DbalSubscriptionConsumerStopOnFalseTest.php @@ -0,0 +1,41 @@ +createDbalContext(); + } + + /** + * @param DbalContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + $context->purgeQueue($queue); + + return $queue; + } +} From 314e79009106ae265d6603a1236e45cddcd70ffe Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Thu, 18 Oct 2018 03:25:37 +0300 Subject: [PATCH 2/6] Renamed named constructor of DbalMessage --- pkg/dbal/DbalConsumer.php | 2 +- pkg/dbal/DbalMessage.php | 2 +- pkg/dbal/DbalSubscriptionConsumer.php | 2 +- pkg/dbal/Tests/DbalMessageTest.php | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 430ba3d4c..fe299ce5c 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -142,7 +142,7 @@ protected function receiveMessage(): ?DbalMessage $this->dbal->commit(); if (empty($dbalMessage['time_to_live']) || ($dbalMessage['time_to_live'] / 1000) > microtime(true)) { - return DbalMessage::fromArray($dbalMessage); + return DbalMessage::fromArrayDbResult($dbalMessage); } return null; diff --git a/pkg/dbal/DbalMessage.php b/pkg/dbal/DbalMessage.php index ec988e2f9..534d55b8c 100644 --- a/pkg/dbal/DbalMessage.php +++ b/pkg/dbal/DbalMessage.php @@ -67,7 +67,7 @@ public function __construct(string $body = '', array $properties = [], array $he $this->deliveryDelay = null; } - public static function fromArray(array $dbalMessage): self + public static function fromArrayDbResult(array $dbalMessage): self { $dbalMessageObj = new self( $dbalMessage['body'], diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index d169e8f54..b48476ad8 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -62,7 +62,7 @@ public function consume(int $timeout = 0): void if ($message) { $this->dbal->delete($this->context->getTableName(), ['id' => $message['id']], ['id' => Type::GUID]); - $dbalMessage = DbalMessage::fromArray($message); + $dbalMessage = DbalMessage::fromArrayDbResult($message); /** * @var DbalConsumer diff --git a/pkg/dbal/Tests/DbalMessageTest.php b/pkg/dbal/Tests/DbalMessageTest.php index d6c0f8a8f..8ae62587b 100644 --- a/pkg/dbal/Tests/DbalMessageTest.php +++ b/pkg/dbal/Tests/DbalMessageTest.php @@ -36,7 +36,7 @@ public function testCouldBeCreatedFromArray() 'properties' => json_encode(['barProp' => 'barPropVal']), 'headers' => json_encode(['fooHeader' => 'fooHeaderVal']), ]; - $message = DbalMessage::fromArray($arrayData); + $message = DbalMessage::fromArrayDbResult($arrayData); $this->assertSame('theBody', $message->getBody()); $this->assertSame(['barProp' => 'barPropVal'], $message->getProperties()); From b3d26f62c692265201b879a502d3429a476570f7 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Thu, 18 Oct 2018 11:31:22 +0300 Subject: [PATCH 3/6] Replaced DbalMessage::fromArrrayDbResult() with DbalContext::convertMessage() --- pkg/dbal/DbalConsumer.php | 2 +- pkg/dbal/DbalContext.php | 24 ++++++++++++++++++++++++ pkg/dbal/DbalMessage.php | 21 --------------------- pkg/dbal/DbalSubscriptionConsumer.php | 2 +- pkg/dbal/Tests/DbalContextTest.php | 15 +++++++++++++++ pkg/dbal/Tests/DbalMessageTest.php | 14 -------------- 6 files changed, 41 insertions(+), 37 deletions(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index fe299ce5c..c8e1b02d4 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -142,7 +142,7 @@ protected function receiveMessage(): ?DbalMessage $this->dbal->commit(); if (empty($dbalMessage['time_to_live']) || ($dbalMessage['time_to_live'] / 1000) > microtime(true)) { - return DbalMessage::fromArrayDbResult($dbalMessage); + return $this->context->convertMessage($dbalMessage); } return null; diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 3c954749e..082b038b2 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -128,6 +128,30 @@ public function createSubscriptionConsumer(): SubscriptionConsumer return new DbalSubscriptionConsumer($this); } + /** + * @internal It must be used here and in the consumer only + */ + public function convertMessage(array $dbalMessage): DbalMessage + { + $dbalMessageObj = new DbalMessage( + $dbalMessage['body'], + $dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [], + $dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : [] + ); + + if (isset($dbalMessage['redelivered'])) { + $dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']); + } + if (isset($dbalMessage['priority'])) { + $dbalMessageObj->setPriority((int) $dbalMessage['priority']); + } + if (isset($dbalMessage['published_at'])) { + $dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']); + } + + return $dbalMessageObj; + } + /** * @param DbalDestination $queue */ diff --git a/pkg/dbal/DbalMessage.php b/pkg/dbal/DbalMessage.php index 534d55b8c..88b49c588 100644 --- a/pkg/dbal/DbalMessage.php +++ b/pkg/dbal/DbalMessage.php @@ -67,27 +67,6 @@ public function __construct(string $body = '', array $properties = [], array $he $this->deliveryDelay = null; } - public static function fromArrayDbResult(array $dbalMessage): self - { - $dbalMessageObj = new self( - $dbalMessage['body'], - $dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [], - $dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : [] - ); - - if (isset($dbalMessage['redelivered'])) { - $dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']); - } - if (isset($dbalMessage['priority'])) { - $dbalMessageObj->setPriority((int) $dbalMessage['priority']); - } - if (isset($dbalMessage['published_at'])) { - $dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']); - } - - return $dbalMessageObj; - } - public function setBody(string $body): void { $this->body = $body; diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index b48476ad8..5a1b1d2eb 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -62,7 +62,7 @@ public function consume(int $timeout = 0): void if ($message) { $this->dbal->delete($this->context->getTableName(), ['id' => $message['id']], ['id' => Type::GUID]); - $dbalMessage = DbalMessage::fromArrayDbResult($message); + $dbalMessage = $this->context->convertMessage($message); /** * @var DbalConsumer diff --git a/pkg/dbal/Tests/DbalContextTest.php b/pkg/dbal/Tests/DbalContextTest.php index 0cddfdf71..b505b0dde 100644 --- a/pkg/dbal/Tests/DbalContextTest.php +++ b/pkg/dbal/Tests/DbalContextTest.php @@ -64,6 +64,21 @@ public function testShouldCreateMessage() $this->assertFalse($message->isRedelivered()); } + public function testShouldConvertArrayToDbalMessage() + { + $arrayData = [ + 'body' => 'theBody', + 'properties' => json_encode(['barProp' => 'barPropVal']), + 'headers' => json_encode(['fooHeader' => 'fooHeaderVal']), + ]; + $context = new DbalContext($this->createConnectionMock()); + $message = $context->convertMessage($arrayData); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['barProp' => 'barPropVal'], $message->getProperties()); + $this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders()); + } + public function testShouldCreateTopic() { $context = new DbalContext($this->createConnectionMock()); diff --git a/pkg/dbal/Tests/DbalMessageTest.php b/pkg/dbal/Tests/DbalMessageTest.php index 8ae62587b..19f234582 100644 --- a/pkg/dbal/Tests/DbalMessageTest.php +++ b/pkg/dbal/Tests/DbalMessageTest.php @@ -29,20 +29,6 @@ public function testCouldBeConstructedWithOptionalArguments() $this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders()); } - public function testCouldBeCreatedFromArray() - { - $arrayData = [ - 'body' => 'theBody', - 'properties' => json_encode(['barProp' => 'barPropVal']), - 'headers' => json_encode(['fooHeader' => 'fooHeaderVal']), - ]; - $message = DbalMessage::fromArrayDbResult($arrayData); - - $this->assertSame('theBody', $message->getBody()); - $this->assertSame(['barProp' => 'barPropVal'], $message->getProperties()); - $this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders()); - } - public function testShouldSetPriorityToNullInConstructor() { $message = new DbalMessage(); From fa41a17c9a9be71b56cef3bd9b6b5df1546f5c11 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Thu, 18 Oct 2018 17:10:55 +0300 Subject: [PATCH 4/6] Replaced constructor with owned method createMessage() in DbalContext. --- pkg/dbal/DbalContext.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 082b038b2..5a715b288 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -133,7 +133,7 @@ public function createSubscriptionConsumer(): SubscriptionConsumer */ public function convertMessage(array $dbalMessage): DbalMessage { - $dbalMessageObj = new DbalMessage( + $dbalMessageObj = $this->createMessage( $dbalMessage['body'], $dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [], $dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : [] From 4eab59cdc64ebf21583d3d58cfda8ef8800ef577 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Thu, 18 Oct 2018 18:31:17 +0300 Subject: [PATCH 5/6] Added Subscription consumer to docs --- docs/transport/amqp.md | 2 +- docs/transport/amqp_bunny.md | 2 +- docs/transport/amqp_lib.md | 2 +- docs/transport/dbal.md | 34 ++++++++++++++++++++++++++++++++++ 4 files changed, 37 insertions(+), 3 deletions(-) diff --git a/docs/transport/amqp.md b/docs/transport/amqp.md index 7c073d407..80845e71a 100644 --- a/docs/transport/amqp.md +++ b/docs/transport/amqp.md @@ -235,7 +235,7 @@ use Interop\Queue\PsrConsumer; $fooConsumer = $psrContext->createConsumer($fooQueue); $barConsumer = $psrContext->createConsumer($barQueue); -$subscriptionConsumer =$psrContext->createSubscriptionConsumer(); +$subscriptionConsumer = $psrContext->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) { // process message diff --git a/docs/transport/amqp_bunny.md b/docs/transport/amqp_bunny.md index 241cfe836..c0f68534b 100644 --- a/docs/transport/amqp_bunny.md +++ b/docs/transport/amqp_bunny.md @@ -227,7 +227,7 @@ use Interop\Queue\PsrConsumer; $fooConsumer = $psrContext->createConsumer($fooQueue); $barConsumer = $psrContext->createConsumer($barQueue); -$subscriptionConsumer =$psrContext->createSubscriptionConsumer(); +$subscriptionConsumer = $psrContext->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) { // process message diff --git a/docs/transport/amqp_lib.md b/docs/transport/amqp_lib.md index 6f749233b..b3e7e055a 100644 --- a/docs/transport/amqp_lib.md +++ b/docs/transport/amqp_lib.md @@ -235,7 +235,7 @@ use Interop\Queue\PsrConsumer; $fooConsumer = $psrContext->createConsumer($fooQueue); $barConsumer = $psrContext->createConsumer($barQueue); -$subscriptionConsumer =$psrContext->createSubscriptionConsumer(); +$subscriptionConsumer = $psrContext->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) { // process message diff --git a/docs/transport/dbal.md b/docs/transport/dbal.md index 9fcaab04a..6e72d7953 100644 --- a/docs/transport/dbal.md +++ b/docs/transport/dbal.md @@ -11,6 +11,7 @@ It creates a table there. Pushes and pops messages to\from that table. * [Send message to topic](#send-message-to-topic) * [Send message to queue](#send-message-to-queue) * [Consume message](#consume-message) +* [Subscription consumer](#subscription-consumer) ## Installation @@ -103,4 +104,37 @@ $message = $consumer->receive(); // process a message ``` +## Subscription consumer + +```php +createConsumer($fooQueue); +$barConsumer = $psrContext->createConsumer($barQueue); + +$subscriptionConsumer = $psrContext->createSubscriptionConsumer(); +$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) { + // process message + + $consumer->acknowledge($message); + + return true; +}); +$subscriptionConsumer->subscribe($barConsumer, function(PsrMessage $message, PsrConsumer $consumer) { + // process message + + $consumer->acknowledge($message); + + return true; +}); + +$subscriptionConsumer->consume(2000); // 2 sec +``` + [back to index](../index.md) \ No newline at end of file From 1c9f5910e261e5a649f2035933ef0dbad2827545 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Thu, 18 Oct 2018 18:40:38 +0300 Subject: [PATCH 6/6] Added usleep 200ms when no messages are recieved --- pkg/dbal/DbalSubscriptionConsumer.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 5a1b1d2eb..a822cee04 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -77,6 +77,8 @@ public function consume(int $timeout = 0): void unset($currentQueueNames[$message['queue']]); } else { $currentQueueNames = []; + + usleep(200000); // 200ms } if ($timeout && microtime(true) >= $endAt) {