Skip to content

Commit

Permalink
Refactor how connection names are set so they are always set.
Browse files Browse the repository at this point in the history
This fixes problem where connection name would only be set if running
through queue:listen and queue:work commands but not if you manually
called pop() somewhere in your own code.
  • Loading branch information
taylorotwell committed Dec 30, 2016
1 parent feb52bf commit 4c600fb
Show file tree
Hide file tree
Showing 18 changed files with 88 additions and 20 deletions.
15 changes: 15 additions & 0 deletions src/Illuminate/Contracts/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,19 @@ public function bulk($jobs, $data = '', $queue = null);
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null);

/**
* Get the connection name for the queue.
*
* @return string
*/
public function getConnectionName();

/**
* Set the connection name for the queue.
*
* @param string $name
* @return $this
*/
public function setConnectionName($name);
}
4 changes: 3 additions & 1 deletion src/Illuminate/Queue/BeanstalkdQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ public function pop($queue = null)
$job = $this->pheanstalk->watchOnly($queue)->reserve(0);

if ($job instanceof PheanstalkJob) {
return new BeanstalkdJob($this->container, $this->pheanstalk, $job, $queue);
return new BeanstalkdJob(
$this->container, $this->pheanstalk, $job, $this->connectionName, $queue
);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ protected function marshalJob($queue, $job)
$this->database->commit();

return new DatabaseJob(
$this->container, $this, $job, $queue
$this->container, $this, $job, $this->connectionName, $queue
);
}

Expand Down
4 changes: 3 additions & 1 deletion src/Illuminate/Queue/Jobs/BeanstalkdJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ class BeanstalkdJob extends Job implements JobContract
* @param \Illuminate\Container\Container $container
* @param \Pheanstalk\Pheanstalk $pheanstalk
* @param \Pheanstalk\Job $job
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct(Container $container, Pheanstalk $pheanstalk, PheanstalkJob $job, $queue)
public function __construct(Container $container, Pheanstalk $pheanstalk, PheanstalkJob $job, $connectionName, $queue)
{
$this->job = $job;
$this->queue = $queue;
$this->container = $container;
$this->pheanstalk = $pheanstalk;
$this->connectionName = $connectionName;
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/Illuminate/Queue/Jobs/DatabaseJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ class DatabaseJob extends Job implements JobContract
* @param \Illuminate\Container\Container $container
* @param \Illuminate\Queue\DatabaseQueue $database
* @param \StdClass $job
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct(Container $container, DatabaseQueue $database, $job, $queue)
public function __construct(Container $container, DatabaseQueue $database, $job, $connectionName, $queue)
{
$this->job = $job;
$this->queue = $queue;
$this->database = $database;
$this->container = $container;
$this->connectionName = $connectionName;
}

/**
Expand Down
6 changes: 5 additions & 1 deletion src/Illuminate/Queue/Jobs/RedisJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ class RedisJob extends Job implements JobContract
* @param \Illuminate\Queue\RedisQueue $redis
* @param string $job
* @param string $reserved
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $queue)
public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $connectionName, $queue)
{
// The $job variable is the original job JSON as it existed in the ready queue while
// the $reserved variable is the raw JSON in the reserved queue. The exact format
Expand All @@ -56,6 +58,8 @@ public function __construct(Container $container, RedisQueue $redis, $job, $rese
$this->queue = $queue;
$this->reserved = $reserved;
$this->container = $container;
$this->connectionName = $connectionName;

$this->decoded = $this->payload();
}

Expand Down
6 changes: 4 additions & 2 deletions src/Illuminate/Queue/Jobs/SqsJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@ class SqsJob extends Job implements JobContract
*
* @param \Illuminate\Container\Container $container
* @param \Aws\Sqs\SqsClient $sqs
* @param string $queue
* @param array $job
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct(Container $container, SqsClient $sqs, $queue, array $job)
public function __construct(Container $container, SqsClient $sqs, array $job, $connectionName, $queue)
{
$this->sqs = $sqs;
$this->job = $job;
$this->queue = $queue;
$this->container = $container;
$this->connectionName = $connectionName;
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/Illuminate/Queue/Jobs/SyncJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ class SyncJob extends Job implements JobContract
*
* @param \Illuminate\Container\Container $container
* @param string $payload
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct(Container $container, $payload, $queue)
public function __construct(Container $container, $payload, $connectionName, $queue)
{
$this->queue = $queue;
$this->payload = $payload;
$this->container = $container;
$this->connectionName = $connectionName;
}

/**
Expand Down
30 changes: 30 additions & 0 deletions src/Illuminate/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ abstract class Queue
*/
protected $encrypter;

/**
* The connection name for the queue.
*
* @var string
*/
protected $connectionName;

/**
* Push a new job onto the queue.
*
Expand Down Expand Up @@ -131,6 +138,29 @@ protected function createStringPayload($job, $data)
return ['job' => $job, 'data' => $data];
}

/**
* Get the connection name for the queue.
*
* @return string
*/
public function getConnectionName()
{
return $this->connectionName;
}

/**
* Set the connection name for the queue.
*
* @param string $name
* @return $this
*/
public function setConnectionName($name)
{
$this->connectionName = $name;

return $this;
}

/**
* Set the IoC container instance.
*
Expand Down
4 changes: 3 additions & 1 deletion src/Illuminate/Queue/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ protected function resolve($name)
{
$config = $this->getConfig($name);

return $this->getConnector($config['driver'])->connect($config);
return $this->getConnector($config['driver'])
->connect($config)
->setConnectionName($name);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public function pop($queue = null)
if ($reserved) {
return new RedisJob(
$this->container, $this, $job,
$reserved, $queue ?: $this->default
$reserved, $this->connectionName, $queue ?: $this->default
);
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/Illuminate/Queue/SqsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ public function pop($queue = null)
]);

if (count($response['Messages']) > 0) {
return new SqsJob($this->container, $this->sqs, $queue, $response['Messages'][0]);
return new SqsJob(
$this->container, $this->sqs, $response['Messages'][0],
$this->connectionName, $queue
);
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/Illuminate/Queue/SyncQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public function push($job, $data = '', $queue = null)
*/
protected function resolveJob($payload, $queue)
{
return new SyncJob($this->container, $payload, $queue);
return new SyncJob($this->container, $payload, $this->connectionName, $queue);
}

/**
Expand All @@ -72,7 +72,7 @@ protected function resolveJob($payload, $queue)
protected function raiseBeforeJobEvent(Job $job)
{
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobProcessing('sync', $job));
$this->container['events']->fire(new Events\JobProcessing($this->connectionName, $job));
}
}

Expand All @@ -85,7 +85,7 @@ protected function raiseBeforeJobEvent(Job $job)
protected function raiseAfterJobEvent(Job $job)
{
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobProcessed('sync', $job));
$this->container['events']->fire(new Events\JobProcessed($this->connectionName, $job));
}
}

Expand All @@ -99,7 +99,7 @@ protected function raiseAfterJobEvent(Job $job)
protected function raiseExceptionOccurredJobEvent(Job $job, $e)
{
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobExceptionOccurred('sync', $job, $e));
$this->container['events']->fire(new Events\JobExceptionOccurred($this->connectionName, $job, $e));
}
}

Expand All @@ -116,7 +116,7 @@ protected function handleException($queueJob, $e)
{
$this->raiseExceptionOccurredJobEvent($queueJob, $e);

FailingJob::handle('sync', $queueJob, $e);
FailingJob::handle($this->connectionName, $queueJob, $e);

throw $e;
}
Expand Down
2 changes: 0 additions & 2 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options)
// from this method. If there is no job on the queue, we will "sleep" the worker
// for the specified number of seconds, then keep processing jobs after sleep.
if ($job) {
$job->setConnectionName($connectionName);

return $this->runJob($job, $connectionName, $options);
}

Expand Down
1 change: 1 addition & 0 deletions tests/Queue/QueueBeanstalkdJobTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ protected function getJob()
m::mock('Illuminate\Container\Container'),
m::mock('Pheanstalk\Pheanstalk'),
m::mock('Pheanstalk\Job'),
'connection-name',
'default'
);
}
Expand Down
3 changes: 3 additions & 0 deletions tests/Queue/QueueManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public function testDefaultConnectionCanBeResolved()
$manager = new QueueManager($app);
$connector = m::mock('StdClass');
$queue = m::mock('StdClass');
$queue->shouldReceive('setConnectionName')->once()->with('sync')->andReturnSelf();
$connector->shouldReceive('connect')->once()->with(['driver' => 'sync'])->andReturn($queue);
$manager->addConnector('sync', function () use ($connector) {
return $connector;
Expand All @@ -46,6 +47,7 @@ public function testOtherConnectionCanBeResolved()
$manager = new QueueManager($app);
$connector = m::mock('StdClass');
$queue = m::mock('StdClass');
$queue->shouldReceive('setConnectionName')->once()->with('foo')->andReturnSelf();
$connector->shouldReceive('connect')->once()->with(['driver' => 'bar'])->andReturn($queue);
$manager->addConnector('bar', function () use ($connector) {
return $connector;
Expand All @@ -67,6 +69,7 @@ public function testNullConnectionCanBeResolved()
$manager = new QueueManager($app);
$connector = m::mock('StdClass');
$queue = m::mock('StdClass');
$queue->shouldReceive('setConnectionName')->once()->with('null')->andReturnSelf();
$connector->shouldReceive('connect')->once()->with(['driver' => 'null'])->andReturn($queue);
$manager->addConnector('null', function () use ($connector) {
return $connector;
Expand Down
1 change: 1 addition & 0 deletions tests/Queue/QueueRedisJobTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ protected function getJob()
m::mock(Illuminate\Queue\RedisQueue::class),
json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 1]),
json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 2]),
'connection-name',
'default'
);
}
Expand Down
5 changes: 3 additions & 2 deletions tests/Queue/QueueSqsJobTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ protected function getJob()
return new Illuminate\Queue\Jobs\SqsJob(
$this->mockedContainer,
$this->mockedSqsClient,
$this->queueUrl,
$this->mockedJobData
$this->mockedJobData,
'connection-name',
$this->queueUrl
);
}
}

0 comments on commit 4c600fb

Please sign in to comment.