Skip to content

Commit

Permalink
Add JobQueued event
Browse files Browse the repository at this point in the history
(Follows: #32894)
  • Loading branch information
vdauchy authored and taylorotwell committed Jan 20, 2021
1 parent d7f27d5 commit 8eaec03
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 30 deletions.
29 changes: 29 additions & 0 deletions src/Illuminate/Queue/Events/JobQueued.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace Illuminate\Queue\Events;

class JobQueued
{
/**
* @var string|int|null
*/
public $jobId;

/**
* @var string|object
*/
public $job;

/**
* JobQueued constructor.
*
* @param string|int|null $jobId
* @param \Closure|string|object $job
* @return void
*/
public function __construct($jobId, $job)
{
$this->jobId = $jobId;
$this->job = $job;
}
}
25 changes: 22 additions & 3 deletions src/Illuminate/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Illuminate\Container\Container;
use Illuminate\Contracts\Encryption\Encrypter;
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Queue\Events\JobQueued;
use Illuminate\Support\Arr;
use Illuminate\Support\InteractsWithTime;
use Illuminate\Support\Str;
Expand Down Expand Up @@ -284,13 +285,17 @@ protected function enqueueUsing($job, $payload, $queue, $delay, $callback)
if ($this->shouldDispatchAfterCommit($job) &&
$this->container->bound('db.transactions')) {
return $this->container->make('db.transactions')->addCallback(
function () use ($payload, $queue, $delay, $callback) {
return $callback($payload, $queue, $delay);
function () use ($payload, $queue, $delay, $callback, $job) {
return tap($callback($payload, $queue, $delay), function ($jobId) use ($job) {
$this->raiseJobQueuedEvent($jobId, $job);
});
}
);
}

return $callback($payload, $queue, $delay);
return tap($callback($payload, $queue, $delay), function ($jobId) use ($job) {
$this->raiseJobQueuedEvent($jobId, $job);
});
}

/**
Expand Down Expand Up @@ -345,4 +350,18 @@ public function setContainer(Container $container)
{
$this->container = $container;
}

/**
* Raise the job queued event.
*
* @param string|int|null $jobId
* @param \Closure|string|object $job
* @return void
*/
protected function raiseJobQueuedEvent($jobId, $job)
{
if ($this->container->bound('events')) {
$this->container['events']->dispatch(new JobQueued($jobId, $job));
}
}
}
65 changes: 46 additions & 19 deletions tests/Queue/QueueBeanstalkdQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@

class QueueBeanstalkdQueueTest extends TestCase
{
/**
* @var BeanstalkdQueue
*/
private $queue;

/**
* @var Container|m\LegacyMockInterface|m\MockInterface
*/
private $container;

protected function tearDown(): void
{
m::close();
Expand All @@ -26,14 +36,16 @@ public function testPushProperlyPushesJobOntoBeanstalkd()
return $uuid;
});

$queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), 'default', 60);
$pheanstalk = $queue->getPheanstalk();
$this->setQueue('default', 60);
$pheanstalk = $this->queue->getPheanstalk();
$pheanstalk->shouldReceive('useTube')->once()->with('stack')->andReturn($pheanstalk);
$pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk);
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data']]), 1024, 0, 60);

$queue->push('foo', ['data'], 'stack');
$queue->push('foo', ['data']);
$this->queue->push('foo', ['data'], 'stack');
$this->queue->push('foo', ['data']);

$this->container->shouldHaveReceived('bound')->with('events')->times(2);

Str::createUuidsNormally();
}
Expand All @@ -46,53 +58,68 @@ public function testDelayedPushProperlyPushesJobOntoBeanstalkd()
return $uuid;
});

$queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), 'default', 60);
$pheanstalk = $queue->getPheanstalk();
$this->setQueue('default', 60);
$pheanstalk = $this->queue->getPheanstalk();
$pheanstalk->shouldReceive('useTube')->once()->with('stack')->andReturn($pheanstalk);
$pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk);
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data']]), Pheanstalk::DEFAULT_PRIORITY, 5, Pheanstalk::DEFAULT_TTR);

$queue->later(5, 'foo', ['data'], 'stack');
$queue->later(5, 'foo', ['data']);
$this->queue->later(5, 'foo', ['data'], 'stack');
$this->queue->later(5, 'foo', ['data']);

$this->container->shouldHaveReceived('bound')->with('events')->times(2);

Str::createUuidsNormally();
}

public function testPopProperlyPopsJobOffOfBeanstalkd()
{
$queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), 'default', 60);
$queue->setContainer(m::mock(Container::class));
$pheanstalk = $queue->getPheanstalk();
$this->setQueue('default', 60);

$pheanstalk = $this->queue->getPheanstalk();
$pheanstalk->shouldReceive('watchOnly')->once()->with('default')->andReturn($pheanstalk);
$job = m::mock(Job::class);
$pheanstalk->shouldReceive('reserveWithTimeout')->once()->with(0)->andReturn($job);

$result = $queue->pop();
$result = $this->queue->pop();

$this->assertInstanceOf(BeanstalkdJob::class, $result);
}

public function testBlockingPopProperlyPopsJobOffOfBeanstalkd()
{
$queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), 'default', 60, 60);
$queue->setContainer(m::mock(Container::class));
$pheanstalk = $queue->getPheanstalk();
$this->setQueue('default', 60, 60);

$pheanstalk = $this->queue->getPheanstalk();
$pheanstalk->shouldReceive('watchOnly')->once()->with('default')->andReturn($pheanstalk);
$job = m::mock(Job::class);
$pheanstalk->shouldReceive('reserveWithTimeout')->once()->with(60)->andReturn($job);

$result = $queue->pop();
$result = $this->queue->pop();

$this->assertInstanceOf(BeanstalkdJob::class, $result);
}

public function testDeleteProperlyRemoveJobsOffBeanstalkd()
{
$queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), 'default', 60);
$pheanstalk = $queue->getPheanstalk();
$this->setQueue('default', 60);

$pheanstalk = $this->queue->getPheanstalk();
$pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk);
$pheanstalk->shouldReceive('delete')->once()->with(m::type(Job::class));

$queue->deleteMessage('default', 1);
$this->queue->deleteMessage('default', 1);
}

/**
* @param string $default
* @param int $timeToRun
* @param int $blockFor
*/
private function setQueue($default, $timeToRun, $blockFor = 0)
{
$this->queue = new BeanstalkdQueue(m::mock(Pheanstalk::class), $default, $timeToRun, $blockFor);
$this->container = m::spy(Container::class);
$this->queue->setContainer($this->container);
}
}
7 changes: 7 additions & 0 deletions tests/Queue/QueueDatabaseQueueUnitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Illuminate\Tests\Queue;

use Illuminate\Container\Container;
use Illuminate\Database\Connection;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Queue\Queue;
Expand All @@ -28,6 +29,7 @@ public function testPushProperlyPushesJobOntoDatabase()

$queue = $this->getMockBuilder(DatabaseQueue::class)->onlyMethods(['currentTime'])->setConstructorArgs([$database = m::mock(Connection::class), 'table', 'default'])->getMock();
$queue->expects($this->any())->method('currentTime')->willReturn('time');
$queue->setContainer($container = m::spy(Container::class));
$database->shouldReceive('table')->with('table')->andReturn($query = m::mock(stdClass::class));
$query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) use ($uuid) {
$this->assertSame('default', $array['queue']);
Expand All @@ -39,6 +41,8 @@ public function testPushProperlyPushesJobOntoDatabase()

$queue->push('foo', ['data']);

$container->shouldHaveReceived('bound')->with('events')->once();

Str::createUuidsNormally();
}

Expand All @@ -56,6 +60,7 @@ public function testDelayedPushProperlyPushesJobOntoDatabase()
[$database = m::mock(Connection::class), 'table', 'default']
)->getMock();
$queue->expects($this->any())->method('currentTime')->willReturn('time');
$queue->setContainer($container = m::spy(Container::class));
$database->shouldReceive('table')->with('table')->andReturn($query = m::mock(stdClass::class));
$query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) use ($uuid) {
$this->assertSame('default', $array['queue']);
Expand All @@ -67,6 +72,8 @@ public function testDelayedPushProperlyPushesJobOntoDatabase()

$queue->later(10, 'foo', ['data']);

$container->shouldHaveReceived('bound')->with('events')->once();

Str::createUuidsNormally();
}

Expand Down
11 changes: 11 additions & 0 deletions tests/Queue/QueueRedisQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Illuminate\Tests\Queue;

use Illuminate\Container\Container;
use Illuminate\Contracts\Redis\Factory;
use Illuminate\Queue\LuaScripts;
use Illuminate\Queue\Queue;
Expand All @@ -28,11 +29,13 @@ public function testPushProperlyPushesJobOntoRedis()

$queue = $this->getMockBuilder(RedisQueue::class)->onlyMethods(['getRandomId'])->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])->getMock();
$queue->expects($this->once())->method('getRandomId')->willReturn('foo');
$queue->setContainer($container = m::spy(Container::class));
$redis->shouldReceive('connection')->once()->andReturn($redis);
$redis->shouldReceive('eval')->once()->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'id' => 'foo', 'attempts' => 0]));

$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
$container->shouldHaveReceived('bound')->with('events')->once();

Str::createUuidsNormally();
}
Expand All @@ -47,6 +50,7 @@ public function testPushProperlyPushesJobOntoRedisWithCustomPayloadHook()

$queue = $this->getMockBuilder(RedisQueue::class)->onlyMethods(['getRandomId'])->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])->getMock();
$queue->expects($this->once())->method('getRandomId')->willReturn('foo');
$queue->setContainer($container = m::spy(Container::class));
$redis->shouldReceive('connection')->once()->andReturn($redis);
$redis->shouldReceive('eval')->once()->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'custom' => 'taylor', 'id' => 'foo', 'attempts' => 0]));

Expand All @@ -56,6 +60,7 @@ public function testPushProperlyPushesJobOntoRedisWithCustomPayloadHook()

$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
$container->shouldHaveReceived('bound')->with('events')->once();

Queue::createPayloadUsing(null);

Expand All @@ -72,6 +77,7 @@ public function testPushProperlyPushesJobOntoRedisWithTwoCustomPayloadHook()

$queue = $this->getMockBuilder(RedisQueue::class)->onlyMethods(['getRandomId'])->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])->getMock();
$queue->expects($this->once())->method('getRandomId')->willReturn('foo');
$queue->setContainer($container = m::spy(Container::class));
$redis->shouldReceive('connection')->once()->andReturn($redis);
$redis->shouldReceive('eval')->once()->with(LuaScripts::push(), 2, 'queues:default', 'queues:default:notify', json_encode(['uuid' => $uuid, 'displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'maxExceptions' => null, 'backoff' => null, 'timeout' => null, 'data' => ['data'], 'custom' => 'taylor', 'bar' => 'foo', 'id' => 'foo', 'attempts' => 0]));

Expand All @@ -85,6 +91,7 @@ public function testPushProperlyPushesJobOntoRedisWithTwoCustomPayloadHook()

$id = $queue->push('foo', ['data']);
$this->assertSame('foo', $id);
$container->shouldHaveReceived('bound')->with('events')->once();

Queue::createPayloadUsing(null);

Expand All @@ -100,6 +107,7 @@ public function testDelayedPushProperlyPushesJobOntoRedis()
});

$queue = $this->getMockBuilder(RedisQueue::class)->onlyMethods(['availableAt', 'getRandomId'])->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])->getMock();
$queue->setContainer($container = m::spy(Container::class));
$queue->expects($this->once())->method('getRandomId')->willReturn('foo');
$queue->expects($this->once())->method('availableAt')->with(1)->willReturn(2);

Expand All @@ -112,6 +120,7 @@ public function testDelayedPushProperlyPushesJobOntoRedis()

$id = $queue->later(1, 'foo', ['data']);
$this->assertSame('foo', $id);
$container->shouldHaveReceived('bound')->with('events')->once();

Str::createUuidsNormally();
}
Expand All @@ -126,6 +135,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()

$date = Carbon::now();
$queue = $this->getMockBuilder(RedisQueue::class)->onlyMethods(['availableAt', 'getRandomId'])->setConstructorArgs([$redis = m::mock(Factory::class), 'default'])->getMock();
$queue->setContainer($container = m::spy(Container::class));
$queue->expects($this->once())->method('getRandomId')->willReturn('foo');
$queue->expects($this->once())->method('availableAt')->with($date)->willReturn(2);

Expand All @@ -137,6 +147,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
);

$queue->later($date, 'foo', ['data']);
$container->shouldHaveReceived('bound')->with('events')->once();

Str::createUuidsNormally();
}
Expand Down
6 changes: 6 additions & 0 deletions tests/Queue/QueueSqsQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,33 +92,39 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoSqs()
{
$now = Carbon::now();
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->setContainer($container = m::spy(Container::class));
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->willReturn($this->mockedPayload);
$queue->expects($this->once())->method('secondsUntil')->with($now)->willReturn(5);
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->willReturn($this->queueUrl);
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => 5])->andReturn($this->mockedSendMessageResponseModel);
$id = $queue->later($now->addSeconds(5), $this->mockedJob, $this->mockedData, $this->queueName);
$this->assertEquals($this->mockedMessageId, $id);
$container->shouldHaveReceived('bound')->with('events')->once();
}

public function testDelayedPushProperlyPushesJobOntoSqs()
{
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->setContainer($container = m::spy(Container::class));
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->willReturn($this->mockedPayload);
$queue->expects($this->once())->method('secondsUntil')->with($this->mockedDelay)->willReturn($this->mockedDelay);
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->willReturn($this->queueUrl);
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => $this->mockedDelay])->andReturn($this->mockedSendMessageResponseModel);
$id = $queue->later($this->mockedDelay, $this->mockedJob, $this->mockedData, $this->queueName);
$this->assertEquals($this->mockedMessageId, $id);
$container->shouldHaveReceived('bound')->with('events')->once();
}

public function testPushProperlyPushesJobOntoSqs()
{
$queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->setContainer($container = m::spy(Container::class));
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->willReturn($this->mockedPayload);
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->willReturn($this->queueUrl);
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload])->andReturn($this->mockedSendMessageResponseModel);
$id = $queue->push($this->mockedJob, $this->mockedData, $this->queueName);
$this->assertEquals($this->mockedMessageId, $id);
$container->shouldHaveReceived('bound')->with('events')->once();
}

public function testSizeProperlyReadsSqsQueueSize()
Expand Down
Loading

0 comments on commit 8eaec03

Please sign in to comment.