diff --git a/src/Illuminate/Bus/ChainLink.php b/src/Illuminate/Bus/ChainLink.php new file mode 100644 index 000000000000..2a9b61331948 --- /dev/null +++ b/src/Illuminate/Bus/ChainLink.php @@ -0,0 +1,72 @@ +chainId = $chainId; + $this->jobId = $jobId; + } + + /** + * Set the current jobs' IDs. + * + * @param string[] $current + * @return $this + */ + public function current(array $current) + { + $this->current = $current; + + return $this; + } + + /** + * Set the next jobs' IDs. + * + * @param string[] $current + * @return $this + */ + public function next(array $next) + { + $this->next = $next; + + return $this; + } +} diff --git a/src/Illuminate/Bus/Queueable.php b/src/Illuminate/Bus/Queueable.php index dde865e434fb..413b2218642c 100644 --- a/src/Illuminate/Bus/Queueable.php +++ b/src/Illuminate/Bus/Queueable.php @@ -26,11 +26,11 @@ trait Queueable public $delay; /** - * The jobs that should run if this job is successful. + * This job's link in the chain of jobs. * - * @var array + * @var \Illuminate\Bus\ChainLink */ - public $chained = []; + public $chain; /** * Set the desired connection for the job. @@ -70,33 +70,4 @@ public function delay($delay) return $this; } - - /** - * Set the jobs that should run if this job is successful. - * - * @param array $chain - * @return $this - */ - public function chain($chain) - { - $this->chained = collect($chain)->map(function ($job) { - return serialize($job); - })->all(); - - return $this; - } - - /** - * Dispatch the next job on the chain. - * - * @return void - */ - public function dispatchNextJobInChain() - { - if (! empty($this->chained)) { - dispatch(tap(unserialize(array_shift($this->chained)), function ($next) { - $next->chained = $this->chained; - })); - } - } } diff --git a/src/Illuminate/Foundation/Bus/ChainConductor.php b/src/Illuminate/Foundation/Bus/ChainConductor.php new file mode 100644 index 000000000000..4bf3c5173a12 --- /dev/null +++ b/src/Illuminate/Foundation/Bus/ChainConductor.php @@ -0,0 +1,212 @@ +db = $db; + } + + /** + * Create a new chain with the given chain of jobs. + * + * The chain should have a nested collection for each group of parallel jobs. + * + * @param \Illuminate\Support\Collection $chain + * @return void + */ + public function createChain(Collection $chain) + { + $chain = $chain->map->keyBy(function () { + return Uuid::uuid4()->toString(); + }); + + $this->populateChainLinks($chain); + + $this->saveChain($chain); + } + + /** + * Handle a job that has successfully completed execution. + * + * @param object $job + * @return void + */ + public function jobCompleted($job) + { + if (empty($job->chain)) { + return; + } + + $this->query()->delete($job->chain->jobId); + + if ($this->shouldDispatchNextJobs($job)) { + $this->dispatchNextJobs($job); + } + } + + /** + * Handle a job that has failed to successfully execute. + * + * @param object $job + * @return void + */ + public function jobFailed($job) + { + if (! empty($job->chain)) { + $this->query()->where('chain_id', $job->chain->chainId)->delete(); + } + } + + /** + * Populate the "chain" property on the jobs in the chain. + * + * @param \Illuminate\Support\Collection $chain + * @return void + */ + protected function populateChainLinks(Collection $chain) + { + $chainId = Uuid::uuid4()->toString(); + + // This creates the actual chain. Each job's "chain" property + // is set to an instance of ChainLink, with information on + // the current jobs and the immediately following jobs. + $chain->sliding(2)->eachSpread(function ($current, $next) use ($chainId) { + foreach ($current as $id => $job) { + $job->chain = $this->createChainLink( + $id, $chainId, $current, $next + ); + } + }); + + // The last link in the chain doesn't need all the information + // that the other links need - but it still has to know its + // id, as well as the chain id, to delete all when done. + $chain->last()->each(function ($job, $id) use ($chainId) { + $job->chain = new ChainLink($id, $chainId); + }); + } + + /** + * Create an instance of a chain link. + * + * @param int $id + * @param string $chainId + * @param \Illuminate\Support\Collection $current + * @param \Illuminate\Support\Collection $next + * @return \Illuminate\Bus\ChainLink + */ + protected function createChainLink($id, $chainId, $current, $next) + { + return (new ChainLink($id, $chainId)) + ->current($current->keys()->all()) + ->next($next->keys()->all()); + } + + /** + * Save the given chain of jobs to the DB. + * + * @param \Illuminate\Support\Collection $chain + * @return void + */ + protected function saveChain(Collection $chain) + { + $this->query()->insert($chain->collapse()->map(function ($job) { + return [ + 'id' => $job->chain->jobId, + 'chain_id' => $job->chain->chainId, + 'job' => serialize($job), + ]; + })->all()); + } + + /** + * Determines whether there are any other concurrent jobs that are not done. + * + * @param object $job + * @return bool + */ + protected function hasRemainingConcurrentJobs($job) + { + return $this->query()->whereIn('id', $job->chain->current)->exists(); + } + + /** + * Determines whether we are ready to dispatch the next link in the chain. + * + * @param object $job + * @return bool + */ + protected function shouldDispatchNextJobs($job) + { + return ! empty($job->chain->next) && ! $this->hasRemainingConcurrentJobs($job); + } + + /** + * Dispatch the jobs in the chain after the given job. + * + * @param object $job + * @return void + */ + protected function dispatchNextJobs($job) + { + $this->getNextJobs($job)->each(function ($job) { + dispatch(unserialize($job)); + }); + } + + /** + * Get the jobs that are next in the chain. + * + * @param object $job + * @return \Illuminate\Support\Collection + */ + protected function getNextJobs($job) + { + $query = $this->query()->whereIn('id', $job->chain->next); + + return $query->where('reserve_key', $this->reserve($query))->pluck('job'); + } + + /** + * Reserve the jobs in the given query. + * + * @param \Illuminate\Database\Query\Builder $query + * @return string + */ + protected function reserve($query) + { + return tap(Uuid::uuid4()->toString(), function ($key) use ($query) { + (clone $query)->whereNull('reserve_key')->update(['reserve_key' => $key]); + }); + } + + /** + * Get a query instance for the chained jobs table. + * + * @return \Illuminate\Database\Query\Builder + */ + protected function query() + { + return $this->db->table('chained_jobs'); + } +} diff --git a/src/Illuminate/Foundation/Bus/PendingDispatch.php b/src/Illuminate/Foundation/Bus/PendingDispatch.php index b9172a68da2f..5ddf8f859d1e 100644 --- a/src/Illuminate/Foundation/Bus/PendingDispatch.php +++ b/src/Illuminate/Foundation/Bus/PendingDispatch.php @@ -2,16 +2,24 @@ namespace Illuminate\Foundation\Bus; +use Illuminate\Support\Collection; use Illuminate\Contracts\Bus\Dispatcher; class PendingDispatch { /** - * The job. + * The list of jobs pending dispatch. * - * @var mixed + * @var \Illuminate\Support\Collection */ - protected $job; + protected $jobs; + + /** + * The chain conductor instance. + * + * @var \Illuminate\Foundation\Bus\ChainConductor + */ + protected $chainConductor; /** * Create a new pending job dispatch. @@ -19,9 +27,10 @@ class PendingDispatch * @param mixed $job * @return void */ - public function __construct($job) + public function __construct($jobs, ChainConductor $chainConductor) { - $this->job = $job; + $this->jobs = Collection::wrap($jobs); + $this->chainConductor = $chainConductor; } /** @@ -32,7 +41,7 @@ public function __construct($job) */ public function onConnection($connection) { - $this->job->onConnection($connection); + $this->jobs->each->onConnection($connection); return $this; } @@ -45,7 +54,7 @@ public function onConnection($connection) */ public function onQueue($queue) { - $this->job->onQueue($queue); + $this->jobs->each->onQueue($queue); return $this; } @@ -58,22 +67,27 @@ public function onQueue($queue) */ public function delay($delay) { - $this->job->delay($delay); + $this->jobs->each->delay($delay); return $this; } /** - * Set the jobs that should run if this job is successful. + * Set up a chain of jobs. * - * @param array $chain - * @return $this + * @param object|array ...$jobs + * @return void */ - public function chain($chain) + public function chain(...$jobs) { - $this->job->chain($chain); + // Here we transform the chain into a multi-dimensional collection. + // The outer collection holds the links in the chain, the inner + // collections are the jobs that should be run concurrently. + $chain = Collection::make($jobs)->map(function ($jobs) { + return Collection::wrap($jobs); + })->prepend($this->jobs); - return $this; + $this->chainConductor->createChain($chain); } /** @@ -83,6 +97,10 @@ public function chain($chain) */ public function __destruct() { - app(Dispatcher::class)->dispatch($this->job); + $dispactcher = app(Dispatcher::class); + + foreach ($this->jobs as $job) { + $dispactcher->dispatch($job); + } } } diff --git a/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php b/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php index 9dfa833113fd..efb64bf7ec74 100755 --- a/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php +++ b/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php @@ -10,6 +10,7 @@ use Illuminate\Auth\Console\ClearResetsCommand; use Illuminate\Cache\Console\CacheTableCommand; use Illuminate\Foundation\Console\ServeCommand; +use Illuminate\Queue\Console\ChainTableCommand; use Illuminate\Foundation\Console\PresetCommand; use Illuminate\Queue\Console\FailedTableCommand; use Illuminate\Foundation\Console\AppNameCommand; @@ -143,6 +144,7 @@ class ArtisanServiceProvider extends ServiceProvider 'PolicyMake' => 'command.policy.make', 'ProviderMake' => 'command.provider.make', 'QueueFailedTable' => 'command.queue.failed-table', + 'QueueChainTable' => 'command.queue.chain-table', 'QueueTable' => 'command.queue.table', 'RequestMake' => 'command.request.make', 'RuleMake' => 'command.rule.make', @@ -607,6 +609,18 @@ protected function registerProviderMakeCommand() }); } + /** + * Register the command. + * + * @return void + */ + protected function registerQueueChainTableCommand() + { + $this->app->singleton('command.queue.chain-table', function ($app) { + return new ChainTableCommand($app['files'], $app['composer']); + }); + } + /** * Register the command. * diff --git a/src/Illuminate/Foundation/helpers.php b/src/Illuminate/Foundation/helpers.php index 3360a9e980df..14cc6c6a4108 100644 --- a/src/Illuminate/Foundation/helpers.php +++ b/src/Illuminate/Foundation/helpers.php @@ -4,6 +4,7 @@ use Illuminate\Container\Container; use Illuminate\Contracts\Bus\Dispatcher; use Illuminate\Contracts\Auth\Access\Gate; +use Illuminate\Foundation\Bus\ChainConductor; use Illuminate\Contracts\Routing\UrlGenerator; use Illuminate\Foundation\Bus\PendingDispatch; use Illuminate\Contracts\Routing\ResponseFactory; @@ -369,14 +370,14 @@ function decrypt($value) if (! function_exists('dispatch')) { /** - * Dispatch a job to its appropriate handler. + * Dispatch a jobs to their appropriate handler. * - * @param mixed $job + * @param array|object $jobs * @return \Illuminate\Foundation\Bus\PendingDispatch */ - function dispatch($job) + function dispatch($jobs) { - return new PendingDispatch($job); + return new PendingDispatch($jobs, app(ChainConductor::class)); } } diff --git a/src/Illuminate/Queue/CallQueuedHandler.php b/src/Illuminate/Queue/CallQueuedHandler.php index 6047a1a3e259..0d785a3b32d4 100644 --- a/src/Illuminate/Queue/CallQueuedHandler.php +++ b/src/Illuminate/Queue/CallQueuedHandler.php @@ -50,7 +50,11 @@ public function call(Job $job, array $data) ); if (! $job->hasFailed() && ! $job->isReleased()) { - $this->ensureNextJobInChainIsDispatched($command); + // This does not belong here, obviously. We should + // instead add it to some service provider, and + // do this from within a JobProccessed event. + app(\Illuminate\Foundation\Bus\ChainConductor::class) + ->jobCompleted($command); } if (! $job->isDeletedOrReleased()) { @@ -92,19 +96,6 @@ protected function setJobInstanceIfNecessary(Job $job, $instance) return $instance; } - /** - * Ensure the next job in the chain is dispatched if applicable. - * - * @param mixed $command - * @return void - */ - protected function ensureNextJobInChainIsDispatched($command) - { - if (method_exists($command, 'dispatchNextJobInChain')) { - $command->dispatchNextJobInChain(); - } - } - /** * Handle a model not found exception. * @@ -145,6 +136,12 @@ public function failed(array $data, $e) { $command = unserialize($data['command']); + // This does not belong here, obviously. We should + // instead add it to some service provider, and + // do this from within a JobFailed event. + app(\Illuminate\Foundation\Bus\ChainConductor::class) + ->jobFailed($command); + if (method_exists($command, 'failed')) { $command->failed($e); } diff --git a/src/Illuminate/Queue/Console/ChainTableCommand.php b/src/Illuminate/Queue/Console/ChainTableCommand.php new file mode 100644 index 000000000000..544e981521da --- /dev/null +++ b/src/Illuminate/Queue/Console/ChainTableCommand.php @@ -0,0 +1,102 @@ +files = $files; + $this->composer = $composer; + } + + /** + * Execute the console command. + * + * @return void + */ + public function fire() + { + $table = 'chained_jobs'; + + $this->replaceMigration( + $this->createBaseMigration($table), $table, Str::studly($table) + ); + + $this->info('Migration created successfully!'); + + $this->composer->dumpAutoloads(); + } + + /** + * Create a base migration file for the table. + * + * @param string $table + * @return string + */ + protected function createBaseMigration($table) + { + return $this->laravel['migration.creator']->create( + 'create_'.$table.'_table', $this->laravel->databasePath().'/migrations' + ); + } + + /** + * Replace the generated migration with the failed job table stub. + * + * @param string $path + * @param string $table + * @param string $tableClassName + * @return void + */ + protected function replaceMigration($path, $table, $tableClassName) + { + $stub = str_replace( + ['{{table}}', '{{tableClassName}}'], + [$table, $tableClassName], + $this->files->get(__DIR__.'/stubs/chained_jobs.stub') + ); + + $this->files->put($path, $stub); + } +} diff --git a/src/Illuminate/Queue/Console/stubs/chained_jobs.stub b/src/Illuminate/Queue/Console/stubs/chained_jobs.stub new file mode 100644 index 000000000000..a0977987c7bf --- /dev/null +++ b/src/Illuminate/Queue/Console/stubs/chained_jobs.stub @@ -0,0 +1,34 @@ +string('id', 36)->index(); + $table->string('chain_id', 36)->index(); + $table->string('reserve_key', 36)->nullable()->index(); + $table->text('job'); + $table->timestamp('created_at')->useCurrent(); + }); + } + + /** + * Reverse the migrations. + * + * @return void + */ + public function down() + { + Schema::dropIfExists('{{table}}'); + } +} diff --git a/src/Illuminate/Support/Collection.php b/src/Illuminate/Support/Collection.php index 8ebebf4678ca..61e9fe4dcb85 100644 --- a/src/Illuminate/Support/Collection.php +++ b/src/Illuminate/Support/Collection.php @@ -1239,6 +1239,21 @@ public function shift() return array_shift($this->items); } + /** + * Create overlapping chunks of the given size, by passing a "sliding window" over them. + * + * @param int $size + * @return static + */ + public function sliding($size) + { + $chunks = $this->count() - $size + 1; + + return static::times($chunks, function ($number) use ($size) { + return $this->slice($number - 1, $size); + }); + } + /** * Shuffle the items in the collection. *