diff --git a/src/Illuminate/Foundation/Bus/PendingDispatch.php b/src/Illuminate/Foundation/Bus/PendingDispatch.php index 1e514e5b0b78..4230515964c3 100644 --- a/src/Illuminate/Foundation/Bus/PendingDispatch.php +++ b/src/Illuminate/Foundation/Bus/PendingDispatch.php @@ -185,9 +185,11 @@ public function __call($method, $parameters) */ public function __destruct() { - if (! $this->shouldDispatch()) { - return; - } elseif ($this->afterResponse) { + if ($this->afterResponse) { + if (! $this->shouldDispatch()) { + return; + } + app(Dispatcher::class)->dispatchAfterResponse($this->job); } else { app(Dispatcher::class)->dispatch($this->job); diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index df7b8ec21cc5..9a6167cedf67 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -4,9 +4,12 @@ use Closure; use DateTimeInterface; +use Illuminate\Bus\UniqueLock; use Illuminate\Container\Container; +use Illuminate\Contracts\Cache\Repository; use Illuminate\Contracts\Encryption\Encrypter; use Illuminate\Contracts\Queue\ShouldBeEncrypted; +use Illuminate\Contracts\Queue\ShouldBeUnique; use Illuminate\Contracts\Queue\ShouldQueueAfterCommit; use Illuminate\Queue\Events\JobQueued; use Illuminate\Queue\Events\JobQueueing; @@ -328,15 +331,34 @@ protected function enqueueUsing($job, $payload, $queue, $delay, $callback) $this->container->bound('db.transactions')) { return $this->container->make('db.transactions')->addCallback( function () use ($queue, $job, $payload, $delay, $callback) { - $this->raiseJobQueueingEvent($queue, $job, $payload, $delay); + if (! $this->shouldDispatch($job)) { + return; + } - return tap($callback($payload, $queue, $delay), function ($jobId) use ($queue, $job, $payload, $delay) { - $this->raiseJobQueuedEvent($queue, $jobId, $job, $payload, $delay); - }); + return $this->enqueue($job, $payload, $queue, $delay, $callback); } ); } + if (! $this->shouldDispatch($job)) { + return; + } + + return $this->enqueue($job, $payload, $queue, $delay, $callback); + } + + /** + * Enqueue the job and dispatch queue events. + * + * @param \Closure|string|object $job + * @param string $payload + * @param string $queue + * @param \DateTimeInterface|\DateInterval|int|null $delay + * @param callable $callback + * @return mixed + */ + protected function enqueue($job, $payload, $queue, $delay, $callback) + { $this->raiseJobQueueingEvent($queue, $job, $payload, $delay); return tap($callback($payload, $queue, $delay), function ($jobId) use ($queue, $job, $payload, $delay) { @@ -344,6 +366,21 @@ function () use ($queue, $job, $payload, $delay, $callback) { }); } + /** + * Determine if the job should be dispatched. + * + * @param \Closure|string|object $job + */ + protected function shouldDispatch($job): bool + { + if (! $job instanceof ShouldBeUnique) { + return true; + } + + return (new UniqueLock($this->container->make(Repository::class))) + ->acquire($job); + } + /** * Determine if the job should be dispatched after all database transactions have committed. * diff --git a/tests/Integration/Queue/RedisQueueTest.php b/tests/Integration/Queue/RedisQueueTest.php index f7120110568d..e5b8812a962b 100644 --- a/tests/Integration/Queue/RedisQueueTest.php +++ b/tests/Integration/Queue/RedisQueueTest.php @@ -2,8 +2,15 @@ namespace Illuminate\Tests\Queue; +use Illuminate\Bus\Dispatcher as BusDispatcher; +use Illuminate\Bus\Queueable; use Illuminate\Container\Container; +use Illuminate\Contracts\Cache\Lock; +use Illuminate\Contracts\Cache\Repository; use Illuminate\Contracts\Events\Dispatcher; +use Illuminate\Contracts\Queue\ShouldBeUnique; +use Illuminate\Contracts\Queue\ShouldQueue; +use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Foundation\Testing\Concerns\InteractsWithRedis; use Illuminate\Queue\Events\JobQueued; use Illuminate\Queue\Events\JobQueueing; @@ -516,6 +523,60 @@ public function testBulkJobQueuedEvent($driver) new RedisQueueIntegrationTestJob(15), ]); } + + /** + * @param string $driver + */ + #[DataProvider('redisDriverProvider')] + public function testUniqueJobOnlyQueuesOnceOnRedisQueue($driver) + { + $lock = m::spy(Lock::class); + $lock->shouldReceive('get')->andReturn(true, false, false); + + $cache = m::spy(Repository::class); + $cache->shouldReceive('lock')->andReturn($lock); + + $container = m::spy(Container::class); + $container->shouldReceive('make')->with(Repository::class)->andReturn($cache)->times(3); + + $queue = new RedisQueue($this->redis[$driver]); + $queue->setContainer($container); + + $queue->push(new UniqueJob('uniqueId')); + $queue->push(new UniqueJob('uniqueId')); + $queue->push(new UniqueJob('uniqueId')); + + $this->assertEquals(1, $queue->size()); + } + + /** + * @param string $driver + */ + #[DataProvider('redisDriverProvider')] + public function testUniqueJobOnlyQueuesOnceBusDispatcher($driver) + { + $lock = m::spy(Lock::class); + $lock->shouldReceive('get')->andReturn(true, false, false); + + $cache = m::spy(Repository::class); + $cache->shouldReceive('lock')->andReturn($lock); + + $container = m::spy(Container::class); + $container->shouldReceive('make')->with(Repository::class)->andReturn($cache)->times(3); + + $queue = new RedisQueue($this->redis[$driver]); + $queue->setContainer($container); + + $dispatcher = new BusDispatcher($container, function () use ($queue) { + return $queue; + }); + + $dispatcher->dispatch(new UniqueJob('uniqueId')); + $dispatcher->dispatch(new UniqueJob('uniqueId')); + $dispatcher->dispatch(new UniqueJob('uniqueId')); + + $this->assertEquals(1, $queue->size()); + } } class RedisQueueIntegrationTestJob @@ -532,3 +593,13 @@ public function handle() // } } + +class UniqueJob extends RedisQueueIntegrationTestJob implements ShouldBeUnique, ShouldQueue +{ + use Dispatchable, Queueable; + + public function uniqueId() + { + return $this->i; + } +} diff --git a/tests/Integration/Queue/UniqueJobTest.php b/tests/Integration/Queue/UniqueJobTest.php index 36eb9aeb5cb7..d2396852dfec 100644 --- a/tests/Integration/Queue/UniqueJobTest.php +++ b/tests/Integration/Queue/UniqueJobTest.php @@ -10,7 +10,6 @@ use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; -use Illuminate\Support\Facades\Bus; use Orchestra\Testbench\Attributes\WithMigration; #[WithMigration] @@ -25,28 +24,6 @@ protected function defineEnvironment($app) $app['config']->set('cache.default', 'database'); } - public function testUniqueJobsAreNotDispatched() - { - Bus::fake(); - - UniqueTestJob::dispatch(); - $this->runQueueWorkerCommand(['--once' => true]); - Bus::assertDispatched(UniqueTestJob::class); - - $this->assertFalse( - $this->app->get(Cache::class)->lock($this->getLockKey(UniqueTestJob::class), 10)->get() - ); - - Bus::assertDispatchedTimes(UniqueTestJob::class); - UniqueTestJob::dispatch(); - $this->runQueueWorkerCommand(['--once' => true]); - Bus::assertDispatchedTimes(UniqueTestJob::class); - - $this->assertFalse( - $this->app->get(Cache::class)->lock($this->getLockKey(UniqueTestJob::class), 10)->get() - ); - } - public function testLockIsReleasedForSuccessfulJobs() { UniqueTestJob::$handled = false;