diff --git a/pkg/redis-tools/DelayStrategy.php b/pkg/redis-tools/DelayStrategy.php new file mode 100644 index 000000000..d4212ecce --- /dev/null +++ b/pkg/redis-tools/DelayStrategy.php @@ -0,0 +1,26 @@ +delayStrategy = $delayStrategy; + + return $this; + } +} diff --git a/pkg/redis-tools/RedisZSetDelayConsumer.php b/pkg/redis-tools/RedisZSetDelayConsumer.php new file mode 100644 index 000000000..b9097caa2 --- /dev/null +++ b/pkg/redis-tools/RedisZSetDelayConsumer.php @@ -0,0 +1,136 @@ +context = $context; + $this->queue = $queue; + } + + /** + * {@inheritdoc} + * + * @return RedisDestination + */ + public function getQueue() + { + return $this->queue; + } + + /** + * {@inheritdoc} + * + * @return RedisMessage|null + */ + public function receive($timeout = 0) + { + return $this->receiveNoWait(); + } + + /** + * {@inheritdoc} + * + * @return RedisMessage|null + */ + public function receiveNoWait() + { + while (false !== ($timestamp = $this->nextDelayedTimestamp())) { + $this->enqueueDelayedMessagesForTimestamp($timestamp); + } + } + + /** + * {@inheritdoc} + * + * @param RedisMessage $message + */ + public function acknowledge(PsrMessage $message) + { + // do nothing. redis transport always works in auto ack mode + } + + /** + * {@inheritdoc} + * + * @param RedisMessage $message + */ + public function reject(PsrMessage $message, $requeue = false) + { + InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class); + + // do nothing on reject. redis transport always works in auto ack mode + + if ($requeue) { + $this->context->createProducer()->send($this->queue, $message); + } + } + + /** + * @return Redis + */ + private function getRedis() + { + return $this->context->getRedis(); + } + + //TODO: refactor into a php generator + private function nextDelayedTimestamp() + { + $at = time(); + + //TODO:check zrange by score definition + $items = $this->getRedis()->zrangebyscore('enqueue:'.$this->getQueue()->getTopicName().':delayed', '-inf', $at, 'LIMIT', 0, 1); + + if (!empty($items)) { + return $items[0]; + } + + return false; + } + + private function enqueueDelayedMessagesForTimestamp($timestamp) + { + $message = null; + while ($message = $this->nextMessageForTimestamp($timestamp)) { + $this->context->createProducer()->send($this->queue, $message); + } + } + + private function nextMessageForTimestamp($timestamp) + { + $queue = 'enqueue:'.$this->getQueue()->getTopicName().':delayed:'.$timestamp; + if ($message = $this->getRedis()->rpop($queue)) { + if (0 == $this->getRedis()->llen($queue)) { + $this->getRedis()->del($queue); + $this->getRedis()->zrem('enqueue:'.$this->getQueue()->getTopicName().':delayed', $timestamp); + } + + return RedisMessage::jsonUnserialize($message); + } + } +} diff --git a/pkg/redis-tools/RedisZSetDelayStrategy.php b/pkg/redis-tools/RedisZSetDelayStrategy.php new file mode 100644 index 000000000..6abc5c084 --- /dev/null +++ b/pkg/redis-tools/RedisZSetDelayStrategy.php @@ -0,0 +1,43 @@ +createMessage($message->getBody(), $message->getProperties(), $message->getHeaders()); + $delayMessage->setProperty('x-delay', (int) $delayMsec); + + $targetTimestamp = time() + $delayMsec / 1000; + + $context->getRedis()->zadd('enqueue:'.$dest->getTopicName().':delayed', $targetTimestamp, $targetTimestamp); + + $delayTopic = $context->createTopic('enqueue:'.$dest->getTopicName().':delayed:'.$targetTimestamp); + + $producer = $context->createProducer(); + + if ($producer instanceof DelayStrategyAware) { + $producer->setDelayStrategy(null); + } + + $producer->send($delayTopic, $delayMessage); + } + + /** + * {@inheritdoc} + */ + public function processDelayedMessage(RedisContext $context, RedisDestination $dest) + { + $delayConsumer = new RedisZSetDelayConsumer($context, $dest); + + $delayConsumer->receive(); + } +} diff --git a/pkg/redis-tools/composer.json b/pkg/redis-tools/composer.json new file mode 100644 index 000000000..9599b6caa --- /dev/null +++ b/pkg/redis-tools/composer.json @@ -0,0 +1,36 @@ +{ + "name": "enqueue/redis-tools", + "type": "library", + "description": "Message Queue Redis Tools", + "keywords": ["messaging", "queue", "redis"], + "homepage": "https://enqueue.forma-pro.com/", + "license": "MIT", + "require": { + "php": ">=5.6", + "queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1" + }, + "require-dev": { + "phpunit/phpunit": "~5.4.0", + "enqueue/test": "^0.8@dev", + "enqueue/null": "^0.8@dev" + }, + "support": { + "email": "opensource@forma-pro.com", + "issues": "https://github.com/php-enqueue/enqueue-dev/issues", + "forum": "https://gitter.im/php-enqueue/Lobby", + "source": "https://github.com/php-enqueue/enqueue-dev", + "docs": "https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md" + }, + "autoload": { + "psr-4": { "Enqueue\\RedisTools\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.8.x-dev" + } + } +} diff --git a/pkg/redis/Redis.php b/pkg/redis/Redis.php index 796081775..cdf2e060e 100644 --- a/pkg/redis/Redis.php +++ b/pkg/redis/Redis.php @@ -35,4 +35,16 @@ public function disconnect(); * @param string $key */ public function del($key); + + public function zrangebyscore($key, $min, $max, $offset, $limit, $options = []); + + public function zadd($key, $score, $member); + + public function rpush($key, $value); + + public function lpop($key); + + public function llen($key); + + public function zrem($key, $member); } diff --git a/pkg/redis/RedisConsumer.php b/pkg/redis/RedisConsumer.php index 6c57f798c..e5796b656 100644 --- a/pkg/redis/RedisConsumer.php +++ b/pkg/redis/RedisConsumer.php @@ -2,12 +2,15 @@ namespace Enqueue\Redis; +use Enqueue\RedisTools\DelayStrategyAware; +use Enqueue\RedisTools\DelayStrategyAwareTrait; use Interop\Queue\InvalidMessageException; use Interop\Queue\PsrConsumer; use Interop\Queue\PsrMessage; -class RedisConsumer implements PsrConsumer +class RedisConsumer implements PsrConsumer, DelayStrategyAware { + use DelayStrategyAwareTrait; /** * @var RedisDestination */ @@ -45,6 +48,10 @@ public function getQueue() */ public function receive($timeout = 0) { + if (null != $this->delayStrategy) { + $this->delayStrategy->processDelayedMessage($this->context, $this->queue); + } + $timeout = (int) ($timeout / 1000); if (empty($timeout)) { // Caused by @@ -66,6 +73,10 @@ public function receive($timeout = 0) */ public function receiveNoWait() { + if (null != $this->delayStrategy) { + $this->delayStrategy->processDelayedMessage($this->context, $this->queue); + } + if ($message = $this->getRedis()->rpop($this->queue->getName())) { return RedisMessage::jsonUnserialize($message); } diff --git a/pkg/redis/RedisContext.php b/pkg/redis/RedisContext.php index b87cc5e5f..26a50c3a7 100644 --- a/pkg/redis/RedisContext.php +++ b/pkg/redis/RedisContext.php @@ -2,14 +2,19 @@ namespace Enqueue\Redis; +use Enqueue\RedisTools\DelayStrategy; +use Enqueue\RedisTools\DelayStrategyAware; +use Enqueue\RedisTools\DelayStrategyAwareTrait; use Interop\Queue\InvalidDestinationException; use Interop\Queue\PsrContext; use Interop\Queue\PsrDestination; use Interop\Queue\PsrQueue; use Interop\Queue\PsrTopic; -class RedisContext implements PsrContext +class RedisContext implements PsrContext, DelayStrategyAware { + use DelayStrategyAwareTrait; + /** * @var Redis */ @@ -20,6 +25,11 @@ class RedisContext implements PsrContext */ private $redisFactory; + /** + * @var DelayStrategy[] + */ + private $delayStrategies = []; + /** * Callable must return instance of Redis once called. * @@ -29,7 +39,7 @@ public function __construct($redis) { if ($redis instanceof Redis) { $this->redis = $redis; - } elseif (is_callable($redis)) { + } elseif (\is_callable($redis)) { $this->redisFactory = $redis; } else { throw new \InvalidArgumentException(sprintf( @@ -105,7 +115,10 @@ public function createTemporaryQueue() */ public function createProducer() { - return new RedisProducer($this->getRedis()); + $producer = new RedisProducer($this); + $producer->setDelayStrategy($this->delayStrategy); + + return $producer; } /** @@ -119,7 +132,10 @@ public function createConsumer(PsrDestination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class); - return new RedisConsumer($this, $destination); + $consumer = new RedisConsumer($this, $destination); + $consumer->setDelayStrategy($this->delayStrategy); + + return $consumer; } public function close() @@ -133,12 +149,12 @@ public function close() public function getRedis() { if (false == $this->redis) { - $redis = call_user_func($this->redisFactory); + $redis = \call_user_func($this->redisFactory); if (false == $redis instanceof Redis) { throw new \LogicException(sprintf( 'The factory must return instance of %s. It returned %s', Redis::class, - is_object($redis) ? get_class($redis) : gettype($redis) + \is_object($redis) ? \get_class($redis) : \gettype($redis) )); } diff --git a/pkg/redis/RedisProducer.php b/pkg/redis/RedisProducer.php index e34506b3a..4a4be4b7c 100644 --- a/pkg/redis/RedisProducer.php +++ b/pkg/redis/RedisProducer.php @@ -2,25 +2,40 @@ namespace Enqueue\Redis; +use Enqueue\RedisTools\DelayStrategyAware; +use Enqueue\RedisTools\DelayStrategyAwareTrait; +use Interop\Queue\DeliveryDelayNotSupportedException; use Interop\Queue\InvalidDestinationException; use Interop\Queue\InvalidMessageException; use Interop\Queue\PsrDestination; use Interop\Queue\PsrMessage; use Interop\Queue\PsrProducer; -class RedisProducer implements PsrProducer +class RedisProducer implements PsrProducer, DelayStrategyAware { + use DelayStrategyAwareTrait; /** * @var Redis */ private $redis; /** - * @param Redis $redis + * @var RedisContext */ - public function __construct(Redis $redis) + private $context; + + /** + * @var int + */ + private $deliveryDelay; + + /** + * @param RedisContext $redisContext + */ + public function __construct(RedisContext $redisContext) { - $this->redis = $redis; + $this->redis = $redisContext->getRedis(); + $this->context = $redisContext; } /** @@ -34,7 +49,11 @@ public function send(PsrDestination $destination, PsrMessage $message) InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class); InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class); - $this->redis->lpush($destination->getName(), json_encode($message)); + if ($this->deliveryDelay) { + $this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay); + } else { + $this->redis->lpush($destination->getName(), json_encode($message)); + } } /** @@ -46,7 +65,13 @@ public function setDeliveryDelay($deliveryDelay) return; } - throw new \LogicException('Not implemented'); + if (null === $this->delayStrategy) { + throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + } + + $this->deliveryDelay = $deliveryDelay; + + return $this; } /** @@ -54,7 +79,7 @@ public function setDeliveryDelay($deliveryDelay) */ public function getDeliveryDelay() { - return null; + return $this->deliveryDelay; } /**