Skip to content

Commit

Permalink
Merge branch '5.2' of https://github.com/20TRIES/framework into 20TRI…
Browse files Browse the repository at this point in the history
…ES-5.2
  • Loading branch information
taylorotwell committed Jul 2, 2016
2 parents 18744e9 + c7975cb commit 87f3e5d
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 48 deletions.
72 changes: 26 additions & 46 deletions src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

use DateTime;
use Carbon\Carbon;
use Illuminate\Support\Collection;
use Illuminate\Database\Connection;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Database\Query\Expression;
use Illuminate\Contracts\Queue\Queue as QueueContract;

class DatabaseQueue extends Queue implements QueueContract
Expand Down Expand Up @@ -161,14 +159,10 @@ public function pop($queue = null)
{
$queue = $this->getQueue($queue);

if (! is_null($this->expire)) {
$this->releaseJobsThatHaveBeenReservedTooLong($queue);
}

$this->database->beginTransaction();

if ($job = $this->getNextAvailableJob($queue)) {
$this->markJobAsReserved($job->id);
$job = $this->markJobAsReserved($job);

$this->database->commit();

Expand All @@ -180,38 +174,6 @@ public function pop($queue = null)
$this->database->commit();
}

/**
* Release the jobs that have been reserved for too long.
*
* @param string $queue
* @return void
*/
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
{
if (random_int(1, 10) < 10) {
return;
}

$this->database->beginTransaction();

$stale = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', Carbon::now()->subSeconds($this->expire)->getTimestamp())
->get();

$this->database->table($this->table)
->whereIn('id', Collection::make($stale)->pluck('id')->all())
->update([
'reserved' => 0,
'reserved_at' => null,
'attempts' => new Expression('attempts + 1'),
]);

$this->database->commit();
}

/**
* Get the next available job for the queue.
*
Expand All @@ -223,8 +185,18 @@ protected function getNextAvailableJob($queue)
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where('reserved', 0)
->where('available_at', '<=', $this->getTime())
->where(function ($query) {
// Where not reserved.
$query->where(function ($query) {
$query->where('reserved', 0);
$query->where('available_at', '<=', $this->getTime());
});
// Or where reserved and reservation has expired.
$query->orWhere(function ($query) {
$query->where('reserved', 1);
$query->where('reserved_at', '<=', Carbon::now()->subSeconds($this->expire)->getTimestamp());
});
})
->orderBy('id', 'asc')
->first();

Expand All @@ -234,14 +206,22 @@ protected function getNextAvailableJob($queue)
/**
* Mark the given job ID as reserved.
*
* @param string $id
* @return void
* @param \stdClass $job
* @return \stdClass
*/
protected function markJobAsReserved($id)
protected function markJobAsReserved($job)
{
$this->database->table($this->table)->where('id', $id)->update([
'reserved' => 1, 'reserved_at' => $this->getTime(),
$job->reserved = 1;
$job->reserved_at = $this->getTime();
$job->attempts = ++$job->attempts;

$this->database->table($this->table)->where('id', $job->id)->update([
'reserved' => $job->reserved,
'reserved_at' => $job->reserved_at,
'attempts' => $job->attempts,
]);

return $job;
}

/**
Expand Down
1 change: 0 additions & 1 deletion src/Illuminate/Queue/Jobs/DatabaseJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public function __construct(Container $container, DatabaseQueue $database, $job,
$this->queue = $queue;
$this->database = $database;
$this->container = $container;
$this->job->attempts = $this->job->attempts + 1;
}

/**
Expand Down
218 changes: 218 additions & 0 deletions tests/Queue/QueueDatabaseQueueIntegrationTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
<?php

use Illuminate\Database\Capsule\Manager as DB;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Database\Eloquent\Model as Eloquent;
use \Illuminate\Queue\DatabaseQueue;
use Carbon\Carbon;
use Illuminate\Container\Container;

class QueueDatabaseQueueIntegrationTest extends PHPUnit_Framework_TestCase
{
/**
* @var DatabaseQueue The queue instance.
*/
protected $queue;

/**
* @var string The jobs table name.
*/
protected $table;

/**
* @var Container The IOC container.
*/
protected $container;

public function setUp()
{
$db = new DB;

$db->addConnection([
'driver' => 'sqlite',
'database' => ':memory:',
]);

$db->bootEloquent();

$db->setAsGlobal();

$this->table = 'jobs';

$this->queue = new DatabaseQueue($this->connection(), $this->table);

$this->container = $this->getMock(Container::class);

$this->queue->setContainer($this->container);

$this->createSchema();
}

/**
* Setup the database schema.
*
* @return void
*/
public function createSchema()
{
$this->schema()->create($this->table, function (Blueprint $table) {
$table->bigIncrements('id');
$table->string('queue');
$table->longText('payload');
$table->tinyInteger('attempts')->unsigned();
$table->tinyInteger('reserved')->unsigned();
$table->unsignedInteger('reserved_at')->nullable();
$table->unsignedInteger('available_at');
$table->unsignedInteger('created_at');
$table->index(['queue', 'reserved', 'reserved_at']);
});
}

/**
* Get a database connection instance.
*
* @return \Illuminate\Database\Connection
*/
protected function connection()
{
return Eloquent::getConnectionResolver()->connection();
}

/**
* Get a schema builder instance.
*
* @return Illuminate\Database\Schema\Builder
*/
protected function schema()
{
return $this->connection()->getSchemaBuilder();
}

/**
* Tear down the database schema.
*
* @return void
*/
public function tearDown()
{
$this->schema()->drop('jobs');
}

/**
* Test that jobs that are not reserved and have an available_at value less then now, are popped.
*/
public function testAvailableAndUnReservedJobsArePopped()
{
$this->connection()
->table('jobs')
->insert([
'id' => 1,
'queue' => $mock_queue_name = 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved' => 0,
'reserved_at' => null,
'available_at' => Carbon::now()->subSeconds(1)->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
]);

$popped_job = $this->queue->pop($mock_queue_name);

$this->assertNotNull($popped_job);
}

/**
* Test that when jobs are popped, the attempts attribute is incremented.
*/
public function testPoppedJobsIncrementAttempts()
{
$job = [
'id' => 1,
'queue' => 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved' => 0,
'reserved_at' => null,
'available_at' => Carbon::now()->subSeconds(1)->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
];

$this->connection()->table('jobs')->insert($job);

$popped_job = $this->queue->pop($job['queue']);

$database_record = $this->connection()->table('jobs')->find($job['id']);

$this->assertEquals(1, $database_record->attempts, 'Job attempts not updated in the database!');
$this->assertEquals(1, $popped_job->attempts(), 'The "attempts" attribute of the Job object was not updated by pop!');
}

/**
* Test that jobs that are not reserved and have an available_at value in the future, are not popped.
*/
public function testUnavailableJobsAreNotPopped()
{
$this->connection()
->table('jobs')
->insert([
'id' => 1,
'queue' => $mock_queue_name = 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved' => 0,
'reserved_at' => null,
'available_at' => Carbon::now()->addSeconds(60)->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
]);

$popped_job = $this->queue->pop($mock_queue_name);

$this->assertNull($popped_job);
}

/**
* Test that jobs that are reserved and have expired are popped.
*/
public function testThatReservedAndExpiredJobsArePopped()
{
$this->connection()
->table('jobs')
->insert([
'id' => 1,
'queue' => $mock_queue_name = 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved' => 1,
'reserved_at' => Carbon::now()->subDay()->getTimestamp(),
'available_at' => Carbon::now()->addDay()->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
]);

$popped_job = $this->queue->pop($mock_queue_name);

$this->assertNotNull($popped_job);
}

/**
* Test that jobs that are reserved and not expired and available are not popped.
*/
public function testThatReservedJobsAreNotPopped()
{
$this->connection()
->table('jobs')
->insert([
'id' => 1,
'queue' => $mock_queue_name = 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved' => 1,
'reserved_at' => Carbon::now()->addDay()->getTimestamp(),
'available_at' => Carbon::now()->subDay()->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
]);

$popped_job = $this->queue->pop($mock_queue_name);

$this->assertNull($popped_job);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use Mockery as m;

class QueueDatabaseQueueTest extends PHPUnit_Framework_TestCase
class QueueDatabaseQueueUnitTest extends PHPUnit_Framework_TestCase
{
public function tearDown()
{
Expand Down

0 comments on commit 87f3e5d

Please sign in to comment.