Skip to content

Commit

Permalink
use locking to migrate stale jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed May 17, 2016
1 parent 4aca7e0 commit 26a24d6
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use DateTime;
use Exception;
use Carbon\Carbon;
use Illuminate\Support\Collection;
use Illuminate\Database\Connection;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Database\Query\Expression;
Expand Down Expand Up @@ -186,21 +187,28 @@ public function pop($queue = null)
*/
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
{
$expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();

try {
$this->database->table($this->table)
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', $expired)
->update([
'reserved' => 0,
'reserved_at' => null,
'attempts' => new Expression('attempts + 1'),
]);
} catch (Exception $e) {
//
if (random_int(1, 10) < 10) {
return;
}

$this->database->beginTransaction();

$stale = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', Carbon::now()->subSeconds($this->expire)->getTimestamp())
->get();

$this->database->table($this->table)
->whereIn('id', Collection::make($stale)->pluck('id')->all())
->update([
'reserved' => 0,
'reserved_at' => null,
'attempts' => new Expression('attempts + 1'),
]);

$this->database->commit();
}

/**
Expand Down

0 comments on commit 26a24d6

Please sign in to comment.