Skip to content

[redis][poc] Add delay strategy #560

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pkg/redis-tools/DelayStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

namespace Enqueue\RedisTools;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think we need redis tool for now, the code could be in Enqueue\Redis


use Enqueue\Redis\RedisContext;
use Enqueue\Redis\RedisDestination;
use Enqueue\Redis\RedisMessage;

interface DelayStrategy
{
/**
* @param RedisContext $context
* @param RedisDestination $dest
* @param RedisMessage $message
* @param int $delayMsec
*/
public function delayMessage(RedisContext $context, RedisDestination $dest, RedisMessage $message, $delayMsec);

/**
* Search for delayed message and move them on main queue.
*
* @param RedisContext $context
* @param RedisDestination $dest
*/
public function processDelayedMessage(RedisContext $context, RedisDestination $dest);
}
13 changes: 13 additions & 0 deletions pkg/redis-tools/DelayStrategyAware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Enqueue\RedisTools;

interface DelayStrategyAware
{
/**
* @param DelayStrategy $delayStrategy
*
* @return self
*/
public function setDelayStrategy(DelayStrategy $delayStrategy = null);
}
23 changes: 23 additions & 0 deletions pkg/redis-tools/DelayStrategyAwareTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace Enqueue\RedisTools;

trait DelayStrategyAwareTrait
{
/**
* @var DelayStrategy
*/
protected $delayStrategy;

/**
* @param DelayStrategy|null $delayStrategy
*
* @return self
*/
public function setDelayStrategy(DelayStrategy $delayStrategy = null)
{
$this->delayStrategy = $delayStrategy;

return $this;
}
}
136 changes: 136 additions & 0 deletions pkg/redis-tools/RedisZSetDelayConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
<?php

namespace Enqueue\RedisTools;

use Enqueue\Redis\Redis;
use Enqueue\Redis\RedisContext;
use Enqueue\Redis\RedisDestination;
use Enqueue\Redis\RedisMessage;
use Interop\Queue\InvalidMessageException;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrMessage;

class RedisZSetDelayConsumer implements PsrConsumer
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the code could be moved to RedisZSetDelayStrategy.

{
/**
* @var RedisDestination
*/
private $queue;

/**
* @var RedisContext
*/
private $context;

/**
* @param RedisContext $context
* @param RedisDestination $queue
*/
public function __construct(RedisContext $context, RedisDestination $queue)
{
$this->context = $context;
$this->queue = $queue;
}

/**
* {@inheritdoc}
*
* @return RedisDestination
*/
public function getQueue()
{
return $this->queue;
}

/**
* {@inheritdoc}
*
* @return RedisMessage|null
*/
public function receive($timeout = 0)
{
return $this->receiveNoWait();
}

/**
* {@inheritdoc}
*
* @return RedisMessage|null
*/
public function receiveNoWait()
{
while (false !== ($timestamp = $this->nextDelayedTimestamp())) {
$this->enqueueDelayedMessagesForTimestamp($timestamp);
}
}

/**
* {@inheritdoc}
*
* @param RedisMessage $message
*/
public function acknowledge(PsrMessage $message)
{
// do nothing. redis transport always works in auto ack mode
}

/**
* {@inheritdoc}
*
* @param RedisMessage $message
*/
public function reject(PsrMessage $message, $requeue = false)
{
InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class);

// do nothing on reject. redis transport always works in auto ack mode

if ($requeue) {
$this->context->createProducer()->send($this->queue, $message);
}
}

/**
* @return Redis
*/
private function getRedis()
{
return $this->context->getRedis();
}

//TODO: refactor into a php generator
private function nextDelayedTimestamp()
{
$at = time();

//TODO:check zrange by score definition
$items = $this->getRedis()->zrangebyscore('enqueue:'.$this->getQueue()->getTopicName().':delayed', '-inf', $at, 'LIMIT', 0, 1);

if (!empty($items)) {
return $items[0];
}

return false;
}

private function enqueueDelayedMessagesForTimestamp($timestamp)
{
$message = null;
while ($message = $this->nextMessageForTimestamp($timestamp)) {
$this->context->createProducer()->send($this->queue, $message);
}
}

private function nextMessageForTimestamp($timestamp)
{
$queue = 'enqueue:'.$this->getQueue()->getTopicName().':delayed:'.$timestamp;
if ($message = $this->getRedis()->rpop($queue)) {
if (0 == $this->getRedis()->llen($queue)) {
$this->getRedis()->del($queue);
$this->getRedis()->zrem('enqueue:'.$this->getQueue()->getTopicName().':delayed', $timestamp);
}

return RedisMessage::jsonUnserialize($message);
}
}
}
43 changes: 43 additions & 0 deletions pkg/redis-tools/RedisZSetDelayStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

namespace Enqueue\RedisTools;

use Enqueue\Redis\RedisContext;
use Enqueue\Redis\RedisDestination;
use Enqueue\Redis\RedisMessage;

class RedisZSetDelayStrategy implements DelayStrategy
{
/**
* {@inheritdoc}
*/
public function delayMessage(RedisContext $context, RedisDestination $dest, RedisMessage $message, $delayMsec)
{
$delayMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders());
$delayMessage->setProperty('x-delay', (int) $delayMsec);

$targetTimestamp = time() + $delayMsec / 1000;

$context->getRedis()->zadd('enqueue:'.$dest->getTopicName().':delayed', $targetTimestamp, $targetTimestamp);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the third argument should be a serialized message.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and if we do so we can stop on this. no need to send a message to enqueue:{topic}:delayed:{timestamp} collection.


$delayTopic = $context->createTopic('enqueue:'.$dest->getTopicName().':delayed:'.$targetTimestamp);

$producer = $context->createProducer();

if ($producer instanceof DelayStrategyAware) {
$producer->setDelayStrategy(null);
}

$producer->send($delayTopic, $delayMessage);
}

/**
* {@inheritdoc}
*/
public function processDelayedMessage(RedisContext $context, RedisDestination $dest)
{
$delayConsumer = new RedisZSetDelayConsumer($context, $dest);

$delayConsumer->receive();
}
}
36 changes: 36 additions & 0 deletions pkg/redis-tools/composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"name": "enqueue/redis-tools",
"type": "library",
"description": "Message Queue Redis Tools",
"keywords": ["messaging", "queue", "redis"],
"homepage": "https://enqueue.forma-pro.com/",
"license": "MIT",
"require": {
"php": ">=5.6",
"queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1"
},
"require-dev": {
"phpunit/phpunit": "~5.4.0",
"enqueue/test": "^0.8@dev",
"enqueue/null": "^0.8@dev"
},
"support": {
"email": "opensource@forma-pro.com",
"issues": "https://github.com/php-enqueue/enqueue-dev/issues",
"forum": "https://gitter.im/php-enqueue/Lobby",
"source": "https://github.com/php-enqueue/enqueue-dev",
"docs": "https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md"
},
"autoload": {
"psr-4": { "Enqueue\\RedisTools\\": "" },
"exclude-from-classmap": [
"/Tests/"
]
},
"minimum-stability": "dev",
"extra": {
"branch-alias": {
"dev-master": "0.8.x-dev"
}
}
}
12 changes: 12 additions & 0 deletions pkg/redis/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,16 @@ public function disconnect();
* @param string $key
*/
public function del($key);

public function zrangebyscore($key, $min, $max, $offset, $limit, $options = []);

public function zadd($key, $score, $member);

public function rpush($key, $value);

public function lpop($key);

public function llen($key);

public function zrem($key, $member);
}
13 changes: 12 additions & 1 deletion pkg/redis/RedisConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

namespace Enqueue\Redis;

use Enqueue\RedisTools\DelayStrategyAware;
use Enqueue\RedisTools\DelayStrategyAwareTrait;
use Interop\Queue\InvalidMessageException;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrMessage;

class RedisConsumer implements PsrConsumer
class RedisConsumer implements PsrConsumer, DelayStrategyAware
{
use DelayStrategyAwareTrait;
/**
* @var RedisDestination
*/
Expand Down Expand Up @@ -45,6 +48,10 @@ public function getQueue()
*/
public function receive($timeout = 0)
{
if (null != $this->delayStrategy) {
$this->delayStrategy->processDelayedMessage($this->context, $this->queue);
}

$timeout = (int) ($timeout / 1000);
if (empty($timeout)) {
// Caused by
Expand All @@ -66,6 +73,10 @@ public function receive($timeout = 0)
*/
public function receiveNoWait()
{
if (null != $this->delayStrategy) {
$this->delayStrategy->processDelayedMessage($this->context, $this->queue);
}

if ($message = $this->getRedis()->rpop($this->queue->getName())) {
return RedisMessage::jsonUnserialize($message);
}
Expand Down
Loading