Skip to content

Commit

Permalink
Merge pull request #26 from ramasofficial/add-extensions-support
Browse files Browse the repository at this point in the history
Add extensions support
  • Loading branch information
makasim authored Jul 26, 2022
2 parents f037278 + 67f22aa commit d8e30fb
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
11 changes: 8 additions & 3 deletions src/EnqueueServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@

class EnqueueServiceProvider extends ServiceProvider
{
/**
* @var array
*/
protected $extensions = [];

public function boot()
{
$this->bootInteropQueueDriver();
Expand All @@ -36,7 +41,7 @@ private function registerClient()
$this->app->singleton(SimpleClient::class, function() {
/** @var \Illuminate\Config\Repository $config */
$config = $this->app['config'];

return new SimpleClient($config->get('enqueue.client'));
});

Expand Down Expand Up @@ -64,14 +69,14 @@ private function bootInteropQueueDriver()
});

$this->app->extend('queue.worker', function ($worker, $app) {
return new Worker(
return (new Worker(
$app['queue'],
$app['events'],
$app[ExceptionHandler::class],
function () use ($app) {
return $app->isDownForMaintenance();
}
);
))->setExtensions($this->extensions);
});
}
}
27 changes: 23 additions & 4 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class Worker extends \Illuminate\Queue\Worker implements

protected $job;

protected $extensions = [];

public function daemon($connectionName, $queueNames, WorkerOptions $options)
{
$this->connectionName = $connectionName;
Expand All @@ -56,7 +58,9 @@ public function daemon($connectionName, $queueNames, WorkerOptions $options)
}

$context = $this->queue->getQueueInteropContext();
$queueConsumer = new QueueConsumer($context, new ChainExtension([$this]));
$queueConsumer = new QueueConsumer($context, new ChainExtension(
$this->getAllExtensions([$this])
));
foreach (explode(',', $queueNames) as $queueName) {
$queueConsumer->bindCallback($queueName, function() {
$this->runJob($this->job, $this->connectionName, $this->options);
Expand Down Expand Up @@ -85,10 +89,10 @@ public function runNextJob($connectionName, $queueNames, WorkerOptions $options)

$context = $this->queue->getQueueInteropContext();

$queueConsumer = new QueueConsumer($context, new ChainExtension([
$queueConsumer = new QueueConsumer($context, new ChainExtension($this->getAllExtensions([
$this,
new LimitConsumedMessagesExtension(1),
]));
])));

foreach (explode(',', $queueNames) as $queueName) {
$queueConsumer->bindCallback($queueName, function() {
Expand Down Expand Up @@ -163,5 +167,20 @@ public function stop($status = 0)

parent::stop($status);
}
}

public function setExtensions(array $extensions): self
{
$this->extensions = $extensions;

return $this;
}

protected function getAllExtensions(array $array): array
{
foreach ($this->extensions as $extension) {
$array[] = $extension;
}

return $array;
}
}

0 comments on commit d8e30fb

Please sign in to comment.