Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/Illuminate/Bus/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ class Dispatcher implements QueueingDispatcher

/**
* Create a new command dispatcher instance.
*
* @param \Illuminate\Contracts\Container\Container $container
* @param \Closure|null $queueResolver
*/
public function __construct(Container $container, ?Closure $queueResolver = null)
{
Expand Down Expand Up @@ -139,7 +136,6 @@ public function dispatchNow($command, $handler = null)
/**
* Attempt to find the batch with the given ID.
*
* @param string $batchId
* @return \Illuminate\Bus\Batch|null
*/
public function findBatch(string $batchId)
Expand Down Expand Up @@ -273,7 +269,6 @@ public function dispatchAfterResponse($command, $handler = null)
/**
* Set the pipes through which commands should be piped before dispatching.
*
* @param array $pipes
* @return $this
*/
public function pipeThrough(array $pipes)
Expand All @@ -286,7 +281,6 @@ public function pipeThrough(array $pipes)
/**
* Map a command to a handler.
*
* @param array $map
* @return $this
*/
public function map(array $map)
Expand Down
33 changes: 33 additions & 0 deletions src/Illuminate/Queue/Connectors/FailoverConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace Illuminate\Queue\Connectors;

use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Queue\FailoverQueue;
use Illuminate\Queue\QueueManager;

class FailoverConnector implements ConnectorInterface
{
/**
* Create a new connector instance.
*/
public function __construct(
protected QueueManager $manager,
protected Dispatcher $events
) {
}

/**
* Establish a queue connection.
*
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
return new FailoverQueue(
$this->manager,
$this->events,
$config['connections'],
);
}
}
18 changes: 18 additions & 0 deletions src/Illuminate/Queue/Events/QueueFailedOver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Illuminate\Queue\Events;

class QueueFailedOver
{
/**
* Create a new event instance.
*
* @param string $connectionName The queue connection that failed.
* @param \Closure|string|object $job The job instance.
*/
public function __construct(
public ?string $connectionName,
public mixed $command,
) {
}
}
152 changes: 152 additions & 0 deletions src/Illuminate/Queue/FailoverQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
<?php

namespace Illuminate\Queue;

use Illuminate\Contracts\Events\Dispatcher as EventDispatcher;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Events\QueueFailedOver;
use Throwable;

class FailoverQueue extends Queue implements QueueContract
{
/**
* Create a new failover queue instance.
*/
public function __construct(
public QueueManager $manager,
public EventDispatcher $events,
public array $connections
) {
}

/**
* Get the size of the queue.
*
* @param string|null $queue
* @return int
*/
public function size($queue = null)
{
return $this->manager->connection($this->connections[0])->size($queue);
}

/**
* Get the number of pending jobs.
*
* @param string|null $queue
* @return int
*/
public function pendingSize($queue = null)
{
return $this->manager->connection($this->connections[0])->pendingSize($queue);
}

/**
* Get the number of delayed jobs.
*
* @param string|null $queue
* @return int
*/
public function delayedSize($queue = null)
{
return $this->manager->connection($this->connections[0])->delayedSize($queue);
}

/**
* Get the number of reserved jobs.
*
* @param string|null $queue
* @return int
*/
public function reservedSize($queue = null)
{
return $this->manager->connection($this->connections[0])->reservedSize($queue);
}

/**
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
*
* @param string|null $queue
* @return int|null
*/
public function creationTimeOfOldestPendingJob($queue = null)
{
return $this->manager
->connection($this->connections[0])
->creationTimeOfOldestPendingJob($queue);
}

/**
* Push a new job onto the queue.
*
* @param object|string $job
* @param mixed $data
* @param string|null $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
foreach ($this->connections as $connection) {
try {
return $this->manager->connection($connection)->push($job, $data, $queue);
} catch (Throwable $e) {
$this->events->dispatch(new QueueFailedOver($connection, $job));
}
}

throw $e;

Check failure on line 97 in src/Illuminate/Queue/FailoverQueue.php

View workflow job for this annotation

GitHub Actions / Source Code

Variable $e might not be defined.
}

/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string|null $queue
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
foreach ($this->connections as $connection) {
try {
return $this->manager->connection($connection)->pushRaw($payload, $queue, $options);
} catch (Throwable $e) {
//
}
}

throw $e;

Check failure on line 117 in src/Illuminate/Queue/FailoverQueue.php

View workflow job for this annotation

GitHub Actions / Source Code

Variable $e might not be defined.
}

/**
* Push a new job onto the queue after (n) seconds.
*
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @param string|null $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
foreach ($this->connections as $connection) {
try {
return $this->manager->connection($connection)->later($delay, $job, $data, $queue);
} catch (Throwable $e) {
$this->events->dispatch(new QueueFailedOver($connection, $job));
}
}

throw $e;

Check failure on line 139 in src/Illuminate/Queue/FailoverQueue.php

View workflow job for this annotation

GitHub Actions / Source Code

Variable $e might not be defined.
}

/**
* Pop the next job off of the queue.
*
* @param string|null $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
return $this->manager->connection($this->connections[0])->pop($queue);
}
}
30 changes: 30 additions & 0 deletions src/Illuminate/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ abstract class Queue
*/
protected $connectionName;

/**
* The original configuration for the queue.
*
* @var array
*/
protected $config;

/**
* Indicates that jobs should be dispatched after all database transactions have committed.
*
Expand Down Expand Up @@ -452,6 +459,29 @@ public function setConnectionName($name)
return $this;
}

/**
* Get the queue configuration array.
*
* @return array
*/
public function getConfig()
{
return $this->config;
}

/**
* Set the queue configuration array.
*
* @param array $config
* @return $this
*/
public function setConfig(array $config)
{
$this->config = $config;

return $this;
}

/**
* Get the container instance being used by the connection.
*
Expand Down
8 changes: 7 additions & 1 deletion src/Illuminate/Queue/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,15 @@ protected function resolve($name)
throw new InvalidArgumentException("The [{$name}] queue connection has not been configured.");
}

return $this->getConnector($config['driver'])
$queue = $this->getConnector($config['driver'])
->connect($config)
->setConnectionName($name);

if (method_exists($queue, 'setConfig')) {
$queue->setConfig($config);
}

return $queue;
}

/**
Expand Down
20 changes: 19 additions & 1 deletion src/Illuminate/Queue/QueueServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

use Aws\DynamoDb\DynamoDbClient;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Events\Dispatcher as EventDispatcher;
use Illuminate\Contracts\Support\DeferrableProvider;
use Illuminate\Queue\Connectors\BeanstalkdConnector;
use Illuminate\Queue\Connectors\DatabaseConnector;
use Illuminate\Queue\Connectors\FailoverConnector;
use Illuminate\Queue\Connectors\NullConnector;
use Illuminate\Queue\Connectors\RedisConnector;
use Illuminate\Queue\Connectors\SqsConnector;
Expand Down Expand Up @@ -102,7 +104,7 @@ protected function registerConnection()
*/
public function registerConnectors($manager)
{
foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
foreach (['Null', 'Sync', 'Failover', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
$this->{"register{$connector}Connector"}($manager);
}
}
Expand Down Expand Up @@ -133,6 +135,22 @@ protected function registerSyncConnector($manager)
});
}

/**
* Register the Failover queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerFailoverConnector($manager)
{
$manager->addConnector('failover', function () use ($manager) {
return new FailoverConnector(
$manager,
$this->app->make(EventDispatcher::class)
);
});
}

/**
* Register the database queue connector.
*
Expand Down
46 changes: 46 additions & 0 deletions tests/Queue/FailoverQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

namespace Illuminate\Tests\Queue;

use Illuminate\Container\Container;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Queue\FailoverQueue;
use Illuminate\Queue\QueueManager;
use Mockery as m;
use PHPUnit\Framework\TestCase;

class FailoverQueueTest extends TestCase
{
protected function tearDown(): void
{
m::close();

Container::setInstance(null);
}

public function test_push_fails_over_on_exception()
{
$failover = new FailoverQueue($queue = m::mock(QueueManager::class), $events = m::mock(Dispatcher::class), [
'redis',
'sync',
]);

$queue->shouldReceive('connection')->once()->with('redis')->andReturn(
$redis = m::mock('stdClass'),
);

$queue->shouldReceive('connection')->once()->with('sync')->andReturn(
$sync = m::mock('stdClass'),
);

$events->shouldReceive('dispatch')->once();

$redis->shouldReceive('push')->once()->andReturnUsing(
fn () => throw new \Exception('error')
);

$sync->shouldReceive('push')->once();

$failover->push('some-job');
}
}
Loading