Declaration: class mle86\WQ\WorkProcessor
Source file: src/WQ/WorkProcessor.php
This class implements a wrapper around WorkServerAdapter::getNextQueueEntry() called processNextJob(). It not only executes the next job immediately but also tries to re-queue it if it fails.
public function __construct (WorkServerAdapter $workServer, LoggerInterface $logger = null, array $options = [])
Instantiates a new WorkProcessor. This causes no side effects yet.
$workServer: The work server adapter to work with.
$logger: A PSR-3 logger. The WorkProcessor will report job success status here.
$options: Options to set, overriding the default options. Works the same as a setOptions() call right after instantiation.
public function processNextJob ($workQueue, callable $callback, int $timeout = WorkServerAdapter::DEFAULT_TIMEOUT): void
Executes the next job in the Work Queue by passing it to the callback function.
If that results in a \RuntimeException or the JobResult::FAILED return value, the method will try to re-queue the job and re-throw the exception.
If the execution results in any other \Throwable or the JobResult::ABORT return value, no re-queueing will be attempted; the job will be buried immediately.
If the next job in the Work Queue is expired, it will be silently deleted.
Will re-throw any Exceptions/Throwables from the $callback.
Throws a JobCallbackReturnValueException in case of an unexpected callback return value (should be a JobResult constant or null or void).
$workQueue: See WorkServerAdapter::getNextQueueEntry().
$callback: The handler callback to execute each Job. Expected signature: function(Job, JobContext): ?int|void. See the JobResult enum class for possible return values.
$timeout: See WorkServerAdapter::getNextQueueEntry().
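The rules above for how processNextJob() treats callback outcomes can be sketched as a small standalone function. This is only an illustration of the documented behavior, not the library's implementation: the constants SKETCH_SUCCESS, SKETCH_FAILED and SKETCH_ABORT and the function classifyOutcome() are hypothetical stand-ins (the real JobResult values are not stated here), and treating null/void as success is an assumption.

```php
<?php
// Hypothetical stand-ins for JobResult constants; the real values are not given in this document.
const SKETCH_SUCCESS = 0;
const SKETCH_FAILED  = 1;
const SKETCH_ABORT   = 2;

/**
 * Classifies what a job callback did, following the documented rules:
 *  - \RuntimeException or FAILED       => 'failed' (a re-queue may be attempted)
 *  - any other \Throwable or ABORT     => 'abort'  (no re-queue attempt)
 *  - null/void or SUCCESS (assumption) => 'success'
 * Any other return value is rejected; \UnexpectedValueException stands in
 * for the library's JobCallbackReturnValueException.
 */
function classifyOutcome(?int $returnValue, ?\Throwable $thrown): string
{
    if ($thrown !== null) {
        return ($thrown instanceof \RuntimeException) ? 'failed' : 'abort';
    }
    if ($returnValue === null || $returnValue === SKETCH_SUCCESS) {
        return 'success';
    }
    if ($returnValue === SKETCH_FAILED) {
        return 'failed';
    }
    if ($returnValue === SKETCH_ABORT) {
        return 'abort';
    }
    throw new \UnexpectedValueException('unexpected callback return value');
}
```

A handler that wants a retry would therefore throw a \RuntimeException or return JobResult::FAILED, while a handler that knows the job can never succeed would return JobResult::ABORT.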
public function setOption (int $option, $value): self
Sets one of the configuration options.
$option: One of the WP_ constants.
$value: The option's new value. The required type depends on the option.
public function setOptions (array $options): self
Sets one or more of the configuration options.
const WP_ENABLE_RETRY
If this option is true (default), failed jobs will be re-queued (if their Job::jobCanRetry() return value says so).
This option can be used to disable retries for all jobs if set to false; jobs will then be handled as if their Job::jobCanRetry() methods always returned false, i.e. they'll be buried or deleted (depending on the WP_ENABLE_BURY option).
const WP_ENABLE_BURY
If this option is true (default), permanently failed jobs will be buried; if it is false, failed jobs will be deleted.
const WP_DELETE
If this option is set to DELETE_FINISHED (default), finished jobs will be deleted. Otherwise, its value is taken as a Work Queue name where all finished jobs will be moved to.
(It's possible to put the origin work queue name here, resulting in an infinite loop as all jobs in the queue will be executed over and over. Probably not what you want.)
const WP_EXPIRED
If this option is set to DELETE_EXPIRED (default), expired jobs will be deleted. If this option is set to BURY_EXPIRED, expired jobs will be buried instead. Otherwise, its value is taken as a Work Queue name where all expired jobs will be moved to.
(It's possible to put the origin work queue name here, resulting in an infinite loop as soon as an expired job is encountered. Probably not what you want.)
const WP_RETHROW_EXCEPTIONS
If this option is true (default), all exceptions thrown by the handler callback will be re-thrown so that the caller receives them as well. If this option is false, processNextJob() will silently return instead.
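How the retry, bury and delete options interact can be easier to see in code. The following is a minimal standalone sketch of the documented rules, not the library's code: failedJobAction(), finishedJobAction() and the sentinel SKETCH_DELETE_FINISHED are hypothetical names, and the sentinel's value is an arbitrary stand-in for DELETE_FINISHED.

```php
<?php
// Hypothetical stand-in for the DELETE_FINISHED sentinel; the real value is not stated in this document.
const SKETCH_DELETE_FINISHED = -1;

/**
 * Decides what happens to a job that failed in a retryable way
 * (\RuntimeException or JobResult::FAILED).
 * $jobCanRetry mirrors Job::jobCanRetry(); the two flags mirror
 * WP_ENABLE_RETRY and WP_ENABLE_BURY.
 */
function failedJobAction(bool $jobCanRetry, bool $enableRetry, bool $enableBury): string
{
    if ($enableRetry && $jobCanRetry) {
        return 'requeue';
    }
    // Permanently failed: bury or delete, depending on WP_ENABLE_BURY.
    return $enableBury ? 'bury' : 'delete';
}

/**
 * Decides what happens to a finished job, mirroring WP_DELETE:
 * the sentinel means "delete", any other value is a target queue name.
 */
function finishedJobAction($wpDelete): string
{
    if ($wpDelete === SKETCH_DELETE_FINISHED) {
        return 'delete';
    }
    return 'move to ' . $wpDelete;
}
```

With both flags at their documented defaults (true), a job whose jobCanRetry() returns true is re-queued; setting WP_ENABLE_RETRY to false makes the same job fall through to bury or delete.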
Usually, tasks like logging or stats collection should be done in the custom worker script.
If multiple worker scripts share the same logging/stats code, it can be put into these hook methods instead by extending the WorkProcessor class.
All of these hook methods are called by the processNextJob() method.
In the provided base class, they are empty.
protected function onNoJobAvailable (array $workQueues): void
This method is called if there is currently no job to be executed in any of the polled work queues.
protected function onJobAvailable (QueueEntry $qe): void
This method is called if there is a job ready to be executed, right before it is actually executed.
protected function onSuccessfulJob (QueueEntry $qe): void
This method is called after a job has been successfully executed, right before it is deleted from the work queue.
protected function onExpiredJob (QueueEntry $qe): void
This method is called if an expired job is encountered, right before it gets deleted.
protected function onJobRequeue (QueueEntry $qe, \Throwable $e, int $delay): void
This method is called after a job that can be re-tried at least one more time has failed (thrown an exception), right before processNextJob() re-queues it and re-throws the exception.
protected function onFailedJob (QueueEntry $qe, \Throwable $e): void
This method is called after a job has permanently failed (thrown an exception and cannot be re-tried), right before processNextJob() buries/deletes it and re-throws the exception.
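The hook mechanism can be illustrated with a self-contained sketch. To keep it runnable without the library, SketchProcessor below is a hypothetical, heavily reduced stand-in for WorkProcessor (three hooks, plain string job names instead of QueueEntry objects, no re-queueing); only the idea of overriding empty hook methods in a subclass is taken from the description above.

```php
<?php
// Hypothetical stand-in for WorkProcessor, reduced to three hooks.
class SketchProcessor
{
    public function run(string $jobName, callable $callback): void
    {
        $this->onJobAvailable($jobName);
        try {
            $callback($jobName);
        } catch (\Throwable $e) {
            $this->onFailedJob($jobName, $e);
            throw $e; // re-throw, like the documented default behavior
        }
        $this->onSuccessfulJob($jobName);
    }

    // Empty by default, as in the provided base class.
    protected function onJobAvailable(string $jobName): void {}
    protected function onSuccessfulJob(string $jobName): void {}
    protected function onFailedJob(string $jobName, \Throwable $e): void {}
}

// Shared stats code lives in one subclass that several worker scripts could reuse.
class CountingProcessor extends SketchProcessor
{
    public array $counts = ['available' => 0, 'success' => 0, 'failed' => 0];

    protected function onJobAvailable(string $jobName): void
    {
        $this->counts['available']++;
    }

    protected function onSuccessfulJob(string $jobName): void
    {
        $this->counts['success']++;
    }

    protected function onFailedJob(string $jobName, \Throwable $e): void
    {
        $this->counts['failed']++;
    }
}

$processor = new CountingProcessor();
$processor->run('job-1', function (string $name): void {
    // succeeds
});
try {
    $processor->run('job-2', function (string $name): void {
        throw new \LogicException('cannot be processed');
    });
} catch (\LogicException $e) {
    // the caller still receives the exception
}
```

With the real class, the subclass would extend WorkProcessor and override the hooks with the signatures listed above, taking a QueueEntry instead of a string.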