This package contains the PHP class mle86\WQ\WorkServerAdapter\RedisWorkServer.
It supplements the mle86/wq package by implementing its WorkServerAdapter interface.
It connects to a Redis server using the phpredis extension.
This is version 1.0.2 of mle86/wq-redis.
It was developed for version 1.0.0 of mle86/wq and should be compatible with all of its future 1.x versions as well.
$ sudo apt install php-redis # to install the phpredis extension
$ composer require mle86/wq-redis
It depends on PHP 7.1, mle86/wq, and the phpredis extension.
(class mle86\WQ\WorkServerAdapter\RedisWorkServer implements WorkServerAdapter)
It connects to a Redis server.
Because Redis does not have delayed entries, reserved entries, or buried entries, this class uses several custom workarounds to emulate those features.
For every $workQueue used, this class will create multiple Redis keys:
- _wq.$workQueue (ready jobs – List)
- _wq_delay.$workQueue (delayed jobs – Sorted Set)
- _wq_buried.$workQueue (buried jobs – List)
The delaying mechanism was inspired by this StackOverflow response.
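To illustrate the general idea behind such a delay workaround, here is a minimal sketch. It is not this package's actual implementation: the function names storeDelayed() and promoteDueJobs() are hypothetical, and $redis is assumed to be a connected \Redis (phpredis) instance. Delayed payloads go into the sorted set with their due time as the score, and a later pass moves every due payload to the ready list.

```php
<?php
// Hypothetical helpers for illustration only; not part of mle86/wq-redis.
// $redis is assumed to be a connected \Redis (phpredis) instance.

/** Stores a payload either in the ready list or, if delayed, in the sorted set. */
function storeDelayed($redis, string $workQueue, string $payload, int $delay): void
{
    if ($delay > 0) {
        // Score = earliest Unix timestamp at which the job may run.
        $redis->zAdd("_wq_delay.{$workQueue}", time() + $delay, $payload);
    } else {
        $redis->rPush("_wq.{$workQueue}", $payload);
    }
}

/** Moves all delayed payloads whose due time has passed to the ready list. */
function promoteDueJobs($redis, string $workQueue, ?int $now = null): int
{
    $now   = $now ?? time();
    $moved = 0;

    // All members with a score (due time) of at most $now.
    $due = $redis->zRangeByScore("_wq_delay.{$workQueue}", '-inf', (string)$now);

    foreach ($due as $payload) {
        // zRem() returns the number of removed members, so only the caller
        // that actually removed the entry pushes it to the ready list.
        if ($redis->zRem("_wq_delay.{$workQueue}", $payload) === 1) {
            $redis->rPush("_wq.{$workQueue}", $payload);
            $moved++;
        }
    }

    return $moved;
}
```

Note that sorted set members are unique, so this sketch would collapse two identical payloads into one entry. A real implementation needs some way to keep payloads distinct.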
public function __construct (\Redis $serverConnection)
Takes an already-configured Redis instance to work with. Does not attempt to establish a connection itself – use the connect() factory method for that instead, or do it with Redis::connect() prior to using this constructor.
public static function connect ($host = "localhost", $port = 6379, $timeout = 0.0, $retry_interval = 0)
Factory method. This will create a new Redis instance by itself. See Redis::connect() for the parameter descriptions.
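The two ways of obtaining a RedisWorkServer might look like the following sketch. It assumes a Composer autoloader at vendor/autoload.php, a Redis server on localhost:6379, and that connect() can be called statically, as its description as a factory method suggests. The database index 2 is an arbitrary example value.

```php
<?php
require __DIR__ . '/vendor/autoload.php';  // assumption: Composer autoloader location

use mle86\WQ\WorkServerAdapter\RedisWorkServer;

// Option 1: configure the Redis connection yourself, then hand it to the constructor.
// Useful when the connection needs extra setup, such as selecting a database.
$redis = new \Redis();
$redis->connect("localhost", 6379);
$redis->select(2);  // arbitrary example: use Redis database 2
$workServer = new RedisWorkServer($redis);

// Option 2: let the factory method create and connect the Redis instance.
$workServer = RedisWorkServer::connect("localhost", 6379);
```

The constructor variant is the one to choose whenever the connection needs configuration that connect() has no parameters for.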
Interface methods which are documented in the WorkServerAdapter interface:
public function storeJob (string $workQueue, Job $job, int $delay = 0)
public function getNextQueueEntry ($workQueue, int $timeout = DEFAULT_TIMEOUT) : ?QueueEntry
public function buryEntry (QueueEntry $entry)
public function requeueEntry (QueueEntry $entry, int $delay, string $workQueue = null)
public function deleteEntry (QueueEntry $entry)
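<?php
use mle86\WQ\WorkServerAdapter\RedisWorkServer;
use mle86\WQ\WorkProcessor;
use mle86\WQ\Job\Job;

// The constructor expects a \Redis instance, so use the connect() factory method here.
$processor = new WorkProcessor( RedisWorkServer::connect("localhost") );

while (true) {
    // Fetch the next job from the "webhook" queue and pass it to the callback.
    $processor->processNextJob("webhook", function(Job $job) {
        $job->...;
    });
}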
This executes all jobs available in the local Redis server's “webhook” queue, forever.
It will, however, abort if one of the jobs throws an exception – you might want to add a logging try-catch block around the processNextJob() call, as shown in WQ's “Quick Start” example.
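One possible shape for such a logging wrapper is sketched below. The helper runStepLogged() is hypothetical and not part of mle86/wq or this package. It runs a single processing step and passes any exception to a logging callable instead of letting it end the worker loop.

```php
<?php
/**
 * Hypothetical helper for illustration; not part of mle86/wq or mle86/wq-redis.
 * Runs one processing step and reports any exception to $log.
 * Returns true if the step finished without throwing, false otherwise.
 */
function runStepLogged(callable $step, callable $log): bool
{
    try {
        $step();
        return true;
    } catch (\Throwable $e) {
        $log(sprintf("job failed: %s: %s", get_class($e), $e->getMessage()));
        return false;
    }
}

// Intended use with the worker loop from the example above
// ($processor as created there):
//
// while (true) {
//     runStepLogged(function() use ($processor) {
//         $processor->processNextJob("webhook", function(Job $job) { /* ... */ });
//     }, 'error_log');
// }
```

Catching \Throwable rather than \Exception also covers PHP errors such as TypeError. Whether the loop should keep running after every kind of failure is a design decision for the application.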