From 9d348c5d57873bfcca86cff1987e22472d1f0e5b Mon Sep 17 00:00:00 2001 From: Taylor Otwell Date: Tue, 31 Jan 2017 11:41:26 -0600 Subject: [PATCH] Fix how reservation works when queue is paused. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Queue was moving jobs to reserve queue even when queue was pause was paused… Which can delay jobs unnecessarily. --- src/Illuminate/Queue/Worker.php | 55 +++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/src/Illuminate/Queue/Worker.php b/src/Illuminate/Queue/Worker.php index 797e72fcedea..f3f1a3b44de2 100644 --- a/src/Illuminate/Queue/Worker.php +++ b/src/Illuminate/Queue/Worker.php @@ -85,6 +85,12 @@ public function daemon($connectionName, $queue, WorkerOptions $options) $lastRestart = $this->getTimestampOfLastQueueRestart(); while (true) { + if (! $this->daemonShouldRun($options)) { + $this->pauseWorker($options, $lastRestart); + + continue; + } + // First, we will attempt to get the next job off of the queue. We will also // register the timeout handler and reset the alarm for this job so it is // not stuck in a frozen state forever. Then, we can fire off this job. @@ -101,7 +107,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options) // If the daemon should run (not in maintenance mode, etc.), then we can run // fire off this job for processing. Otherwise, we will need to sleep the // worker so no more jobs are processed until they should be processed. - if ($job && $this->daemonShouldRun($options)) { + if ($job) { $this->runJob($job, $connectionName, $options); } else { $this->sleep($options->sleep); @@ -110,11 +116,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options) // Finally, we will check to see if we have exceeded our memory limits or if // the queue should restart based on other indications. If so, we'll stop // this worker and let whatever is "monitoring" it restart the process. - if ($this->memoryExceeded($options->memory)) { - $this->stop(12); - } elseif ($this->queueShouldRestart($lastRestart)) { - $this->stop(); - } + $this->stopIfNecessary($options, $lastRestart); } } @@ -148,8 +150,7 @@ protected function registerTimeoutHandler($job, WorkerOptions $options) */ protected function timeoutForJob($job, WorkerOptions $options) { - return $job && ! is_null($job->timeout()) - ? $job->timeout() : $options->timeout; + return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout; } /** @@ -160,18 +161,38 @@ protected function timeoutForJob($job, WorkerOptions $options) */ protected function daemonShouldRun(WorkerOptions $options) { - if (($this->manager->isDownForMaintenance() && ! $options->force) || + return ! (($this->manager->isDownForMaintenance() && ! $options->force) || $this->paused || - $this->events->until(new Events\Looping) === false) { - // If the application is down for maintenance or doesn't want the queues to run - // we will sleep for one second just in case the developer has it set to not - // sleep at all. This just prevents CPU from maxing out in this situation. - $this->sleep(1); + $this->events->until(new Events\Looping) === false); + } - return false; - } + /** + * Pause the worker for the current loop. + * + * @param WorkerOptions $options + * @param int $lastRestart + * @return void + */ + protected function pauseWorker(WorkerOptions $options, $lastRestart) + { + $this->sleep($options->sleep > 0 ? $options->sleep : 1); - return true; + $this->stopIfNecessary($options, $lastRestart); + } + + /** + * Stop the process if necessary. + * + * @param WorkerOptions $options + * @param int $lastRestart + */ + protected function stopIfNecessary(WorkerOptions $options, $lastRestart) + { + if ($this->memoryExceeded($options->memory)) { + $this->stop(12); + } elseif ($this->queueShouldRestart($lastRestart)) { + $this->stop(); + } } /**