diff --git a/src/Illuminate/Bus/Dispatcher.php b/src/Illuminate/Bus/Dispatcher.php index 01bf5ec457d7..891573219c5a 100644 --- a/src/Illuminate/Bus/Dispatcher.php +++ b/src/Illuminate/Bus/Dispatcher.php @@ -60,9 +60,6 @@ class Dispatcher implements QueueingDispatcher /** * Create a new command dispatcher instance. - * - * @param \Illuminate\Contracts\Container\Container $container - * @param \Closure|null $queueResolver */ public function __construct(Container $container, ?Closure $queueResolver = null) { @@ -139,7 +136,6 @@ public function dispatchNow($command, $handler = null) /** * Attempt to find the batch with the given ID. * - * @param string $batchId * @return \Illuminate\Bus\Batch|null */ public function findBatch(string $batchId) @@ -273,7 +269,6 @@ public function dispatchAfterResponse($command, $handler = null) /** * Set the pipes through which commands should be piped before dispatching. * - * @param array $pipes * @return $this */ public function pipeThrough(array $pipes) @@ -286,7 +281,6 @@ public function pipeThrough(array $pipes) /** * Map a command to a handler. * - * @param array $map * @return $this */ public function map(array $map) diff --git a/src/Illuminate/Queue/Connectors/FailoverConnector.php b/src/Illuminate/Queue/Connectors/FailoverConnector.php new file mode 100644 index 000000000000..5472d23a6e5f --- /dev/null +++ b/src/Illuminate/Queue/Connectors/FailoverConnector.php @@ -0,0 +1,33 @@ +manager, + $this->events, + $config['connections'], + ); + } +} diff --git a/src/Illuminate/Queue/Events/QueueFailedOver.php b/src/Illuminate/Queue/Events/QueueFailedOver.php new file mode 100644 index 000000000000..bf9ad63c6ac1 --- /dev/null +++ b/src/Illuminate/Queue/Events/QueueFailedOver.php @@ -0,0 +1,18 @@ +manager->connection($this->connections[0])->size($queue); + } + + /** + * Get the number of pending jobs. + * + * @param string|null $queue + * @return int + */ + public function pendingSize($queue = null) + { + return $this->manager->connection($this->connections[0])->pendingSize($queue); + } + + /** + * Get the number of delayed jobs. + * + * @param string|null $queue + * @return int + */ + public function delayedSize($queue = null) + { + return $this->manager->connection($this->connections[0])->delayedSize($queue); + } + + /** + * Get the number of reserved jobs. + * + * @param string|null $queue + * @return int + */ + public function reservedSize($queue = null) + { + return $this->manager->connection($this->connections[0])->reservedSize($queue); + } + + /** + * Get the creation timestamp of the oldest pending job, excluding delayed jobs. + * + * @param string|null $queue + * @return int|null + */ + public function creationTimeOfOldestPendingJob($queue = null) + { + return $this->manager + ->connection($this->connections[0]) + ->creationTimeOfOldestPendingJob($queue); + } + + /** + * Push a new job onto the queue. + * + * @param object|string $job + * @param mixed $data + * @param string|null $queue + * @return mixed + */ + public function push($job, $data = '', $queue = null) + { + foreach ($this->connections as $connection) { + try { + return $this->manager->connection($connection)->push($job, $data, $queue); + } catch (Throwable $e) { + $this->events->dispatch(new QueueFailedOver($connection, $job)); + } + } + + throw $e; + } + + /** + * Push a raw payload onto the queue. + * + * @param string $payload + * @param string|null $queue + * @return mixed + */ + public function pushRaw($payload, $queue = null, array $options = []) + { + foreach ($this->connections as $connection) { + try { + return $this->manager->connection($connection)->pushRaw($payload, $queue, $options); + } catch (Throwable $e) { + // + } + } + + throw $e; + } + + /** + * Push a new job onto the queue after (n) seconds. + * + * @param \DateTimeInterface|\DateInterval|int $delay + * @param string $job + * @param mixed $data + * @param string|null $queue + * @return mixed + */ + public function later($delay, $job, $data = '', $queue = null) + { + foreach ($this->connections as $connection) { + try { + return $this->manager->connection($connection)->later($delay, $job, $data, $queue); + } catch (Throwable $e) { + $this->events->dispatch(new QueueFailedOver($connection, $job)); + } + } + + throw $e; + } + + /** + * Pop the next job off of the queue. + * + * @param string|null $queue + * @return \Illuminate\Contracts\Queue\Job|null + */ + public function pop($queue = null) + { + return $this->manager->connection($this->connections[0])->pop($queue); + } +} diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 15a1edd587a6..ecf4ac3fdd8a 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -38,6 +38,13 @@ abstract class Queue */ protected $connectionName; + /** + * The original configuration for the queue. + * + * @var array + */ + protected $config; + /** * Indicates that jobs should be dispatched after all database transactions have committed. * @@ -452,6 +459,29 @@ public function setConnectionName($name) return $this; } + /** + * Get the queue configuration array. + * + * @return array + */ + public function getConfig() + { + return $this->config; + } + + /** + * Set the queue configuration array. + * + * @param array $config + * @return $this + */ + public function setConfig(array $config) + { + $this->config = $config; + + return $this; + } + /** * Get the container instance being used by the connection. * diff --git a/src/Illuminate/Queue/QueueManager.php b/src/Illuminate/Queue/QueueManager.php index 9b57be09bbd4..3eefe965db8d 100755 --- a/src/Illuminate/Queue/QueueManager.php +++ b/src/Illuminate/Queue/QueueManager.php @@ -169,9 +169,15 @@ protected function resolve($name) throw new InvalidArgumentException("The [{$name}] queue connection has not been configured."); } - return $this->getConnector($config['driver']) + $queue = $this->getConnector($config['driver']) ->connect($config) ->setConnectionName($name); + + if (method_exists($queue, 'setConfig')) { + $queue->setConfig($config); + } + + return $queue; } /** diff --git a/src/Illuminate/Queue/QueueServiceProvider.php b/src/Illuminate/Queue/QueueServiceProvider.php index 3b1d97208da0..954b7a207ab7 100755 --- a/src/Illuminate/Queue/QueueServiceProvider.php +++ b/src/Illuminate/Queue/QueueServiceProvider.php @@ -4,9 +4,11 @@ use Aws\DynamoDb\DynamoDbClient; use Illuminate\Contracts\Debug\ExceptionHandler; +use Illuminate\Contracts\Events\Dispatcher as EventDispatcher; use Illuminate\Contracts\Support\DeferrableProvider; use Illuminate\Queue\Connectors\BeanstalkdConnector; use Illuminate\Queue\Connectors\DatabaseConnector; +use Illuminate\Queue\Connectors\FailoverConnector; use Illuminate\Queue\Connectors\NullConnector; use Illuminate\Queue\Connectors\RedisConnector; use Illuminate\Queue\Connectors\SqsConnector; @@ -102,7 +104,7 @@ protected function registerConnection() */ public function registerConnectors($manager) { - foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) { + foreach (['Null', 'Sync', 'Failover', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) { $this->{"register{$connector}Connector"}($manager); } } @@ -133,6 +135,22 @@ protected function registerSyncConnector($manager) }); } + /** + * Register the Failover queue connector. + * + * @param \Illuminate\Queue\QueueManager $manager + * @return void + */ + protected function registerFailoverConnector($manager) + { + $manager->addConnector('failover', function () use ($manager) { + return new FailoverConnector( + $manager, + $this->app->make(EventDispatcher::class) + ); + }); + } + /** * Register the database queue connector. * diff --git a/tests/Queue/FailoverQueueTest.php b/tests/Queue/FailoverQueueTest.php new file mode 100644 index 000000000000..cd3ca3f56b77 --- /dev/null +++ b/tests/Queue/FailoverQueueTest.php @@ -0,0 +1,46 @@ +shouldReceive('connection')->once()->with('redis')->andReturn( + $redis = m::mock('stdClass'), + ); + + $queue->shouldReceive('connection')->once()->with('sync')->andReturn( + $sync = m::mock('stdClass'), + ); + + $events->shouldReceive('dispatch')->once(); + + $redis->shouldReceive('push')->once()->andReturnUsing( + fn () => throw new \Exception('error') + ); + + $sync->shouldReceive('push')->once(); + + $failover->push('some-job'); + } +}