A Disque client for PHP 5.5.
composer require 0x20h/phloppy:~0.0
Disque's API is implemented in different \Phloppy\Client
implementations that
reflect their specific use case. All clients get injected a StreamInterface
that holds the link to the connected node.
The first thing to do is to connect to a Disque node. For that, use one of the
StreamInterface
implementations.
$cache = new FileCache('/tmp/nodes');
$stream = new CachedPool(['tcp://127.0.0.1:7711', 'tcp://127.0.0.1:7712'], $cache);
$stream->connect();
Then, inject the $stream
into a client, i.e. a Consumer
.
$consumer = new Consumer($stream);
while (true) {
$job = $consumer->getJob('my_queue');
// process $job
}
Clients are separated into Producer
, Consumer
, Node
, Queue
and Cluster
.
Every client contains methods related to their specific use-case.
Holds all API commands related to submitting jobs to a Disque cluster.
$producer = new Producer($stream);
$job = $producer->addJob('test', Job::create(['body' => 42]));
Commands:
addJob(queueName, job, [maxlen = 0], [async = false])
setReplicationTimeout(msecs)
setReplicationFactor(n)
Implements all commands related to getting jobs from a Disque cluster.
$consumer = new Consumer($stream);
$job = $consumer->getJob('test');
// do some work
$consumer->ack($job);
Commands:
getJob(queueNames)
getJobs(queueNames, numberOfJobs)
ack(job)
fastAck(job)
show(jobid)
$queue = new Queue($stream);
// print out the current queue len on the connected node
echo $queue->len('test');
// get the latest job out of 'test' without removing it
echo $queue->peek('test');
Commands:
len(queueName)
peek(queueName)
scan(count,min,max,rate)
enqueue(jobIds)
dequeue(jobIds)
Contains commands related to a single Disque instance.
$consumer = new Node($stream);
$nodes = $consumer->hello();
Commands:
hello()
info()
ping()
auth(password)
jscan(count, queues[], states[], format)
$cluster = new Cluster($stream);
$cluster->meet($stream->getStreamUrls());
Commands:
meet($urls)
Connect to a single node. If the connection fails, a ConnectException
thrown.
If the node fails, a StreamException is thrown.
$stream = new DefaultStream('tcp://127.0.0.1:7711');
Connect randomly to on of the provided nodes. If during operation one of the nodes dies or doesn't respond anymore
the Pool
automatically reconnects to one of the other nodes. If no other node is left, a ConnectException
is thrown.
$stream = new Pool(['tcp://127.0.0.1:7711', 'tcp://127.0.0.1:7712']);
Same behavior as the Pool
implementation, but you can provide a CacheInterface
implemention
in order to cache all existing cluster nodes. When connecting, a random node from the cached
cluster nodes is chosen.
$cache = new FileCache('/tmp/nodes');
$stream = new CachedPool(['tcp://127.0.0.1:7711'], $cache);
The MIT License (MIT).