From 4ca285f07d0f4cb58f881afa474d35ab2e427b33 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 26 Oct 2018 13:28:18 +0300 Subject: [PATCH 1/7] redis new implementation --- pkg/redis/LuaScripts.php | 38 ++++++ pkg/redis/PRedis.php | 28 +++++ pkg/redis/PhpRedis.php | 27 +++++ pkg/redis/Redis.php | 32 +++++ pkg/redis/RedisConsumer.php | 71 +++++++++--- pkg/redis/RedisContext.php | 13 ++- pkg/redis/RedisMessage.php | 79 +++++++++++++ pkg/redis/RedisProducer.php | 53 ++++++--- pkg/redis/RedisQueueConsumer.php | 148 ++++++++++++++++++++++++ pkg/redis/RedisSubscriptionConsumer.php | 52 ++++++--- 10 files changed, 493 insertions(+), 48 deletions(-) create mode 100644 pkg/redis/LuaScripts.php create mode 100644 pkg/redis/RedisQueueConsumer.php diff --git a/pkg/redis/LuaScripts.php b/pkg/redis/LuaScripts.php new file mode 100644 index 000000000..4a40d2447 --- /dev/null +++ b/pkg/redis/LuaScripts.php @@ -0,0 +1,38 @@ +redis, 'eval'], array_merge([$script, count($keys)], $keys, $args)); + } catch (PRedisServerException $e) { + throw new ServerException('eval command has failed', null, $e); + } + } + + public function zadd(string $key, string $value, float $score): int + { + try { + return $this->redis->zadd($key, [$value => $score]); + } catch (PRedisServerException $e) { + throw new ServerException('zadd command has failed', null, $e); + } + } + + public function zrem(string $key, string $value): int + { + try { + return $this->redis->zrem($key, [$value]); + } catch (PRedisServerException $e) { + throw new ServerException('zrem command has failed', null, $e); + } + } + public function lpush(string $key, string $value): int { try { diff --git a/pkg/redis/PhpRedis.php b/pkg/redis/PhpRedis.php index bf6da281a..bd8dd9498 100644 --- a/pkg/redis/PhpRedis.php +++ b/pkg/redis/PhpRedis.php @@ -26,6 +26,33 @@ public function __construct(array $config) $this->config = $config; } + public function eval(string $script, array $keys = [], array $args = []) + { + try { + return $this->redis->eval($script, array_merge($keys, $args), count($keys)); + } catch (\RedisException $e) { + throw new ServerException('eval command has failed', null, $e); + } + } + + public function zadd(string $key, string $value, float $score): int + { + try { + return $this->redis->zAdd($key, $score, $value); + } catch (\RedisException $e) { + throw new ServerException('zadd command has failed', null, $e); + } + } + + public function zrem(string $key, string $value): int + { + try { + return $this->redis->zRem($key, $value); + } catch (\RedisException $e) { + throw new ServerException('zrem command has failed', null, $e); + } + } + public function lpush(string $key, string $value): int { try { diff --git a/pkg/redis/Redis.php b/pkg/redis/Redis.php index 2ea2e054e..1baa4d909 100644 --- a/pkg/redis/Redis.php +++ b/pkg/redis/Redis.php @@ -6,6 +6,38 @@ interface Redis { + /** + * @param string $script + * @param array $keys + * @param array $args + * + * @throws ServerException + * + * @return mixed + */ + public function eval(string $script, array $keys = [], array $args = []); + + /** + * @param string $key + * @param string $value + * @param float $score + * + * @throws ServerException + * + * @return int + */ + public function zadd(string $key, string $value, float $score): int; + + /** + * @param string $key + * @param string $value + * + * @throws ServerException + * + * @return int + */ + public function zrem(string $key, string $value): int; + /** * @param string $key * @param string $value diff --git a/pkg/redis/RedisConsumer.php b/pkg/redis/RedisConsumer.php index 0fb9db787..4722d428d 100644 --- a/pkg/redis/RedisConsumer.php +++ b/pkg/redis/RedisConsumer.php @@ -21,12 +21,42 @@ class RedisConsumer implements Consumer */ private $context; + /** + * @var int + */ + private $retryDelay; + + /** + * @var RedisQueueConsumer + */ + private $queueConsumer; + public function __construct(RedisContext $context, RedisDestination $queue) { $this->context = $context; $this->queue = $queue; } + /** + * @return int + */ + public function getRetryDelay(): ?int + { + return $this->retryDelay; + } + + /** + * @param int $retryDelay + */ + public function setRetryDelay(int $retryDelay): void + { + $this->retryDelay = $retryDelay; + + if ($this->queueConsumer) { + $this->queueConsumer->setRetryDelay($this->retryDelay); + } + } + /** * @return RedisDestination */ @@ -40,8 +70,9 @@ public function getQueue(): Queue */ public function receive(int $timeout = 0): ?Message { - $timeout = (int) ($timeout / 1000); - if (empty($timeout)) { + $timeout = (int) ceil($timeout / 1000); + + if ($timeout <= 0) { while (true) { if ($message = $this->receive(5000)) { return $message; @@ -49,11 +80,9 @@ public function receive(int $timeout = 0): ?Message } } - if ($result = $this->getRedis()->brpop([$this->queue->getName()], $timeout)) { - return RedisMessage::jsonUnserialize($result->getMessage()); - } + $this->initQueueConsumer(); - return null; + return $this->queueConsumer->receiveMessage($timeout); } /** @@ -61,11 +90,9 @@ public function receive(int $timeout = 0): ?Message */ public function receiveNoWait(): ?Message { - if ($result = $this->getRedis()->rpop($this->queue->getName())) { - return RedisMessage::jsonUnserialize($result->getMessage()); - } + $this->initQueueConsumer(); - return null; + return $this->queueConsumer->receiveMessageNoWait($this->queue); } /** @@ -73,7 +100,7 @@ public function receiveNoWait(): ?Message */ public function acknowledge(Message $message): void { - // do nothing. redis transport always works in auto ack mode + $this->getRedis()->zrem($this->queue->getName().':reserved', $message->getReservedKey()); } /** @@ -83,10 +110,17 @@ public function reject(Message $message, bool $requeue = false): void { InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class); - // do nothing on reject. redis transport always works in auto ack mode + $this->acknowledge($message); if ($requeue) { - $this->context->createProducer()->send($this->queue, $message); + $message = RedisMessage::jsonUnserialize($message->getReservedKey()); + $message->setHeader('attempts', 0); + + if ($message->getTimeToLive()) { + $message->setHeader('expires_at', time() + $message->getTimeToLive()); + } + + $this->getRedis()->lpush($this->queue->getName(), json_encode($message)); } } @@ -94,4 +128,15 @@ private function getRedis(): Redis { return $this->context->getRedis(); } + + private function initQueueConsumer(): void + { + if (null === $this->queueConsumer) { + $this->queueConsumer = new RedisQueueConsumer($this->getRedis(), [$this->queue]); + + if ($this->retryDelay) { + $this->queueConsumer->setRetryDelay($this->retryDelay); + } + } + } } diff --git a/pkg/redis/RedisContext.php b/pkg/redis/RedisContext.php index e69385884..2c3b468ea 100644 --- a/pkg/redis/RedisContext.php +++ b/pkg/redis/RedisContext.php @@ -78,7 +78,7 @@ public function deleteQueue(Queue $queue): void { InvalidDestinationException::assertDestinationInstanceOf($queue, RedisDestination::class); - $this->getRedis()->del($queue->getName()); + $this->deleteDestination($queue); } /** @@ -88,7 +88,7 @@ public function deleteTopic(Topic $topic): void { InvalidDestinationException::assertDestinationInstanceOf($topic, RedisDestination::class); - $this->getRedis()->del($topic->getName()); + $this->deleteDestination($topic); } public function createTemporaryQueue(): Queue @@ -129,7 +129,7 @@ public function createSubscriptionConsumer(): SubscriptionConsumer */ public function purgeQueue(Queue $queue): void { - $this->getRedis()->del($queue->getName()); + $this->deleteDestination($queue); } public function close(): void @@ -154,4 +154,11 @@ public function getRedis(): Redis return $this->redis; } + + private function deleteDestination(RedisDestination $destination): void + { + $this->getRedis()->del($destination->getName()); + $this->getRedis()->del($destination->getName().':delayed'); + $this->getRedis()->del($destination->getName().':reserved'); + } } diff --git a/pkg/redis/RedisMessage.php b/pkg/redis/RedisMessage.php index ab614f773..5b2359890 100644 --- a/pkg/redis/RedisMessage.php +++ b/pkg/redis/RedisMessage.php @@ -28,6 +28,16 @@ class RedisMessage implements Message, \JsonSerializable */ private $redelivered; + /** + * @var string + */ + private $reservedKey; + + /** + * @var string + */ + private $key; + public function __construct(string $body = '', array $properties = [], array $headers = []) { $this->body = $body; @@ -139,6 +149,75 @@ public function getReplyTo(): ?string return $this->getHeader('reply_to'); } + /** + * @return int + */ + public function getAttempts(): int + { + return (int) $this->getHeader('attempts', 0); + } + + /** + * @return int + */ + public function getTimeToLive(): ?int + { + return $this->getHeader('time_to_live'); + } + + /** + * Set time to live in milliseconds. + */ + public function setTimeToLive(int $timeToLive = null): void + { + $this->setHeader('time_to_live', $timeToLive); + } + + public function getDeliveryDelay(): ?int + { + return $this->getHeader('delivery_delay'); + } + + /** + * Set delay in milliseconds. + */ + public function setDeliveryDelay(int $deliveryDelay = null): void + { + $this->setHeader('delivery_delay', $deliveryDelay); + } + + /** + * @return string + */ + public function getReservedKey(): ?string + { + return $this->reservedKey; + } + + /** + * @param string $reservedKey + */ + public function setReservedKey(string $reservedKey) + { + $this->reservedKey = $reservedKey; + } + + /** + * @return string + */ + public function getKey(): ?string + { + return $this->key; + } + + /** + * @param string $key + */ + public function setKey(string $key) + { + $this->key = $key; + } + public function jsonSerialize(): array { return [ diff --git a/pkg/redis/RedisProducer.php b/pkg/redis/RedisProducer.php index 81ca13b7e..3b3b138cb 100644 --- a/pkg/redis/RedisProducer.php +++ b/pkg/redis/RedisProducer.php @@ -4,6 +4,7 @@ namespace Enqueue\Redis; +use Enqueue\Util\UUID; use Interop\Queue\Destination; use Interop\Queue\Exception\InvalidDestinationException; use Interop\Queue\Exception\InvalidMessageException; @@ -17,6 +18,16 @@ class RedisProducer implements Producer */ private $redis; + /** + * @var int|null + */ + private $timeToLive; + + /** + * @var int + */ + private $deliveryDelay; + /** * @param Redis $redis */ @@ -34,24 +45,42 @@ public function send(Destination $destination, Message $message): void InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class); InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class); - $this->redis->lpush($destination->getName(), json_encode($message)); + $message->setMessageId(UUID::generate()); + $message->setHeader('attempts', 0); + + if (null !== $this->timeToLive && null === $message->getTimeToLive()) { + $message->setTimeToLive($this->timeToLive); + } + + if (null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) { + $message->setDeliveryDelay($this->deliveryDelay); + } + + if ($message->getTimeToLive()) { + $message->setHeader('expires_at', time() + $message->getTimeToLive()); + } + + if ($message->getDeliveryDelay()) { + $deliveryAt = time() + $message->getDeliveryDelay(); + $this->redis->zadd($destination->getName().':delayed', json_encode($message), $deliveryAt); + } else { + $this->redis->lpush($destination->getName(), json_encode($message)); + } } /** - * @return RedisProducer + * @return self */ public function setDeliveryDelay(int $deliveryDelay = null): Producer { - if (null === $deliveryDelay) { - return $this; - } + $this->deliveryDelay = $deliveryDelay; - throw new \LogicException('Not implemented'); + return $this; } public function getDeliveryDelay(): ?int { - return null; + return $this->deliveryDelay; } /** @@ -72,19 +101,17 @@ public function getPriority(): ?int } /** - * @return RedisProducer + * @return self */ public function setTimeToLive(int $timeToLive = null): Producer { - if (null === $timeToLive) { - return $this; - } + $this->timeToLive = $timeToLive; - throw new \LogicException('Not implemented'); + return $this; } public function getTimeToLive(): ?int { - return null; + return $this->timeToLive; } } diff --git a/pkg/redis/RedisQueueConsumer.php b/pkg/redis/RedisQueueConsumer.php new file mode 100644 index 000000000..26aa5fd7f --- /dev/null +++ b/pkg/redis/RedisQueueConsumer.php @@ -0,0 +1,148 @@ +redis = $redis; + $this->queues = $queues; + } + + /** + * @return int + */ + public function getRetryDelay(): int + { + return $this->retryDelay; + } + + /** + * @param int $retryDelay + */ + public function setRetryDelay(int $retryDelay): void + { + $this->retryDelay = $retryDelay; + } + + public function receiveMessage(int $timeout): ?RedisMessage + { + $startAt = time(); + $thisTimeout = $timeout; + + if (null === $this->queueNames) { + foreach ($this->queues as $queue) { + $this->queueNames[] = $queue->getName(); + } + } + + while ($thisTimeout > 0) { + $this->migrateExpiredMessages($this->queueNames); + + if ($result = $this->redis->brpop($this->queueNames, $thisTimeout)) { + $this->pushQueueNameBack($result->getKey()); + + if ($message = $this->processResult($result)) { + return $message; + } + + $thisTimeout -= time() - $startAt; + } + } + + return null; + } + + public function receiveMessageNoWait(RedisDestination $destination): ?RedisMessage + { + $this->migrateExpiredMessages([$destination->getName()]); + + if ($result = $this->redis->rpop($destination->getName())) { + return $this->processResult($result); + } + + return null; + } + + private function processResult(RedisResult $result): ?RedisMessage + { + $message = RedisMessage::jsonUnserialize($result->getMessage()); + + $now = time(); + + if ($expiresAt = $message->getHeader('expires_at')) { + if ($now > $expiresAt) { + return null; + } + } + + $message->setHeader('attempts', $message->getAttempts() + 1); + $message->setRedelivered($message->getAttempts() > 1); + $message->setReservedKey(json_encode($message)); + $message->setKey($result->getKey()); + + $reservedQueue = $result->getKey().':reserved'; + $retryMessageAt = $now + $this->retryDelay; + + $this->redis->zadd($reservedQueue, $message->getReservedKey(), $retryMessageAt); + + return $message; + } + + private function pushQueueNameBack($queueName): void + { + if (count($this->queueNames) <= 1) { + return; + } + + if (false === $from = array_search($queueName, $this->queueNames, true)) { + throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName)); + } + + $to = count($this->queueNames) - 1; + + $out = array_splice($this->queueNames, $from, 1); + array_splice($this->queueNames, $to, 0, $out); + } + + private function migrateExpiredMessages(array $queueNames): void + { + $now = time(); + + foreach ($queueNames as $queueName) { + $this->redis->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]); + $this->redis->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]); + } + } +} diff --git a/pkg/redis/RedisSubscriptionConsumer.php b/pkg/redis/RedisSubscriptionConsumer.php index 1b6cd1149..bf7504e6c 100644 --- a/pkg/redis/RedisSubscriptionConsumer.php +++ b/pkg/redis/RedisSubscriptionConsumer.php @@ -21,6 +21,11 @@ class RedisSubscriptionConsumer implements SubscriptionConsumer */ private $subscribers; + /** + * @var int + */ + private $retryDelay; + /** * @param RedisContext $context */ @@ -30,6 +35,22 @@ public function __construct(RedisContext $context) $this->subscribers = []; } + /** + * @return int + */ + public function getRetryDelay(): ?int + { + return $this->retryDelay; + } + + /** + * @param int $retryDelay + */ + public function setRetryDelay(int $retryDelay): void + { + $this->retryDelay = $retryDelay; + } + public function consume(int $timeout = 0): void { if (empty($this->subscribers)) { @@ -39,30 +60,23 @@ public function consume(int $timeout = 0): void $timeout = (int) ceil($timeout / 1000); $endAt = time() + $timeout; - $queueNames = []; - foreach (array_keys($this->subscribers) as $queueName) { - $queueNames[$queueName] = $queueName; + $queues = []; + /** @var Consumer $consumer */ + foreach ($this->subscribers as list($consumer)) { + $queues[] = $consumer->getQueue(); } - $currentQueueNames = []; - while (true) { - if (empty($currentQueueNames)) { - $currentQueueNames = $queueNames; - } + $queueConsumer = new RedisQueueConsumer($this->context->getRedis(), $queues); - $result = $this->context->getRedis()->brpop($currentQueueNames, $timeout ?: 5); - if ($result) { - $message = RedisMessage::jsonUnserialize($result->getMessage()); - list($consumer, $callback) = $this->subscribers[$result->getKey()]; - if (false === call_user_func($callback, $message, $consumer)) { - return; - } + if ($this->retryDelay) { + $queueConsumer->setRetryDelay($this->retryDelay); + } - unset($currentQueueNames[$result->getKey()]); - } else { - $currentQueueNames = []; + while (true) { + if ($message = $queueConsumer->receiveMessage($timeout ?: 5)) { + list($consumer, $callback) = $this->subscribers[$message->getKey()]; - if ($timeout && microtime(true) >= $endAt) { + if (false === call_user_func($callback, $message, $consumer)) { return; } } From a25f83efdcc43a163a12d91e0c0cea8a78515aa2 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 29 Oct 2018 12:04:45 +0200 Subject: [PATCH 2/7] redis new implementation --- pkg/redis/JsonSerializer.php | 41 +++++ pkg/redis/RedisConnectionFactory.php | 7 +- pkg/redis/RedisConsumer.php | 51 +++--- pkg/redis/RedisConsumerHelperTrait.php | 116 ++++++++++++++ pkg/redis/RedisContext.php | 25 ++- pkg/redis/RedisMessage.php | 27 +--- pkg/redis/RedisProducer.php | 16 +- pkg/redis/RedisQueueConsumer.php | 148 ------------------ pkg/redis/RedisSubscriptionConsumer.php | 27 ++-- pkg/redis/Serializer.php | 12 ++ pkg/redis/SerializerAwareTrait.php | 29 ++++ .../Tests/Functional/CommonUseCasesTrait.php | 7 +- pkg/redis/Tests/RedisProducerTest.php | 14 +- ...umerConsumeFromAllSubscribedQueuesTest.php | 2 +- 14 files changed, 286 insertions(+), 236 deletions(-) create mode 100644 pkg/redis/JsonSerializer.php create mode 100644 pkg/redis/RedisConsumerHelperTrait.php delete mode 100644 pkg/redis/RedisQueueConsumer.php create mode 100644 pkg/redis/Serializer.php create mode 100644 pkg/redis/SerializerAwareTrait.php diff --git a/pkg/redis/JsonSerializer.php b/pkg/redis/JsonSerializer.php new file mode 100644 index 000000000..7e064a221 --- /dev/null +++ b/pkg/redis/JsonSerializer.php @@ -0,0 +1,41 @@ + $message->getBody(), + 'properties' => $message->getProperties(), + 'headers' => $message->getHeaders(), + ]); + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return $json; + } + + public function toMessage(string $string): RedisMessage + { + $data = json_decode($string, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new RedisMessage($data['body'], $data['properties'], $data['headers']); + } +} diff --git a/pkg/redis/RedisConnectionFactory.php b/pkg/redis/RedisConnectionFactory.php index 5d5ec3b40..eee9d4851 100644 --- a/pkg/redis/RedisConnectionFactory.php +++ b/pkg/redis/RedisConnectionFactory.php @@ -38,6 +38,8 @@ class RedisConnectionFactory implements ConnectionFactory * 'read_write_timeout' => Timeout (expressed in seconds) used when performing read or write operations on the underlying network resource after which an exception is thrown. * 'predis_options' => An array of predis specific options. * 'ssl' => could be any of http://fi2.php.net/manual/en/context.ssl.php#refsect1-context.ssl-options + * 'redelivery_delay' => Default 300 sec. Returns back message into the queue if message was not acknowledged or rejected after this delay. + * It could happen if consumer has failed with fatal error or even if message processing is slow and takes more than this time. * ]. * * or @@ -85,10 +87,10 @@ public function createContext(): Context if ($this->config['lazy']) { return new RedisContext(function () { return $this->createRedis(); - }); + }, $this->config['redelivery_delay']); } - return new RedisContext($this->createRedis()); + return new RedisContext($this->createRedis(), $this->config['redelivery_delay']); } private function createRedis(): Redis @@ -158,6 +160,7 @@ private function defaultConfig(): array 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ]; } } diff --git a/pkg/redis/RedisConsumer.php b/pkg/redis/RedisConsumer.php index 4722d428d..6ff23f41e 100644 --- a/pkg/redis/RedisConsumer.php +++ b/pkg/redis/RedisConsumer.php @@ -11,6 +11,8 @@ class RedisConsumer implements Consumer { + use RedisConsumerHelperTrait; + /** * @var RedisDestination */ @@ -24,12 +26,7 @@ class RedisConsumer implements Consumer /** * @var int */ - private $retryDelay; - - /** - * @var RedisQueueConsumer - */ - private $queueConsumer; + private $redeliveryDelay = 300; public function __construct(RedisContext $context, RedisDestination $queue) { @@ -40,21 +37,17 @@ public function __construct(RedisContext $context, RedisDestination $queue) /** * @return int */ - public function getRetryDelay(): ?int + public function getRedeliveryDelay(): ?int { - return $this->retryDelay; + return $this->redeliveryDelay; } /** - * @param int $retryDelay + * @param int $delay */ - public function setRetryDelay(int $retryDelay): void + public function setRedeliveryDelay(int $delay): void { - $this->retryDelay = $retryDelay; - - if ($this->queueConsumer) { - $this->queueConsumer->setRetryDelay($this->retryDelay); - } + $this->redeliveryDelay = $delay; } /** @@ -80,9 +73,7 @@ public function receive(int $timeout = 0): ?Message } } - $this->initQueueConsumer(); - - return $this->queueConsumer->receiveMessage($timeout); + return $this->receiveMessage([$this->queue], $timeout, $this->redeliveryDelay); } /** @@ -90,9 +81,7 @@ public function receive(int $timeout = 0): ?Message */ public function receiveNoWait(): ?Message { - $this->initQueueConsumer(); - - return $this->queueConsumer->receiveMessageNoWait($this->queue); + return $this->receiveMessageNoWait($this->queue, $this->redeliveryDelay); } /** @@ -113,30 +102,26 @@ public function reject(Message $message, bool $requeue = false): void $this->acknowledge($message); if ($requeue) { - $message = RedisMessage::jsonUnserialize($message->getReservedKey()); + $message = $this->getContext()->getSerializer()->toMessage($message->getReservedKey()); $message->setHeader('attempts', 0); if ($message->getTimeToLive()) { $message->setHeader('expires_at', time() + $message->getTimeToLive()); } - $this->getRedis()->lpush($this->queue->getName(), json_encode($message)); + $payload = $this->getContext()->getSerializer()->toString($message); + + $this->getRedis()->lpush($this->queue->getName(), $payload); } } - private function getRedis(): Redis + private function getContext(): RedisContext { - return $this->context->getRedis(); + return $this->context; } - private function initQueueConsumer(): void + private function getRedis(): Redis { - if (null === $this->queueConsumer) { - $this->queueConsumer = new RedisQueueConsumer($this->getRedis(), [$this->queue]); - - if ($this->retryDelay) { - $this->queueConsumer->setRetryDelay($this->retryDelay); - } - } + return $this->context->getRedis(); } } diff --git a/pkg/redis/RedisConsumerHelperTrait.php b/pkg/redis/RedisConsumerHelperTrait.php new file mode 100644 index 000000000..41fab59ac --- /dev/null +++ b/pkg/redis/RedisConsumerHelperTrait.php @@ -0,0 +1,116 @@ +queueNames) { + $this->queueNames = []; + foreach ($queues as $queue) { + $this->queueNames[] = $queue->getName(); + } + } + + while ($thisTimeout > 0) { + $this->migrateExpiredMessages($this->queueNames); + + if ($result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) { + $this->pushQueueNameBack($result->getKey()); + + if ($message = $this->processResult($result, $redeliveryDelay)) { + return $message; + } + } + + $thisTimeout -= time() - $startAt; + } + + return null; + } + + protected function receiveMessageNoWait(RedisDestination $destination, int $redeliveryDelay): ?RedisMessage + { + $this->migrateExpiredMessages([$destination->getName()]); + + if ($result = $this->getContext()->getRedis()->rpop($destination->getName())) { + return $this->processResult($result, $redeliveryDelay); + } + + return null; + } + + protected function processResult(RedisResult $result, int $redeliveryDelay): ?RedisMessage + { + $message = $this->getContext()->getSerializer()->toMessage($result->getMessage()); + + $now = time(); + + if ($expiresAt = $message->getHeader('expires_at')) { + if ($now > $expiresAt) { + return null; + } + } + + $message->setHeader('attempts', $message->getAttempts() + 1); + $message->setRedelivered($message->getAttempts() > 1); + $message->setKey($result->getKey()); + $message->setReservedKey($this->getContext()->getSerializer()->toString($message)); + + $reservedQueue = $result->getKey().':reserved'; + $redeliveryAt = $now + $redeliveryDelay; + + $this->getContext()->getRedis()->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt); + + return $message; + } + + protected function pushQueueNameBack(string $queueName): void + { + if (count($this->queueNames) <= 1) { + return; + } + + if (false === $from = array_search($queueName, $this->queueNames, true)) { + throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName)); + } + + $to = count($this->queueNames) - 1; + + $out = array_splice($this->queueNames, $from, 1); + array_splice($this->queueNames, $to, 0, $out); + } + + protected function migrateExpiredMessages(array $queueNames): void + { + $now = time(); + + foreach ($queueNames as $queueName) { + $this->getContext()->getRedis() + ->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]); + + $this->getContext()->getRedis() + ->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]); + } + } +} diff --git a/pkg/redis/RedisContext.php b/pkg/redis/RedisContext.php index 2c3b468ea..344bb20c5 100644 --- a/pkg/redis/RedisContext.php +++ b/pkg/redis/RedisContext.php @@ -17,6 +17,8 @@ class RedisContext implements Context { + use SerializerAwareTrait; + /** * @var Redis */ @@ -27,12 +29,18 @@ class RedisContext implements Context */ private $redisFactory; + /** + * @var int + */ + private $redeliveryDelay = 300; + /** * Callable must return instance of Redis once called. * * @param Redis|callable $redis + * @param int $redeliveryDelay */ - public function __construct($redis) + public function __construct($redis, int $redeliveryDelay) { if ($redis instanceof Redis) { $this->redis = $redis; @@ -45,6 +53,9 @@ public function __construct($redis) Redis::class )); } + + $this->redeliveryDelay = $redeliveryDelay; + $this->setSerializer(new JsonSerializer()); } /** @@ -101,7 +112,7 @@ public function createTemporaryQueue(): Queue */ public function createProducer(): Producer { - return new RedisProducer($this->getRedis()); + return new RedisProducer($this); } /** @@ -113,7 +124,10 @@ public function createConsumer(Destination $destination): Consumer { InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class); - return new RedisConsumer($this, $destination); + $consumer = new RedisConsumer($this, $destination); + $consumer->setRedeliveryDelay($this->redeliveryDelay); + + return $consumer; } /** @@ -121,7 +135,10 @@ public function createConsumer(Destination $destination): Consumer */ public function createSubscriptionConsumer(): SubscriptionConsumer { - return new RedisSubscriptionConsumer($this); + $consumer = new RedisSubscriptionConsumer($this); + $consumer->setRedeliveryDelay($this->redeliveryDelay); + + return $consumer; } /** diff --git a/pkg/redis/RedisMessage.php b/pkg/redis/RedisMessage.php index 5b2359890..74c65475a 100644 --- a/pkg/redis/RedisMessage.php +++ b/pkg/redis/RedisMessage.php @@ -6,7 +6,7 @@ use Interop\Queue\Message; -class RedisMessage implements Message, \JsonSerializable +class RedisMessage implements Message { /** * @var string @@ -213,31 +213,8 @@ public function getKey(): ?string /** * @param string $key */ - public function setKey(string $key) + public function setKey(string $key): void { $this->key = $key; } - - public function jsonSerialize(): array - { - return [ - 'body' => $this->getBody(), - 'properties' => $this->getProperties(), - 'headers' => $this->getHeaders(), - ]; - } - - public static function jsonUnserialize(string $json): self - { - $data = json_decode($json, true); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); - } - - return new self($data['body'], $data['properties'], $data['headers']); - } } diff --git a/pkg/redis/RedisProducer.php b/pkg/redis/RedisProducer.php index 3b3b138cb..5272615ce 100644 --- a/pkg/redis/RedisProducer.php +++ b/pkg/redis/RedisProducer.php @@ -14,9 +14,9 @@ class RedisProducer implements Producer { /** - * @var Redis + * @var RedisContext */ - private $redis; + private $context; /** * @var int|null @@ -29,11 +29,11 @@ class RedisProducer implements Producer private $deliveryDelay; /** - * @param Redis $redis + * @param RedisContext $context */ - public function __construct(Redis $redis) + public function __construct(RedisContext $context) { - $this->redis = $redis; + $this->context = $context; } /** @@ -60,11 +60,13 @@ public function send(Destination $destination, Message $message): void $message->setHeader('expires_at', time() + $message->getTimeToLive()); } + $payload = $this->context->getSerializer()->toString($message); + if ($message->getDeliveryDelay()) { $deliveryAt = time() + $message->getDeliveryDelay(); - $this->redis->zadd($destination->getName().':delayed', json_encode($message), $deliveryAt); + $this->context->getRedis()->zadd($destination->getName().':delayed', $payload, $deliveryAt); } else { - $this->redis->lpush($destination->getName(), json_encode($message)); + $this->context->getRedis()->lpush($destination->getName(), $payload); } } diff --git a/pkg/redis/RedisQueueConsumer.php b/pkg/redis/RedisQueueConsumer.php deleted file mode 100644 index 26aa5fd7f..000000000 --- a/pkg/redis/RedisQueueConsumer.php +++ /dev/null @@ -1,148 +0,0 @@ -redis = $redis; - $this->queues = $queues; - } - - /** - * @return int - */ - public function getRetryDelay(): int - { - return $this->retryDelay; - } - - /** - * @param int $retryDelay - */ - public function setRetryDelay(int $retryDelay): void - { - $this->retryDelay = $retryDelay; - } - - public function receiveMessage(int $timeout): ?RedisMessage - { - $startAt = time(); - $thisTimeout = $timeout; - - if (null === $this->queueNames) { - foreach ($this->queues as $queue) { - $this->queueNames[] = $queue->getName(); - } - } - - while ($thisTimeout > 0) { - $this->migrateExpiredMessages($this->queueNames); - - if ($result = $this->redis->brpop($this->queueNames, $thisTimeout)) { - $this->pushQueueNameBack($result->getKey()); - - if ($message = $this->processResult($result)) { - return $message; - } - - $thisTimeout -= time() - $startAt; - } - } - - return null; - } - - public function receiveMessageNoWait(RedisDestination $destination): ?RedisMessage - { - $this->migrateExpiredMessages([$destination->getName()]); - - if ($result = $this->redis->rpop($destination->getName())) { - return $this->processResult($result); - } - - return null; - } - - private function processResult(RedisResult $result): ?RedisMessage - { - $message = RedisMessage::jsonUnserialize($result->getMessage()); - - $now = time(); - - if ($expiresAt = $message->getHeader('expires_at')) { - if ($now > $expiresAt) { - return null; - } - } - - $message->setHeader('attempts', $message->getAttempts() + 1); - $message->setRedelivered($message->getAttempts() > 1); - $message->setReservedKey(json_encode($message)); - $message->setKey($result->getKey()); - - $reservedQueue = $result->getKey().':reserved'; - $retryMessageAt = $now + $this->retryDelay; - - $this->redis->zadd($reservedQueue, $message->getReservedKey(), $retryMessageAt); - - return $message; - } - - private function pushQueueNameBack($queueName): void - { - if (count($this->queueNames) <= 1) { - return; - } - - if (false === $from = array_search($queueName, $this->queueNames, true)) { - throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName)); - } - - $to = count($this->queueNames) - 1; - - $out = array_splice($this->queueNames, $from, 1); - array_splice($this->queueNames, $to, 0, $out); - } - - private function migrateExpiredMessages(array $queueNames): void - { - $now = time(); - - foreach ($queueNames as $queueName) { - $this->redis->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]); - $this->redis->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]); - } - } -} diff --git a/pkg/redis/RedisSubscriptionConsumer.php b/pkg/redis/RedisSubscriptionConsumer.php index bf7504e6c..36e6a17d4 100644 --- a/pkg/redis/RedisSubscriptionConsumer.php +++ b/pkg/redis/RedisSubscriptionConsumer.php @@ -9,6 +9,8 @@ class RedisSubscriptionConsumer implements SubscriptionConsumer { + use RedisConsumerHelperTrait; + /** * @var RedisContext */ @@ -24,7 +26,7 @@ class RedisSubscriptionConsumer implements SubscriptionConsumer /** * @var int */ - private $retryDelay; + private $redeliveryDelay = 300; /** * @param RedisContext $context @@ -38,17 +40,17 @@ public function __construct(RedisContext $context) /** * @return int */ - public function getRetryDelay(): ?int + public function getRedeliveryDelay(): ?int { - return $this->retryDelay; + return $this->redeliveryDelay; } /** - * @param int $retryDelay + * @param int $delay */ - public function setRetryDelay(int $retryDelay): void + public function setRedeliveryDelay(int $delay): void { - $this->retryDelay = $retryDelay; + $this->redeliveryDelay = $delay; } public function consume(int $timeout = 0): void @@ -66,14 +68,8 @@ public function consume(int $timeout = 0): void $queues[] = $consumer->getQueue(); } - $queueConsumer = new RedisQueueConsumer($this->context->getRedis(), $queues); - - if ($this->retryDelay) { - $queueConsumer->setRetryDelay($this->retryDelay); - } - while (true) { - if ($message = $queueConsumer->receiveMessage($timeout ?: 5)) { + if ($message = $this->receiveMessage($queues, $timeout ?: 5, $this->redeliveryDelay)) { list($consumer, $callback) = $this->subscribers[$message->getKey()]; if (false === call_user_func($callback, $message, $consumer)) { @@ -134,4 +130,9 @@ public function unsubscribeAll(): void { $this->subscribers = []; } + + private function getContext(): RedisContext + { + return $this->context; + } } diff --git a/pkg/redis/Serializer.php b/pkg/redis/Serializer.php new file mode 100644 index 000000000..a936a9328 --- /dev/null +++ b/pkg/redis/Serializer.php @@ -0,0 +1,12 @@ +serializer = $serializer; + } + + /** + * @return Serializer + */ + public function getSerializer() + { + return $this->serializer; + } +} diff --git a/pkg/redis/Tests/Functional/CommonUseCasesTrait.php b/pkg/redis/Tests/Functional/CommonUseCasesTrait.php index 02f63d6f4..b80ea9763 100644 --- a/pkg/redis/Tests/Functional/CommonUseCasesTrait.php +++ b/pkg/redis/Tests/Functional/CommonUseCasesTrait.php @@ -61,7 +61,10 @@ public function testProduceAndReceiveOneMessageSentDirectlyToQueue() $this->assertEquals(__METHOD__, $message->getBody()); $this->assertEquals(['FooProperty' => 'FooVal'], $message->getProperties()); - $this->assertEquals(['BarHeader' => 'BarVal'], $message->getHeaders()); + $this->assertCount(3, $message->getHeaders()); + $this->assertSame(1, $message->getHeader('attempts')); + $this->assertSame('BarVal', $message->getHeader('BarHeader')); + $this->assertNotEmpty('BarVal', $message->getHeader('message_id')); } public function testProduceAndReceiveOneMessageSentDirectlyToTopic() @@ -99,7 +102,7 @@ public function testConsumerReceiveMessageWithZeroTimeout() $actualMessage = $consumer->receive(0); $this->assertInstanceOf(RedisMessage::class, $actualMessage); - $consumer->acknowledge($message); + $consumer->acknowledge($actualMessage); $this->assertEquals(__METHOD__, $message->getBody()); } diff --git a/pkg/redis/Tests/RedisProducerTest.php b/pkg/redis/Tests/RedisProducerTest.php index 98934826c..fe45bf5ec 100644 --- a/pkg/redis/Tests/RedisProducerTest.php +++ b/pkg/redis/Tests/RedisProducerTest.php @@ -54,7 +54,19 @@ public function testShouldCallLPushOnSend() $redisMock ->expects($this->once()) ->method('lpush') - ->with('aDestination', '{"body":"","properties":[],"headers":[]}') + ->willReturnCallback(function (string $key, string $value) { + $this->assertSame('aDestination', $key); + + $message = json_decode($value, true); + + $this->assertArrayHasKey('body', $message); + $this->assertArrayHasKey('properties', $message); + $this->assertArrayHasKey('headers', $message); + $this->assertNotEmpty($message['headers']['message_id']); + $this->assertSame(0, $message['headers']['attempts']); + + return true; + }) ; $producer = new RedisProducer($redisMock); diff --git a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php index 21d514fa5..16dbd8f98 100644 --- a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php +++ b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php @@ -35,7 +35,7 @@ protected function createQueue(Context $context, $queueName) { /** @var RedisDestination $queue */ $queue = parent::createQueue($context, $queueName); - $context->getRedis()->del($queueName); + $context->deleteQueue($queue); return $queue; } From b626b116f9ed596fc63a0971c2c38278773d94ea Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 29 Oct 2018 15:22:24 +0200 Subject: [PATCH 3/7] redis new implementation --- pkg/redis/RedisConsumerHelperTrait.php | 12 +- pkg/redis/RedisSubscriptionConsumer.php | 3 + .../RedisConnectionFactoryConfigTest.php | 12 ++ pkg/redis/Tests/RedisConsumerTest.php | 129 ++++++++++++++---- pkg/redis/Tests/RedisContextTest.php | 58 +++++--- pkg/redis/Tests/RedisMessageTest.php | 35 ----- pkg/redis/Tests/RedisProducerTest.php | 30 +++- pkg/redis/Tests/Spec/JsonSerializerTest.php | 76 +++++++++++ ...onConsumerConsumeUntilUnsubscribedTest.php | 2 +- 9 files changed, 265 insertions(+), 92 deletions(-) create mode 100644 pkg/redis/Tests/Spec/JsonSerializerTest.php diff --git a/pkg/redis/RedisConsumerHelperTrait.php b/pkg/redis/RedisConsumerHelperTrait.php index 41fab59ac..35081f549 100644 --- a/pkg/redis/RedisConsumerHelperTrait.php +++ b/pkg/redis/RedisConsumerHelperTrait.php @@ -35,12 +35,14 @@ protected function receiveMessage(array $queues, int $timeout, int $redeliveryDe while ($thisTimeout > 0) { $this->migrateExpiredMessages($this->queueNames); - if ($result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) { - $this->pushQueueNameBack($result->getKey()); + if (false == $result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) { + return null; + } + + $this->pushQueueNameBack($result->getKey()); - if ($message = $this->processResult($result, $redeliveryDelay)) { - return $message; - } + if ($message = $this->processResult($result, $redeliveryDelay)) { + return $message; } $thisTimeout -= time() - $startAt; diff --git a/pkg/redis/RedisSubscriptionConsumer.php b/pkg/redis/RedisSubscriptionConsumer.php index 36e6a17d4..c59cab4da 100644 --- a/pkg/redis/RedisSubscriptionConsumer.php +++ b/pkg/redis/RedisSubscriptionConsumer.php @@ -102,6 +102,7 @@ public function subscribe(Consumer $consumer, callable $callback): void } $this->subscribers[$queueName] = [$consumer, $callback]; + $this->queueNames = null; } /** @@ -124,11 +125,13 @@ public function unsubscribe(Consumer $consumer): void } unset($this->subscribers[$queueName]); + $this->queueNames = null; } public function unsubscribeAll(): void { $this->subscribers = []; + $this->queueNames = null; } private function getContext(): RedisContext diff --git a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php index 953006af9..897d5ee18 100644 --- a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php +++ b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php @@ -90,6 +90,7 @@ public static function provideConfigs() 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ], ]; @@ -110,6 +111,7 @@ public static function provideConfigs() 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ], ]; @@ -130,6 +132,7 @@ public static function provideConfigs() 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ], ]; @@ -151,6 +154,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -172,6 +176,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -193,6 +198,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -215,6 +221,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -237,6 +244,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -259,6 +267,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -280,6 +289,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -301,6 +311,7 @@ public static function provideConfigs() 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ], ]; @@ -326,6 +337,7 @@ public static function provideConfigs() 'cafile' => 'private.pem', 'verify_peer' => '1', ], + 'redelivery_delay' => 300, ], ]; } diff --git a/pkg/redis/Tests/RedisConsumerTest.php b/pkg/redis/Tests/RedisConsumerTest.php index d8b0c4e7b..8c2b1afe0 100644 --- a/pkg/redis/Tests/RedisConsumerTest.php +++ b/pkg/redis/Tests/RedisConsumerTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Redis\Tests; +use Enqueue\Redis\JsonSerializer; use Enqueue\Redis\Redis; use Enqueue\Redis\RedisConsumer; use Enqueue\Redis\RedisContext; @@ -35,41 +36,85 @@ public function testShouldReturnDestinationSetInConstructorOnGetQueue() $this->assertSame($destination, $consumer->getQueue()); } - public function testShouldDoNothingOnAcknowledge() + public function testShouldAcknowledgeMessage() { - $consumer = new RedisConsumer($this->createContextMock(), new RedisDestination('aQueue')); + $redisMock = $this->createRedisMock(); + $redisMock + ->expects($this->once()) + ->method('zrem') + ->with('aQueue:reserved', 'reserved-key') + ->willReturn(1) + ; - $consumer->acknowledge(new RedisMessage()); - } + $contextMock = $this->createContextMock(); + $contextMock + ->expects($this->once()) + ->method('getRedis') + ->willReturn($redisMock) + ; - public function testShouldDoNothingOnReject() - { - $consumer = new RedisConsumer($this->createContextMock(), new RedisDestination('aQueue')); + $message = new RedisMessage(); + $message->setReservedKey('reserved-key'); + + $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue')); - $consumer->reject(new RedisMessage()); + $consumer->acknowledge($message); } - public function testShouldSendSameMessageToDestinationOnReQueue() + public function testShouldRejectMessage() { + $redisMock = $this->createRedisMock(); + $redisMock + ->expects($this->once()) + ->method('zrem') + ->with('aQueue:reserved', 'reserved-key') + ->willReturn(1) + ; + + $contextMock = $this->createContextMock(); + $contextMock + ->expects($this->once()) + ->method('getRedis') + ->willReturn($redisMock) + ; + $message = new RedisMessage(); + $message->setReservedKey('reserved-key'); - $destination = new RedisDestination('aQueue'); + $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue')); - $producerMock = $this->createProducerMock(); - $producerMock + $consumer->reject($message); + } + + public function testShouldSendSameMessageToDestinationOnReQueue() + { + $redisMock = $this->createRedisMock(); + $redisMock ->expects($this->once()) - ->method('send') - ->with($this->identicalTo($destination), $this->identicalTo($message)) + ->method('lpush') + ->with('aQueue', '{"body":"text","properties":[],"headers":{"attempts":0}}') + ->willReturn(1) ; + $serializer = new JsonSerializer(); + $contextMock = $this->createContextMock(); $contextMock - ->expects($this->once()) - ->method('createProducer') - ->willReturn($producerMock) + ->expects($this->any()) + ->method('getRedis') + ->willReturn($redisMock) + ; + $contextMock + ->expects($this->any()) + ->method('getSerializer') + ->willReturn($serializer) ; - $consumer = new RedisConsumer($contextMock, $destination); + $message = new RedisMessage(); + $message->setBody('text'); + $message->setReservedKey($serializer->toString($message)); + + $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue')); $consumer->reject($message, true); } @@ -88,7 +133,7 @@ public function testShouldCallRedisBRPopAndReturnNullIfNothingInQueueOnReceive() $contextMock = $this->createContextMock(); $contextMock - ->expects($this->once()) + ->expects($this->any()) ->method('getRedis') ->willReturn($redisMock) ; @@ -102,20 +147,27 @@ public function testShouldCallRedisBRPopAndReturnMessageIfOneInQueueOnReceive() { $destination = new RedisDestination('aQueue'); + $serializer = new JsonSerializer(); + $redisMock = $this->createRedisMock(); $redisMock ->expects($this->once()) ->method('brpop') ->with(['aQueue'], 2) - ->willReturn(new RedisResult('aQueue', json_encode(new RedisMessage('aBody')))) + ->willReturn(new RedisResult('aQueue', $serializer->toString(new RedisMessage('aBody')))) ; $contextMock = $this->createContextMock(); $contextMock - ->expects($this->once()) + ->expects($this->any()) ->method('getRedis') ->willReturn($redisMock) ; + $contextMock + ->expects($this->any()) + ->method('getSerializer') + ->willReturn($serializer) + ; $consumer = new RedisConsumer($contextMock, $destination); @@ -131,24 +183,26 @@ public function testShouldCallRedisBRPopSeveralTimesWithFiveSecondTimeoutIfZeroT $expectedTimeout = 5; + $serializer = new JsonSerializer(); + $redisMock = $this->createRedisMock(); $redisMock - ->expects($this->at(0)) + ->expects($this->at(2)) ->method('brpop') ->with(['aQueue'], $expectedTimeout) ->willReturn(null) ; $redisMock - ->expects($this->at(1)) + ->expects($this->at(5)) ->method('brpop') ->with(['aQueue'], $expectedTimeout) ->willReturn(null) ; $redisMock - ->expects($this->at(2)) + ->expects($this->at(8)) ->method('brpop') ->with(['aQueue'], $expectedTimeout) - ->willReturn(new RedisResult('aQueue', json_encode(new RedisMessage('aBody')))) + ->willReturn(new RedisResult('aQueue', $serializer->toString(new RedisMessage('aBody')))) ; $contextMock = $this->createContextMock(); @@ -157,6 +211,11 @@ public function testShouldCallRedisBRPopSeveralTimesWithFiveSecondTimeoutIfZeroT ->method('getRedis') ->willReturn($redisMock) ; + $contextMock + ->expects($this->atLeastOnce()) + ->method('getSerializer') + ->willReturn($serializer) + ; $consumer = new RedisConsumer($contextMock, $destination); @@ -170,6 +229,8 @@ public function testShouldCallRedisRPopAndReturnNullIfNothingInQueueOnReceiveNoW { $destination = new RedisDestination('aQueue'); + $serializer = new JsonSerializer(); + $redisMock = $this->createRedisMock(); $redisMock ->expects($this->once()) @@ -180,10 +241,15 @@ public function testShouldCallRedisRPopAndReturnNullIfNothingInQueueOnReceiveNoW $contextMock = $this->createContextMock(); $contextMock - ->expects($this->once()) + ->expects($this->any()) ->method('getRedis') ->willReturn($redisMock) ; + $contextMock + ->expects($this->any()) + ->method('getSerializer') + ->willReturn($serializer) + ; $consumer = new RedisConsumer($contextMock, $destination); @@ -194,20 +260,27 @@ public function testShouldCallRedisRPopAndReturnMessageIfOneInQueueOnReceiveNoWa { $destination = new RedisDestination('aQueue'); + $serializer = new JsonSerializer(); + $redisMock = $this->createRedisMock(); $redisMock ->expects($this->once()) ->method('rpop') ->with('aQueue') - ->willReturn(new RedisResult('aQueue', json_encode(new RedisMessage('aBody')))) + ->willReturn(new RedisResult('aQueue', $serializer->toString(new RedisMessage('aBody')))) ; $contextMock = $this->createContextMock(); $contextMock - ->expects($this->once()) + ->expects($this->atLeastOnce()) ->method('getRedis') ->willReturn($redisMock) ; + $contextMock + ->expects($this->any()) + ->method('getSerializer') + ->willReturn($serializer) + ; $consumer = new RedisConsumer($contextMock, $destination); diff --git a/pkg/redis/Tests/RedisContextTest.php b/pkg/redis/Tests/RedisContextTest.php index 94fb626f1..01ea67df5 100644 --- a/pkg/redis/Tests/RedisContextTest.php +++ b/pkg/redis/Tests/RedisContextTest.php @@ -27,26 +27,26 @@ public function testShouldImplementContextInterface() public function testCouldBeConstructedWithRedisAsFirstArgument() { - new RedisContext($this->createRedisMock()); + new RedisContext($this->createRedisMock(), 300); } public function testCouldBeConstructedWithRedisFactoryAsFirstArgument() { new RedisContext(function () { return $this->createRedisMock(); - }); + }, 300); } public function testThrowIfNeitherRedisNorFactoryGiven() { $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('The $redis argument must be either Enqueue\Redis\Redis or callable that returns Enqueue\Redis\Redis once called.'); - new RedisContext(new \stdClass()); + new RedisContext(new \stdClass(), 300); } public function testShouldAllowCreateEmptyMessage() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $message = $context->createMessage(); @@ -59,7 +59,7 @@ public function testShouldAllowCreateEmptyMessage() public function testShouldAllowCreateCustomMessage() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $message = $context->createMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); @@ -72,7 +72,7 @@ public function testShouldAllowCreateCustomMessage() public function testShouldCreateQueue() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $queue = $context->createQueue('aQueue'); @@ -82,7 +82,7 @@ public function testShouldCreateQueue() public function testShouldAllowCreateTopic() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $topic = $context->createTopic('aTopic'); @@ -92,7 +92,7 @@ public function testShouldAllowCreateTopic() public function testThrowNotImplementedOnCreateTmpQueueCall() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $this->expectException(TemporaryQueueNotSupportedException::class); @@ -101,7 +101,7 @@ public function testThrowNotImplementedOnCreateTmpQueueCall() public function testShouldCreateProducer() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $producer = $context->createProducer(); @@ -110,7 +110,7 @@ public function testShouldCreateProducer() public function testShouldThrowIfNotRedisDestinationGivenOnCreateConsumer() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Enqueue\Redis\RedisDestination but got Enqueue\Null\NullQueue.'); @@ -121,7 +121,7 @@ public function testShouldThrowIfNotRedisDestinationGivenOnCreateConsumer() public function testShouldCreateConsumer() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $queue = $context->createQueue('aQueue'); @@ -138,7 +138,7 @@ public function testShouldCallRedisDisconnectOnClose() ->method('disconnect') ; - $context = new RedisContext($redisMock); + $context = new RedisContext($redisMock, 300); $context->close(); } @@ -151,7 +151,7 @@ public function testThrowIfNotRedisDestinationGivenOnDeleteQueue() ->method('del') ; - $context = new RedisContext($redisMock); + $context = new RedisContext($redisMock, 300); $this->expectException(InvalidDestinationException::class); $context->deleteQueue(new NullQueue('aQueue')); @@ -161,12 +161,22 @@ public function testShouldAllowDeleteQueue() { $redisMock = $this->createRedisMock(); $redisMock - ->expects($this->once()) + ->expects($this->at(0)) ->method('del') ->with('aQueueName') ; + $redisMock + ->expects($this->at(1)) + ->method('del') + ->with('aQueueName:delayed') + ; + $redisMock + ->expects($this->at(2)) + ->method('del') + ->with('aQueueName:reserved') + ; - $context = new RedisContext($redisMock); + $context = new RedisContext($redisMock, 300); $queue = $context->createQueue('aQueueName'); @@ -181,7 +191,7 @@ public function testThrowIfNotRedisDestinationGivenOnDeleteTopic() ->method('del') ; - $context = new RedisContext($redisMock); + $context = new RedisContext($redisMock, 300); $this->expectException(InvalidDestinationException::class); $context->deleteTopic(new NullTopic('aTopic')); @@ -191,12 +201,22 @@ public function testShouldAllowDeleteTopic() { $redisMock = $this->createRedisMock(); $redisMock - ->expects($this->once()) + ->expects($this->at(0)) ->method('del') ->with('aTopicName') ; + $redisMock + ->expects($this->at(1)) + ->method('del') + ->with('aTopicName:delayed') + ; + $redisMock + ->expects($this->at(2)) + ->method('del') + ->with('aTopicName:reserved') + ; - $context = new RedisContext($redisMock); + $context = new RedisContext($redisMock, 300); $topic = $context->createTopic('aTopicName'); @@ -205,7 +225,7 @@ public function testShouldAllowDeleteTopic() public function testShouldReturnExpectedSubscriptionConsumerInstance() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $this->assertInstanceOf(RedisSubscriptionConsumer::class, $context->createSubscriptionConsumer()); } diff --git a/pkg/redis/Tests/RedisMessageTest.php b/pkg/redis/Tests/RedisMessageTest.php index 2a81efa5a..5b1e42fe2 100644 --- a/pkg/redis/Tests/RedisMessageTest.php +++ b/pkg/redis/Tests/RedisMessageTest.php @@ -9,11 +9,6 @@ class RedisMessageTest extends \PHPUnit\Framework\TestCase { use ClassExtensionTrait; - public function testShouldImplementJsonSerializableInterface() - { - $this->assertClassImplements(\JsonSerializable::class, RedisMessage::class); - } - public function testCouldConstructMessageWithoutArguments() { $message = new RedisMessage(); @@ -63,34 +58,4 @@ public function testShouldSetReplyToAsHeader() $this->assertSame(['reply_to' => 'theQueueName'], $message->getHeaders()); } - - public function testColdBeSerializedToJson() - { - $message = new RedisMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']); - - $this->assertEquals('{"body":"theBody","properties":{"thePropFoo":"thePropFooVal"},"headers":{"theHeaderFoo":"theHeaderFooVal"}}', json_encode($message)); - } - - public function testCouldBeUnserializedFromJson() - { - $message = new RedisMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']); - - $json = json_encode($message); - - //guard - $this->assertNotEmpty($json); - - $unserializedMessage = RedisMessage::jsonUnserialize($json); - - $this->assertInstanceOf(RedisMessage::class, $unserializedMessage); - $this->assertEquals($message, $unserializedMessage); - } - - public function testThrowIfMalformedJsonGivenOnUnsterilizedFromJson() - { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The malformed json given.'); - - RedisMessage::jsonUnserialize('{]'); - } } diff --git a/pkg/redis/Tests/RedisProducerTest.php b/pkg/redis/Tests/RedisProducerTest.php index fe45bf5ec..12082734c 100644 --- a/pkg/redis/Tests/RedisProducerTest.php +++ b/pkg/redis/Tests/RedisProducerTest.php @@ -4,7 +4,9 @@ use Enqueue\Null\NullMessage; use Enqueue\Null\NullQueue; +use Enqueue\Redis\JsonSerializer; use Enqueue\Redis\Redis; +use Enqueue\Redis\RedisContext; use Enqueue\Redis\RedisDestination; use Enqueue\Redis\RedisMessage; use Enqueue\Redis\RedisProducer; @@ -25,12 +27,12 @@ public function testShouldImplementProducerInterface() public function testCouldBeConstructedWithRedisAsFirstArgument() { - new RedisProducer($this->createRedisMock()); + new RedisProducer($this->createContextMock()); } public function testThrowIfDestinationNotRedisDestinationOnSend() { - $producer = new RedisProducer($this->createRedisMock()); + $producer = new RedisProducer($this->createContextMock()); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Enqueue\Redis\RedisDestination but got Enqueue\Null\NullQueue.'); @@ -39,7 +41,7 @@ public function testThrowIfDestinationNotRedisDestinationOnSend() public function testThrowIfMessageNotRedisMessageOnSend() { - $producer = new RedisProducer($this->createRedisMock()); + $producer = new RedisProducer($this->createContextMock()); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Enqueue\Redis\RedisMessage but it is Enqueue\Null\NullMessage.'); @@ -69,11 +71,31 @@ public function testShouldCallLPushOnSend() }) ; - $producer = new RedisProducer($redisMock); + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getRedis') + ->willReturn($redisMock) + ; + $context + ->expects($this->once()) + ->method('getSerializer') + ->willReturn(new JsonSerializer()) + ; + + $producer = new RedisProducer($context); $producer->send($destination, new RedisMessage()); } + /** + * @return \PHPUnit_Framework_MockObject_MockObject|RedisContext + */ + private function createContextMock() + { + return $this->createMock(RedisContext::class); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|Redis */ diff --git a/pkg/redis/Tests/Spec/JsonSerializerTest.php b/pkg/redis/Tests/Spec/JsonSerializerTest.php new file mode 100644 index 000000000..57d1b765f --- /dev/null +++ b/pkg/redis/Tests/Spec/JsonSerializerTest.php @@ -0,0 +1,76 @@ +assertClassImplements(Serializer::class, JsonSerializer::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new JsonSerializer(); + } + + public function testShouldConvertMessageToJsonString() + { + $serializer = new JsonSerializer(); + + $message = new RedisMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); + + $json = $serializer->toString($message); + + $this->assertSame('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}', $json); + } + + public function testThrowIfFailedToEncodeMessageToJson() + { + $serializer = new JsonSerializer(); + + $resource = fopen(__FILE__, 'rb'); + + //guard + $this->assertInternalType('resource', $resource); + + $message = new RedisMessage('theBody', ['aProp' => $resource]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given.'); + $serializer->toString($message); + } + + public function testShouldConvertJsonStringToMessage() + { + $serializer = new JsonSerializer(); + + $message = $serializer->toMessage('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}'); + + $this->assertInstanceOf(RedisMessage::class, $message); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['aProp' => 'aPropVal'], $message->getProperties()); + $this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders()); + } + + public function testThrowIfFailedToDecodeJsonToMessage() + { + $serializer = new JsonSerializer(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given.'); + $serializer->toMessage('{]'); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php index 9524e5b6f..b227e3405 100644 --- a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php +++ b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php @@ -35,7 +35,7 @@ protected function createQueue(Context $context, $queueName) { /** @var RedisDestination $queue */ $queue = parent::createQueue($context, $queueName); - $context->getRedis()->del($queueName); + $context->purgeQueue($queue); return $queue; } From 50379c7d6d94eecefa7813b2c7afc2f64ccfd535 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 29 Oct 2018 15:37:40 +0200 Subject: [PATCH 4/7] redis new implementation --- pkg/redis/Redis.php | 6 +++--- pkg/redis/RedisConsumerHelperTrait.php | 2 +- pkg/redis/Tests/Spec/JsonSerializerTest.php | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/redis/Redis.php b/pkg/redis/Redis.php index 1baa4d909..362f5e8d0 100644 --- a/pkg/redis/Redis.php +++ b/pkg/redis/Redis.php @@ -8,8 +8,8 @@ interface Redis { /** * @param string $script - * @param array $keys - * @param array $args + * @param array $keys + * @param array $args * * @throws ServerException * @@ -20,7 +20,7 @@ public function eval(string $script, array $keys = [], array $args = []); /** * @param string $key * @param string $value - * @param float $score + * @param float $score * * @throws ServerException * diff --git a/pkg/redis/RedisConsumerHelperTrait.php b/pkg/redis/RedisConsumerHelperTrait.php index 35081f549..7f1836d6d 100644 --- a/pkg/redis/RedisConsumerHelperTrait.php +++ b/pkg/redis/RedisConsumerHelperTrait.php @@ -11,7 +11,7 @@ trait RedisConsumerHelperTrait */ protected $queueNames; - abstract function getContext(): RedisContext; + abstract protected function getContext(): RedisContext; /** * @param RedisDestination[] $queues diff --git a/pkg/redis/Tests/Spec/JsonSerializerTest.php b/pkg/redis/Tests/Spec/JsonSerializerTest.php index 57d1b765f..fd244b17c 100644 --- a/pkg/redis/Tests/Spec/JsonSerializerTest.php +++ b/pkg/redis/Tests/Spec/JsonSerializerTest.php @@ -2,10 +2,10 @@ namespace Enqueue\Redis\Tests\Spec; -use Enqueue\Test\ClassExtensionTrait; use Enqueue\Redis\JsonSerializer; -use Enqueue\Redis\Serializer; use Enqueue\Redis\RedisMessage; +use Enqueue\Redis\Serializer; +use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; /** From 7716a8272b35dd46b7f05acdbf94c8e1868963fb Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 29 Oct 2018 16:01:42 +0200 Subject: [PATCH 5/7] redis new implementation --- docs/transport/redis.md | 47 +++++++++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/docs/transport/redis.md b/docs/transport/redis.md index 8a51c61cc..ff5a45990 100644 --- a/docs/transport/redis.md +++ b/docs/transport/redis.md @@ -5,12 +5,12 @@ It creates a collection (a queue or topic) there. Pushes messages to the tail of The transport works with [phpredis](https://github.com/phpredis/phpredis) php extension or [predis](https://github.com/nrk/predis) library. Make sure you installed either of them -**Limitations** It works only in auto ack mode hence If consumer crashes the message is lost. - * [Installation](#installation) * [Create context](#create-context) * [Send message to topic](#send-message-to-topic) * [Send message to queue](#send-message-to-queue) +* [Send expiration message](#send-expiration-message) +* [Send delayed message](#send-delayed-message) * [Consume message](#consume-message) * [Delete queue (purge messages)](#delete-queue-purge-messages) * [Delete topic (purge messages)](#delete-topic-purge-messages) @@ -56,7 +56,7 @@ $factory = new RedisConnectionFactory([ ]); // same as above but given as DSN string -$factory = new RedisConnectionFactory('redis://example.com:1000?vendor=phpredis'); +$factory = new RedisConnectionFactory('redis+phpredis://example.com:1000'); $psrContext = $factory->createContext(); @@ -68,7 +68,7 @@ $redis = new \Enqueue\Redis\PhpRedis([ /** redis connection options */ ]); $redis->connect(); // Secure\TLS connection. Works only with predis library. Note second "S" in scheme. -$factory = new RedisConnectionFactory('rediss://user:pass@host/0?vendor=predis'); +$factory = new RedisConnectionFactory('rediss+predis://user:pass@host/0'); $factory = new RedisConnectionFactory($redis); ``` @@ -82,7 +82,7 @@ use Enqueue\Redis\RedisConnectionFactory; $connectionFactory = new RedisConnectionFactory([ 'host' => 'localhost', 'port' => 6379, - 'vendor' => 'predis', + 'scheme_extensions' => 'predis', ]); $psrContext = $connectionFactory->createContext(); @@ -102,7 +102,7 @@ $options = []; $redis = new PRedis(new \PRedis\Client($config, $options)); -$factory = new RedisConnectionFactory(['vendor' => 'custom', 'redis' => $redis]); +$factory = new RedisConnectionFactory($redis); ``` ## Send message to topic @@ -129,6 +129,38 @@ $message = $psrContext->createMessage('Hello world!'); $psrContext->createProducer()->send($fooQueue, $message); ``` +## Send expiration message + +```php +createMessage('Hello world!'); + +$psrContext->createProducer() + ->setTimeToLive(60000) // 60 sec + // + ->send($fooQueue, $message) +; +``` + +## Send delayed message + +```php +createMessage('Hello world!'); + +$psrContext->createProducer() + ->setDeliveryDelay(5000) // 5 sec + + ->send($fooQueue, $message) +; +```` + ## Consume message: ```php @@ -141,6 +173,9 @@ $consumer = $psrContext->createConsumer($fooQueue); $message = $consumer->receive(); // process a message + +$consumer->acknowledge($message); +//$consumer->reject($message); ``` ## Delete queue (purge messages): From 3681e17e468af2f847de256c42e1c05673f7cbc4 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 29 Oct 2018 16:17:41 +0200 Subject: [PATCH 6/7] redis new implementation --- pkg/redis/RedisConsumerHelperTrait.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/redis/RedisConsumerHelperTrait.php b/pkg/redis/RedisConsumerHelperTrait.php index 7f1836d6d..9939986ed 100644 --- a/pkg/redis/RedisConsumerHelperTrait.php +++ b/pkg/redis/RedisConsumerHelperTrait.php @@ -68,7 +68,7 @@ protected function processResult(RedisResult $result, int $redeliveryDelay): ?Re $now = time(); - if ($expiresAt = $message->getHeader('expires_at')) { + if (0 === $message->getAttempts() && $expiresAt = $message->getHeader('expires_at')) { if ($now > $expiresAt) { return null; } From 5615d56bdf33d03097b3482617ec1b757cb63568 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 30 Oct 2018 10:10:16 +0200 Subject: [PATCH 7/7] redis new implementation --- pkg/redis/RedisProducer.php | 4 ++-- pkg/redis/composer.json | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/redis/RedisProducer.php b/pkg/redis/RedisProducer.php index 5272615ce..69b028cb1 100644 --- a/pkg/redis/RedisProducer.php +++ b/pkg/redis/RedisProducer.php @@ -4,12 +4,12 @@ namespace Enqueue\Redis; -use Enqueue\Util\UUID; use Interop\Queue\Destination; use Interop\Queue\Exception\InvalidDestinationException; use Interop\Queue\Exception\InvalidMessageException; use Interop\Queue\Message; use Interop\Queue\Producer; +use Ramsey\Uuid\Uuid; class RedisProducer implements Producer { @@ -45,7 +45,7 @@ public function send(Destination $destination, Message $message): void InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class); InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class); - $message->setMessageId(UUID::generate()); + $message->setMessageId(Uuid::uuid4()->toString()); $message->setHeader('attempts', 0); if (null !== $this->timeToLive && null === $message->getTimeToLive()) { diff --git a/pkg/redis/composer.json b/pkg/redis/composer.json index dfbccfd04..73b2236a3 100644 --- a/pkg/redis/composer.json +++ b/pkg/redis/composer.json @@ -8,7 +8,8 @@ "require": { "php": "^7.1.3", "queue-interop/queue-interop": "0.7.x-dev", - "enqueue/dsn": "0.9.x-dev" + "enqueue/dsn": "0.9.x-dev", + "ramsey/uuid": "^3" }, "require-dev": { "phpunit/phpunit": "~5.4.0",