As of version 0.4.0 the worker has been rewritten to use a flexible, event-driven approach. The processing logic is now a single, minimal method. In pseudocode it looks like this:
processQueue
    trigger event 'bootstrap'
    while event says continue processing
        trigger event 'process.queue'
        trigger event 'process.job'
    trigger event 'finish'
    trigger event 'state'
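The pseudocode can be made concrete with a minimal, self-contained sketch. It does not use SlmQueue or laminas-eventmanager: `TinyEvents` and `processQueue()` are hypothetical stand-ins, and the stop condition is modeled simply as a listener returning a non-null exit reason.

```php
<?php
// Hypothetical stand-in for an event manager; SlmQueue itself relies on laminas-eventmanager.
final class TinyEvents
{
    /** @var array<string, callable[]> */
    private $listeners = [];

    /** @var string[] names of triggered events, kept for illustration */
    public $log = [];

    public function attach(string $name, callable $listener): void
    {
        $this->listeners[$name][] = $listener;
    }

    /** Triggers an event and returns the first non-null listener result, if any. */
    public function trigger(string $name)
    {
        $this->log[] = $name;
        foreach ($this->listeners[$name] ?? [] as $listener) {
            $result = $listener();
            if ($result !== null) {
                return $result;
            }
        }
        return null;
    }
}

/** Mirrors the pseudocode: bootstrap, loop, finish, state. */
function processQueue(TinyEvents $events): ?string
{
    $events->trigger('bootstrap');
    $exitReason = null;
    while ($exitReason === null) {
        // A listener ends the loop by returning an exit reason.
        $exitReason = $events->trigger('process.queue');
    }
    $events->trigger('finish');
    $events->trigger('state');

    return $exitReason;
}

$events = new TinyEvents();
$jobs = ['a', 'b'];
$events->attach('process.queue', function () use (&$jobs, $events) {
    if ($jobs === []) {
        return 'queue drained';
    }
    array_shift($jobs);
    $events->trigger('process.job');
    return null;
});

$reason = processQueue($events);
```

Here `process.job` is triggered from inside the `process.queue` listener, which is the role a worker strategy plays in the real worker.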
To get useful results you need to register so-called 'worker strategies' with the worker. SlmQueue makes this trivial via configuration.
Worker strategies are aggregate listeners which are created via a plugin manager.
At least one worker strategy listening to the bootstrap event must be registered with the worker. The Worker Factory will throw an exception if none is. SlmQueue attaches the provided AttachQueueListenersStrategy to do just that.
It is worth noting that events are dispatched by the worker itself, but can also be dispatched from within worker strategies.
The plugin manager ensures that worker strategies extend SlmQueue\Strategy\AbstractStrategy, so each worker strategy gains the following capabilities:
Configuration options are passed by the plugin manager to the constructor of a worker strategy. A setter method is called for each option. If a setter does not exist, an exception is thrown.
'SlmQueue\Strategy\MaxRunsStrategy' => ['max_runs' => 10],
Such a config results in a MaxRunsStrategy instance whose setMaxRuns method is called with 10.
The optional 'priority' option is used when the aggregate listeners are registered with the event manager and is then removed from the passed options. This means a worker strategy cannot have an option of its own named 'priority'.
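The mapping from option keys to setter calls can be sketched as follows. This is an illustration of the convention described above, not SlmQueue's actual implementation: `applyOptions()` and `ExampleMaxRunsStrategy` are hypothetical names.

```php
<?php
// Hypothetical helper, not SlmQueue's implementation: maps option keys to setters.
function applyOptions(object $strategy, array $options): void
{
    // 'priority' is consumed when listeners are attached, so it never reaches a setter.
    unset($options['priority']);

    foreach ($options as $key => $value) {
        // max_runs -> setMaxRuns
        $setter = 'set' . str_replace('_', '', ucwords($key, '_'));
        if (!method_exists($strategy, $setter)) {
            throw new InvalidArgumentException("Unknown option '$key'");
        }
        $strategy->$setter($value);
    }
}

// Hypothetical strategy used only for this example.
final class ExampleMaxRunsStrategy
{
    public $maxRuns = 100000;

    public function setMaxRuns(int $maxRuns): void
    {
        $this->maxRuns = $maxRuns;
    }
}

$strategy = new ExampleMaxRunsStrategy();
applyOptions($strategy, ['max_runs' => 10, 'priority' => 5]);
```

After the call, `$strategy->maxRuns` holds 10 and the 'priority' entry has been silently dropped; an option without a matching setter raises an exception.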
Worker strategies may tell the worker to stop processing the queue. More concretely, they invalidate the condition of the while loop.
public function onSomeListener(WorkerEvent $event)
{
    return ExitWorkerLoopResult::withReason('an exit reason');
    ...
}
While processing a queue it might be necessary to execute some setup or teardown logic. A worker strategy may listen to the bootstrap and/or finish event to do just this.
/**
 * @param EventManagerInterface $events
 * @param int                   $priority
 */
public function attach(EventManagerInterface $events, $priority = 1)
{
    $this->listeners[] = $events->attach(
        WorkerEventInterface::EVENT_BOOTSTRAP,
        [$this, 'onBootstrap']
    );
    $this->listeners[] = $events->attach(
        WorkerEventInterface::EVENT_FINISH,
        [$this, 'onFinish']
    );
}

/**
 * @param BootstrapEvent $e
 */
public function onBootstrap(BootstrapEvent $e)
{
    // setup code
}

/**
 * @param FinishEvent $e
 */
public function onFinish(FinishEvent $e)
{
    // teardown code
}
For some types of jobs it might be necessary to do something before or after the execution of an individual job. This can be done by listening to the process.job event at different priorities.
/**
 * @param EventManagerInterface $events
 * @param int                   $priority
 */
public function attach(EventManagerInterface $events, $priority = 1)
{
    // A high priority runs before the job is executed
    $this->listeners[] = $events->attach(
        WorkerEventInterface::EVENT_PROCESS_JOB,
        [$this, 'onPreProcess'],
        100
    );
    // A low priority runs after the job is executed
    $this->listeners[] = $events->attach(
        WorkerEventInterface::EVENT_PROCESS_JOB,
        [$this, 'onPostProcess'],
        -100
    );
}

/**
 * @param ProcessJobEvent $e
 */
public function onPreProcess(ProcessJobEvent $e)
{
    // pre job execution code
}

/**
 * @param ProcessJobEvent $e
 */
public function onPostProcess(ProcessJobEvent $e)
{
    // post job execution code
}
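The effect of the two priorities can be illustrated in isolation. The sketch below does not use laminas-eventmanager; `PriorityEvents` is a hypothetical mini dispatcher that only assumes the rule relied on above, namely that listeners with a higher priority run first. The listener at priority 1 stands in for the strategy that actually executes the job.

```php
<?php
// Hypothetical mini dispatcher: listeners with a higher priority run first.
final class PriorityEvents
{
    /** @var array<int, array{0: int, 1: callable}> */
    private $listeners = [];

    public function attach(callable $listener, int $priority = 1): void
    {
        $this->listeners[] = [$priority, $listener];
    }

    public function trigger(): void
    {
        $sorted = $this->listeners;
        // Sort descending by priority.
        usort($sorted, function (array $a, array $b): int {
            return $b[0] <=> $a[0];
        });
        foreach ($sorted as $entry) {
            $entry[1]();
        }
    }
}

$order = [];
$events = new PriorityEvents();
$events->attach(function () use (&$order) { $order[] = 'post'; }, -100);
$events->attach(function () use (&$order) { $order[] = 'job'; }, 1);
$events->attach(function () use (&$order) { $order[] = 'pre'; }, 100);
$events->trigger();
```

Regardless of the order in which the listeners were attached, `$order` ends up as pre, job, post.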
A worker strategy may report a state when the worker exits. To do so, the strategy needs to listen to the WorkerEventInterface::EVENT_PROCESS_STATE event. The AbstractStrategy implements an onReportQueueState method that takes $this->state and returns it as appropriate.
From the MaxRunsStrategy:
public function attach(EventManagerInterface $events, $priority = 1)
{
    $this->listeners[] = $events->attach(
        WorkerEventInterface::EVENT_PROCESS_QUEUE,
        [$this, 'onStopConditionCheck'],
        -1000
    );
    $this->listeners[] = $events->attach(
        WorkerEventInterface::EVENT_PROCESS_STATE,
        [$this, 'onReportQueueState'],
        $priority
    );
}

public function onStopConditionCheck(WorkerEvent $event)
{
    $this->runCount++;
    $this->state = sprintf('%s jobs processed', $this->runCount);
}
A worker strategy may ask the worker to dispatch events.
From the ProcessQueueStrategy:
public function onJobPop(ProcessQueueEvent $processQueueEvent)
{
    /** @var AbstractWorker $worker */
    $worker       = $processQueueEvent->getTarget();
    $queue        = $processQueueEvent->getQueue();
    $options      = $processQueueEvent->getOptions();
    $eventManager = $worker->getEventManager();
    $job          = $queue->pop($options);

    // The queue may return null, for instance if a timeout was set
    if (!$job instanceof JobInterface) {
        /** @var ResponseCollection $results */
        $results = $eventManager->triggerEventUntil(
            function ($response) {
                return $response instanceof ExitWorkerLoopResult;
            },
            new ProcessIdleEvent($worker, $queue)
        );

        $processQueueEvent->stopPropagation();

        if ($results->stopped()) {
            return $results->last();
        }

        return;
    }

    $eventManager->triggerEvent(new ProcessJobEvent($job, $worker, $queue));
}
Worker strategies are regular ZF2/Laminas services that are instantiated via a plugin manager. If a worker strategy depends on other services, it should be created via a service factory.
The plugin manager is configured to not share services.
Events the worker and worker strategies may dispatch:
- WorkerEventInterface::EVENT_BOOTSTRAP: just before the loop is entered
- WorkerEventInterface::EVENT_FINISH: just after the loop has exited
- WorkerEventInterface::EVENT_PROCESS_QUEUE: fetch job(s)
- WorkerEventInterface::EVENT_PROCESS_JOB: processes job(s)
- WorkerEventInterface::EVENT_PROCESS_IDLE: when the queue is empty
- WorkerEventInterface::EVENT_PROCESS_STATE: collects 'states' from strategies
A listener attached to any of the above events is passed an event object extending WorkerEvent. Depending on the type, it may offer additional methods such as getJob or getQueue.
$em->attach(WorkerEventInterface::EVENT_PROCESS_JOB, function (ProcessJobEvent $e) {
    $queue = $e->getQueue();
    $job   = $e->getJob();
});
In the above example, $em refers to the event manager inside the worker object: $em = $worker->getEventManager();
When a job is processed, the job or worker returns a status code. You can use a listener to act upon this status, for example to log any failed jobs:
$logger = $sm->get('logger');
$em->attach(WorkerEventInterface::EVENT_PROCESS_JOB, function (ProcessJobEvent $e) use ($logger) {
    $result = $e->getResult();
    if (ProcessJobEvent::JOB_STATUS_FAILURE === $result) {
        $job = $e->getJob();
        $logger->warn(sprintf(
            'Job #%s (%s) failed executing', $job->getId(), get_class($job)
        ));
    }
}, -1000);
The purpose of the AttachQueueListenersStrategy is to register additional strategies that are specific to the queue being processed. After registering any additional worker strategies it unregisters itself as a listener. Finally it halts event propagation and re-triggers the bootstrap event.
A new bootstrapping cycle then occurs, now with the additional queue-specific strategies.
listens to:
- bootstrap event at priority PHP_INT_MAX
triggers:
- bootstrap
throws:
- RuntimeException if the process.queue event isn't listened to by any registered strategy.
This strategy is enabled by default for all queues.
The FileWatchStrategy is able to 'watch' files by creating a hash of their contents. If it detects a change, it requests to stop processing the queue. This is useful if you have something like supervisor automatically restarting the worker process.
The strategy builds the list of files to watch via a preg_match on the filenames within the application.
listens to:
- process.idle event at priority 1
- process.job event at priority 1000
- process.state event at priority 1
options:
- pattern defaults to '/^\.\/(config|module).*\.(php|phtml)$/'
- idle_throttle_time defaults to 300 seconds
This strategy is not enabled by default. It can be slow and is recommended for development only. In production you may want to watch only a single file. The check runs before each job and, while idling, once idle_throttle_time seconds have passed.
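The hashing idea behind this strategy can be sketched in plain PHP. The helpers below are hypothetical and are not the strategy's actual code; the example also uses a simpler pattern than the default one, because it scans an absolute temporary directory rather than paths relative to the application root.

```php
<?php
// Hypothetical helpers illustrating the idea; not the strategy's actual code.

/** Returns [path => content hash] for files under $dir whose path matches $pattern. */
function hashWatchedFiles(string $dir, string $pattern): array
{
    $hashes = [];
    $iterator = new RecursiveIteratorIterator(
        new RecursiveDirectoryIterator($dir, FilesystemIterator::SKIP_DOTS)
    );
    foreach ($iterator as $file) {
        $path = $file->getPathname();
        if ($file->isFile() && preg_match($pattern, $path)) {
            $hashes[$path] = hash_file('sha1', $path);
        }
    }
    ksort($hashes); // stable key order so two snapshots can be compared strictly
    return $hashes;
}

/** True when any watched file was added, removed or modified between two snapshots. */
function filesChanged(array $before, array $after): bool
{
    return $before !== $after;
}

// Demonstration with a throwaway directory.
$dir = sys_get_temp_dir() . '/watch_' . uniqid();
mkdir($dir);
$configFile = $dir . '/module.config.php';
file_put_contents($configFile, 'first version');
$pattern = '/\.(php|phtml)$/';

$before = hashWatchedFiles($dir, $pattern);
file_put_contents($configFile, 'second version');
$after = hashWatchedFiles($dir, $pattern);

$changed = filesChanged($before, $after); // a worker would request an exit here

// Clean up the throwaway directory.
unlink($configFile);
rmdir($dir);
```

Hashing every matching file on each check is what makes the strategy slow on large code bases, which is why narrowing the pattern helps.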
The InterruptStrategy is able to catch a stop condition on Linux-like systems (as well as OS X). If a worker is started from the command line interface (CLI), it is possible to send a SIGTERM or SIGINT signal to the worker. SlmQueue is smart enough not to quit the script directly, but to let the job finish its work first and then break out of the loop. On Windows systems this strategy does nothing.
listens to:
- process.idle event at priority 1
- process.queue event at priority -1000
- process.state event at priority 1
This strategy is enabled by default for all queues.
The MaxMemoryStrategy will measure the amount of memory allocated to PHP after each processed job. It will request to exit when a threshold is exceeded.
Note that an individual job may exceed this threshold during its lifetime. But if you have a memory leak, this strategy makes sure the script eventually aborts.
listens to:
- process.idle event at priority 1
- process.queue event at priority -1000
- process.state event at priority 1
options:
- max_memory defaults to 100*1024*1024
This strategy is enabled by default for all queues.
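As an illustration, the threshold could be raised in a module or autoload config along these lines. This is a sketch: the class name is assumed to follow the SlmQueue\Strategy\... pattern used for other strategies in this document, and the 200 MiB value is arbitrary.

```php
<?php
// Sketch of a config override; the strategy class name is an assumption based on
// the SlmQueue\Strategy\... naming used for other strategies in this document.
return [
    'slm_queue' => [
        'worker_strategies' => [
            'default' => [
                // Request an exit once more than 200 MiB is allocated to PHP.
                'SlmQueue\Strategy\MaxMemoryStrategy' => ['max_memory' => 200 * 1024 * 1024],
            ],
        ],
    ],
];
```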
The MaxRunsStrategy will request to exit after a set number of jobs have been processed.
listens to:
- process.idle event at priority 1
- process.job event at priority -1000
- process.state event at priority 1
options:
- max_runs defaults to 100000
This strategy is enabled by default for all queues.
The WorkerLifetimeStrategy will request to exit the worker once a given lifetime has been reached or exceeded.
The configured lifetime is NOT a hard cap on the actual runtime of the worker, because no job is killed during its execution. It is a soft cap: the exit check only happens between jobs (while idling and at the start of a new job).
So if a worker with a short lifetime (e.g. 1 hour) starts a long-running job (e.g. 2 hours), it will exit after the execution of that job.
listens to:
- bootstrap event at priority 1: sets the start time
- process.queue event at priority -1000: exits if the lifetime was exceeded
- process.idle event at priority -1000: exits if the lifetime was exceeded
- process.state event at priority 1: returns the current state of the strategy
options:
- lifetime: the soft cap of the worker lifetime in seconds, defaults to 3600 seconds (1 hour)
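The soft-cap behaviour can be worked through with a small simulation. `simulateWorkerExit()` is a hypothetical function written for this example; it ignores idle time and only models the rule that the lifetime is checked before a job starts, never while one is running.

```php
<?php
/**
 * Hypothetical simulation: returns the elapsed seconds at which a worker exits,
 * given a lifetime and the durations of the jobs it picks up in order.
 * The lifetime is checked only between jobs, never during one.
 */
function simulateWorkerExit(int $lifetime, array $jobDurations): int
{
    $elapsed = 0;
    foreach ($jobDurations as $duration) {
        if ($elapsed >= $lifetime) {
            break; // lifetime reached: exit before starting the next job
        }
        $elapsed += $duration; // a running job is never interrupted
    }
    return $elapsed;
}

// A 1 hour lifetime with a 2 hour job first: the worker exits after 2 hours.
$exitAt = simulateWorkerExit(3600, [7200, 60]);

// Three 30 minute jobs: the third is never started.
$exitAtShortJobs = simulateWorkerExit(3600, [1800, 1800, 1800]);
```

In the first case the worker overshoots its lifetime by a full hour, which is exactly the situation described above.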
The ProcessQueueStrategy is responsible for querying the queue for jobs and executing them.
listens to:
- process.queue event at priority 1
- process.job event at priority 1
triggers:
- process.job for each job popped from the queue
- process.idle if the queue returns null (it might be empty or timed out)
The MaxPollingFrequencyStrategy ensures that the polling frequency doesn't exceed a defined value. This can be useful when you use a system like SQS, which charges per request.
listens to:
- process.queue event at priority 1000
options:
- max_frequency
This strategy is not enabled by default.
Frequency | x / sec | x / min | x / hour | x / day | x / month |
---|---|---|---|---|---|
0.1 | 0.1 | 6 | 360 | 8640 | 259200 |
0.2 | 0.2 | 12 | 720 | 17280 | 518400 |
0.5 | 0.5 | 30 | 1800 | 43200 | 1296000 |
1 | 1 | 60 | 3600 | 86400 | 2592000 |
2 | 2 | 120 | 7200 | 172800 | 5184000 |
5 | 5 | 300 | 18000 | 432000 | 12960000 |
10 | 10 | 600 | 36000 | 864000 | 25920000 |
Frequency | Delay |
---|---|
0.000278 | 1 hour |
0.0167 | 1 min |
0.1 | 10 s |
0.2 | 5 s |
0.5 | 2 s |
1 | 1 s |
2 | 500 ms |
5 | 200 ms |
10 | 100 ms |
In the SlmQueue config, find the part named worker_strategies and add the following line:
'SlmQueue\Strategy\MaxPollingFrequencyStrategy' => ['max_frequency' => 1]
Replace the max_frequency value with one chosen using the tables above.
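The numbers in the tables follow from two simple relations: the delay between polls is 1 / frequency, and the number of polls in a period is frequency × seconds. A small sketch, with hypothetical helper names, for values that are not listed:

```php
<?php
/** Minimum delay in seconds between two polls for a given max_frequency (polls per second). */
function delayForFrequency(float $maxFrequency): float
{
    if ($maxFrequency <= 0) {
        throw new InvalidArgumentException('max_frequency must be positive');
    }
    return 1 / $maxFrequency;
}

/** Upper bound on the number of polls within $seconds at the given frequency. */
function pollsPer(float $maxFrequency, int $seconds): float
{
    return $maxFrequency * $seconds;
}

$delay  = delayForFrequency(0.5); // seconds between polls at max_frequency 0.5
$perDay = pollsPer(0.5, 86400);   // polls per day at max_frequency 0.5
```

For a per-request billed service, `pollsPer()` gives the worst-case request count to budget for.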
Instead of accessing the worker's event manager directly, you can also register listeners through the shared event manager:
<?php
namespace MyModule;

use SlmQueue\Worker\Event\WorkerEventInterface;
use Laminas\Mvc\MvcEvent;

class Module
{
    public function onBootstrap(MvcEvent $e)
    {
        $em       = $e->getApplication()->getEventManager();
        $sharedEm = $em->getSharedManager();

        $sharedEm->attach('SlmQueue\Worker\WorkerInterface', WorkerEventInterface::EVENT_PROCESS_JOB, function () {
            // do something just before a job starts.
        }, 1000);
    }
}
Note: since v1.0.1 we have decoupled from laminas/laminas-mvc and as such the shared event manager isn't available in the service container. If you are not using laminas-mvc you should not use the shared event manager.
Note: we will probably move away from the shared event manager in a next major release. If you need to subscribe to events, we recommend using the worker's event manager: SlmQueue\Worker\AbstractWorker::getEventManager().
<?php
namespace MyModule;

use SlmQueue\Worker\Event\WorkerEventInterface;
use Laminas\Mvc\MvcEvent;

class Module
{
    public function onBootstrap(MvcEvent $e)
    {
        $sm     = $e->getApplication()->getServiceManager();
        $worker = $sm->get('\SlmQueueDoctrine\Worker\DoctrineWorker');
        $em     = $worker->getEventManager();

        $em->attach(WorkerEventInterface::EVENT_PROCESS_JOB, function () {
            // do something just before a job starts.
        }, 1000);
    }
}
A good example is i18n: a job is given a locale if it performs localized actions. This locale is set on the translator just before processing starts. The original locale is restored when the job has finished processing.
In this case, all jobs that require a locale implement a LocaleAwareInterface:
<?php
namespace MyModule\Job;

interface LocaleAwareInterface
{
    /**
     * @param string $locale
     */
    public function setLocale($locale);

    /**
     * @return string
     */
    public function getLocale();
}
A worker strategy listens to the process.job event at two priorities to set and restore the locale:
<?php
namespace MyModule\Strategy;

use MyModule\Job\LocaleAwareInterface;
use SlmQueue\Strategy\AbstractStrategy;
use SlmQueue\Worker\Event\ProcessJobEvent;
use SlmQueue\Worker\Event\WorkerEventInterface;
use Laminas\EventManager\EventManagerInterface;
use Laminas\I18n\Translator\Translator;

class JobTranslatorStrategy extends AbstractStrategy
{
    /**
     * @var string Original locale, stored while a job is processed
     */
    protected $locale;

    /**
     * @var Translator
     */
    protected $translator;

    /**
     * @param Translator $translator
     */
    public function __construct(Translator $translator)
    {
        $this->translator = $translator;
    }

    /**
     * {@inheritDoc}
     */
    public function attach(EventManagerInterface $events, $priority = 1)
    {
        // The callback names must match the methods defined below
        $this->listeners[] = $events->attach(WorkerEventInterface::EVENT_PROCESS_JOB, [$this, 'onPreJobProcessing'], 1000);
        $this->listeners[] = $events->attach(WorkerEventInterface::EVENT_PROCESS_JOB, [$this, 'onPostJobProcessing'], -1000);
    }

    public function onPreJobProcessing(ProcessJobEvent $e)
    {
        $job = $e->getJob();
        if (!$job instanceof LocaleAwareInterface) {
            return;
        }

        // Remember the current locale, then switch to the job's locale
        $this->locale = $this->translator->getLocale();
        $this->translator->setLocale($job->getLocale());
    }

    public function onPostJobProcessing(ProcessJobEvent $e)
    {
        $job = $e->getJob();
        if (!$job instanceof LocaleAwareInterface) {
            return;
        }

        // Restore the locale that was active before the job ran
        $this->translator->setLocale($this->locale);
    }
}
Since this worker strategy has a dependency that needs to be injected we should create a factory for it.
<?php
namespace MyModule\Strategy\Factory;

use MyModule\Strategy\JobTranslatorStrategy;
use Laminas\ServiceManager\Factory\FactoryInterface;
use Psr\Container\ContainerInterface;

class JobTranslatorStrategyFactory implements FactoryInterface
{
    public function __invoke(ContainerInterface $container, $requestedName, array $options = null)
    {
        /** @var \Laminas\Mvc\I18n\Translator $translator */
        $translator = $container->get('MvcTranslator');
        $strategy   = new JobTranslatorStrategy($translator);

        return $strategy;
    }
}
Finally, add two configuration settings:
- Register the factory with the strategy manager (the plugin manager for strategies).
- Add the strategy by name to the worker strategies. Note that we can do this for all queues or for specific ones.
<?php
return [
    'slm_queue' => [
        /**
         * Worker Strategies
         */
        'worker_strategies' => [
            'default' => [ // per worker
                // add it here to enable the strategy for all queues
            ],
            'queues' => [ // per queue
                'my-queue' => [
                    'MyModule\Strategy\JobTranslatorStrategy',
                ]
            ],
        ],
        /**
         * Strategy manager
         */
        'strategy_manager' => [
            'factories' => [
                'MyModule\Strategy\JobTranslatorStrategy' => 'MyModule\Strategy\Factory\JobTranslatorStrategyFactory',
            ]
        ],
    ]
];