diff --git a/src/Queue.php b/src/Queue.php index 30a130b..175d89f 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -12,14 +12,18 @@ namespace think; use InvalidArgumentException; +use think\cache\driver\Redis; use think\helper\Str; use think\queue\Connector; +use think\queue\connector\Database; /** * Class Queue * @package think\queue * * @method Connector driver($driver = null) + * @mixin Database + * @mixin Redis */ class Queue extends Factory { @@ -33,7 +37,7 @@ class Queue extends Factory */ protected function getConfig($name) { - return $this->app->config->get("queue.connectors.{$name}", ['driver' => 'sync']); + return $this->app->config->get("queue.connections.{$name}", ['driver' => 'sync']); } protected function createDriver($name) @@ -47,18 +51,27 @@ protected function createDriver($name) $driver = $this->app->invokeClass($class, [$this->getConfig($driver)]); return $driver->setApp($this->app) - ->setConnectorName($name); + ->setConnection($name); } throw new InvalidArgumentException("Driver [$driver] not supported."); } + /** + * @param null|string $name + * @return Connector + */ + public function connection($name = null) + { + return $this->driver($name); + } + /** * 默认驱动 * @return string */ public function getDefaultDriver() { - return $this->app->config->get('queue.default', 'sync'); + return $this->app->config->get('queue.default'); } } diff --git a/src/config.php b/src/config.php index 2122a31..e2f25c8 100644 --- a/src/config.php +++ b/src/config.php @@ -10,8 +10,8 @@ // +---------------------------------------------------------------------- return [ - 'default' => 'sync', - 'connectors' => [ + 'default' => 'sync', + 'connections' => [ 'sync' => [ 'driver' => 'sync', ], @@ -31,7 +31,7 @@ 'persistent' => false, ], ], - 'failed' => [ + 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], diff --git a/src/queue/Connector.php b/src/queue/Connector.php index 7fedd74..618500e 100644 --- a/src/queue/Connector.php +++ b/src/queue/Connector.php @@ -25,7 +25,7 @@ abstract class Connector * * @var string */ - protected $connectorName; + protected $connection; protected $options = []; @@ -143,9 +143,9 @@ public function setApp(App $app) * * @return string */ - public function getConnectorName() + public function getConnection() { - return $this->connectorName; + return $this->connection; } /** @@ -154,9 +154,9 @@ public function getConnectorName() * @param string $name * @return $this */ - public function setConnectorName($name) + public function setConnection($name) { - $this->connectorName = $name; + $this->connection = $name; return $this; } diff --git a/src/queue/Job.php b/src/queue/Job.php index 40e37eb..27357f3 100644 --- a/src/queue/Job.php +++ b/src/queue/Job.php @@ -37,7 +37,7 @@ abstract class Job /** * The name of the connection the job belongs to. */ - protected $connector; + protected $connection; /** * Indicates if the job has been deleted. @@ -266,9 +266,9 @@ public function getName() * * @return string */ - public function getConnector() + public function getConnection() { - return $this->connector; + return $this->connection; } /** diff --git a/src/queue/Listener.php b/src/queue/Listener.php index 4731791..632510d 100644 --- a/src/queue/Listener.php +++ b/src/queue/Listener.php @@ -58,7 +58,7 @@ protected function phpBinary() } /** - * @param string $connector + * @param string $connection * @param string $queue * @param int $delay * @param int $sleep @@ -67,9 +67,9 @@ protected function phpBinary() * @param int $timeout * @return void */ - public function listen($connector, $queue, $delay = 0, $sleep = 3, $maxTries = 0, $memory = 128, $timeout = 60) + public function listen($connection, $queue, $delay = 0, $sleep = 3, $maxTries = 0, $memory = 128, $timeout = 60) { - $process = $this->makeProcess($connector, $queue, $delay, $sleep, $maxTries, $memory, $timeout); + $process = $this->makeProcess($connection, $queue, $delay, $sleep, $maxTries, $memory, $timeout); while (true) { $this->runProcess($process, $memory); @@ -77,7 +77,7 @@ public function listen($connector, $queue, $delay = 0, $sleep = 3, $maxTries = 0 } /** - * @param string $connector + * @param string $connection * @param string $queue * @param int $delay * @param int $sleep @@ -86,13 +86,13 @@ public function listen($connector, $queue, $delay = 0, $sleep = 3, $maxTries = 0 * @param int $timeout * @return Process */ - public function makeProcess($connector, $queue, $delay, $sleep, $maxTries, $memory, $timeout) + public function makeProcess($connection, $queue, $delay, $sleep, $maxTries, $memory, $timeout) { $command = array_filter([ $this->phpBinary(), 'think', 'queue:work', - $connector, + $connection, '--once', "--queue={$queue}", "--delay={$delay}", diff --git a/src/queue/Worker.php b/src/queue/Worker.php index 015f11e..f98079f 100644 --- a/src/queue/Worker.php +++ b/src/queue/Worker.php @@ -11,6 +11,7 @@ namespace think\queue; +use Carbon\Carbon; use Exception; use RuntimeException; use think\Cache; @@ -22,6 +23,7 @@ use think\queue\event\JobProcessed; use think\queue\event\JobProcessing; use think\queue\event\WorkerStopping; +use think\queue\exception\MaxAttemptsExceededException; use Throwable; class Worker @@ -59,7 +61,7 @@ public function __construct(Queue $queue, Event $event, Handle $handle, Cache $c } /** - * @param string $connector + * @param string $connection * @param string $queue * @param int $delay * @param int $sleep @@ -67,7 +69,7 @@ public function __construct(Queue $queue, Event $event, Handle $handle, Cache $c * @param int $memory * @param int $timeout */ - public function daemon($connector, $queue, $delay = 0, $sleep = 3, $maxTries = 0, $memory = 128, $timeout = 60) + public function daemon($connection, $queue, $delay = 0, $sleep = 3, $maxTries = 0, $memory = 128, $timeout = 60) { if ($this->supportsAsyncSignals()) { $this->listenForSignals(); @@ -78,7 +80,7 @@ public function daemon($connector, $queue, $delay = 0, $sleep = 3, $maxTries = 0 while (true) { $job = $this->getNextJob( - $this->queue->driver($connector), $queue + $this->queue->connection($connection), $queue ); if ($this->supportsAsyncSignals()) { @@ -86,7 +88,7 @@ public function daemon($connector, $queue, $delay = 0, $sleep = 3, $maxTries = 0 } if ($job) { - $this->runJob($job, $connector, $maxTries, $delay); + $this->runJob($job, $connection, $maxTries, $delay); } else { $this->sleep($sleep); } @@ -231,7 +233,7 @@ protected function listenForSignals() /** * 执行下个任务 - * @param string $connectorName + * @param string $connection * @param string $queue * @param int $delay * @param int $sleep @@ -239,31 +241,30 @@ protected function listenForSignals() * @return void * @throws Exception */ - public function runNextJob($connectorName, $queue, $delay = 0, $sleep = 3, $maxTries = 0) + public function runNextJob($connection, $queue, $delay = 0, $sleep = 3, $maxTries = 0) { - $job = $this->getNextJob($this->queue->driver($connectorName), $queue); + $job = $this->getNextJob($this->queue->connection($connection), $queue); if ($job) { - $this->runJob($job, $connectorName, $maxTries, $delay); - return; + $this->runJob($job, $connection, $maxTries, $delay); + } else { + $this->sleep($sleep); } - - $this->sleep($sleep); } /** * 执行任务 * @param Job $job - * @param string $connectorName + * @param string $connection * @param int $maxTries * @param int $delay * @return void */ - protected function runJob($job, $connectorName, $maxTries, $delay) + protected function runJob($job, $connection, $maxTries, $delay) { try { - $this->process($connectorName, $job, $maxTries, $delay); + $this->process($connection, $job, $maxTries, $delay); } catch (Exception | Throwable $e) { $this->handle->report($e); } @@ -291,32 +292,32 @@ protected function getNextJob($connector, $queue) /** * Process a given job from the queue. - * @param string $connector + * @param string $connection * @param Job $job * @param int $maxTries * @param int $delay * @return void * @throws Exception */ - public function process($connector, $job, $maxTries = 0, $delay = 0) + public function process($connection, $job, $maxTries = 0, $delay = 0) { try { - $this->event->trigger(new JobProcessing($connector, $job)); + $this->event->trigger(new JobProcessing($connection, $job)); $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( - $connector, $job, (int) $maxTries + $connection, $job, (int) $maxTries ); $job->fire(); - $this->event->trigger(new JobProcessed($connector, $job)); + $this->event->trigger(new JobProcessed($connection, $job)); } catch (Exception | Throwable $e) { try { if (!$job->hasFailed()) { - $this->markJobAsFailedIfWillExceedMaxAttempts($connector, $job, (int) $maxTries, $e); + $this->markJobAsFailedIfWillExceedMaxAttempts($connection, $job, (int) $maxTries, $e); } - $this->event->trigger(new JobExceptionOccurred($connector, $job, $e)); + $this->event->trigger(new JobExceptionOccurred($connection, $job, $e)); } finally { if (!$job->isDeleted() && !$job->isReleased() && !$job->hasFailed()) { $job->release($delay); @@ -328,17 +329,17 @@ public function process($connector, $job, $maxTries = 0, $delay = 0) } /** - * @param string $connector + * @param string $connection * @param Job $job * @param int $maxTries */ - protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connector, $job, $maxTries) + protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connection, $job, $maxTries) { $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; $timeoutAt = $job->timeoutAt(); - if ($timeoutAt && time() <= $timeoutAt) { + if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) { return; } @@ -346,7 +347,7 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connector, $job, return; } - $this->failJob($connector, $job, $e = new RuntimeException( + $this->failJob($connection, $job, $e = new MaxAttemptsExceededException( $job->getName() . ' has been attempted too many times or run too long. The job may have previously timed out.' )); @@ -354,30 +355,30 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connector, $job, } /** - * @param string $connector + * @param string $connection * @param Job $job * @param int $maxTries * @param Exception $e */ - protected function markJobAsFailedIfWillExceedMaxAttempts($connector, $job, $maxTries, $e) + protected function markJobAsFailedIfWillExceedMaxAttempts($connection, $job, $maxTries, $e) { $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; - if ($job->timeoutAt() && $job->timeoutAt() <= time()) { - $this->failJob($connector, $job, $e); + if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) { + $this->failJob($connection, $job, $e); } if ($maxTries > 0 && $job->attempts() >= $maxTries) { - $this->failJob($connector, $job, $e); + $this->failJob($connection, $job, $e); } } /** - * @param string $connector + * @param string $connection * @param Job $job * @param Exception $e */ - protected function failJob($connector, $job, $e) + protected function failJob($connection, $job, $e) { $job->markAsFailed(); @@ -391,7 +392,7 @@ protected function failJob($connector, $job, $e) $job->failed($e); } finally { $this->event->trigger(new JobFailed( - $connector, $job, $e ?: new RuntimeException('ManuallyFailed') + $connection, $job, $e ?: new RuntimeException('ManuallyFailed') )); } } diff --git a/src/queue/command/Listen.php b/src/queue/command/Listen.php index 4c16135..61e47c1 100644 --- a/src/queue/command/Listen.php +++ b/src/queue/command/Listen.php @@ -35,7 +35,7 @@ public function __construct(Listener $listener) protected function configure() { $this->setName('queue:listen') - ->addArgument('connector', Argument::OPTIONAL, 'The name of the queue connector to work', null) + ->addArgument('connection', Argument::OPTIONAL, 'The name of the queue connection to work', null) ->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on', null) ->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0) ->addOption('memory', null, Option::VALUE_OPTIONAL, 'The memory limit in megabytes', 128) @@ -47,15 +47,15 @@ protected function configure() public function execute(Input $input, Output $output) { - $connector = $input->getArgument('connector') ?: $this->app->config->get('queue.connector', 'sync'); + $connection = $input->getArgument('connection') ?: $this->app->config->get('queue.default'); - $queue = $input->getOption('queue') ?: $this->app->config->get("queue.{$connector}", 'default'); + $queue = $input->getOption('queue') ?: $this->app->config->get("queue.connections.{$connection}.queue", 'default'); $delay = $input->getOption('delay'); $memory = $input->getOption('memory'); $timeout = $input->getOption('timeout'); $sleep = $input->getOption('sleep'); $tries = $input->getOption('tries'); - $this->listener->listen($connector, $queue, $delay, $sleep, $tries, $memory, $timeout); + $this->listener->listen($connection, $queue, $delay, $sleep, $tries, $memory, $timeout); } } diff --git a/src/queue/command/Restart.php b/src/queue/command/Restart.php index dd6eae5..f0d3b4d 100644 --- a/src/queue/command/Restart.php +++ b/src/queue/command/Restart.php @@ -13,9 +13,12 @@ use think\Cache; use think\console\Command; +use think\queue\InteractsWithTime; class Restart extends Command { + use InteractsWithTime; + protected function configure() { $this->setName('queue:restart') @@ -24,7 +27,7 @@ protected function configure() public function handle(Cache $cache) { - $cache->set('think:queue:restart', time()); + $cache->set('think:queue:restart', $this->currentTime()); $this->output->info("Broadcasting queue restart signal."); } } diff --git a/src/queue/command/Retry.php b/src/queue/command/Retry.php index 121c1c2..7551a37 100644 --- a/src/queue/command/Retry.php +++ b/src/queue/command/Retry.php @@ -41,8 +41,8 @@ public function handle() */ protected function retryJob($job) { - $this->app['queue']->driver($job->connector)->pushRaw( - $this->resetAttempts($job->payload), $job->queue + $this->app['queue']->connection($job['connection'])->pushRaw( + $this->resetAttempts($job['payload']), $job['queue'] ); } diff --git a/src/queue/command/Table.php b/src/queue/command/Table.php index 815cc48..9354dc7 100644 --- a/src/queue/command/Table.php +++ b/src/queue/command/Table.php @@ -21,7 +21,7 @@ public function handle() return; } - $table = $this->app->config->get('queue.connectors.database.table'); + $table = $this->app->config->get('queue.connections.database.table'); $className = Str::studly("create_{$table}_table"); diff --git a/src/queue/command/Work.php b/src/queue/command/Work.php index aa4dda1..5ead2d4 100644 --- a/src/queue/command/Work.php +++ b/src/queue/command/Work.php @@ -39,7 +39,7 @@ public function __construct(Worker $worker) protected function configure() { $this->setName('queue:work') - ->addArgument('connector', Argument::OPTIONAL, 'The name of the queue connector to work', null) + ->addArgument('connection', Argument::OPTIONAL, 'The name of the queue connection to work', null) ->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on') ->addOption('once', null, Option::VALUE_NONE, 'Only process the next job on the queue') ->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0) @@ -59,9 +59,9 @@ protected function configure() */ public function execute(Input $input, Output $output) { - $connector = $input->getArgument('connector') ?: $this->app->config->get('queue.connector', 'sync'); + $connection = $input->getArgument('connection') ?: $this->app->config->get('queue.default'); - $queue = $input->getOption('queue') ?: $this->app->config->get("queue.{$connector}", 'default'); + $queue = $input->getOption('queue') ?: $this->app->config->get("queue.connections.{$connection}.queue", 'default'); $delay = $input->getOption('delay'); $sleep = $input->getOption('sleep'); $tries = $input->getOption('tries'); @@ -69,11 +69,11 @@ public function execute(Input $input, Output $output) $this->listenForEvents(); if ($input->getOption('once')) { - $this->worker->runNextJob($connector, $queue, $delay, $sleep, $tries); + $this->worker->runNextJob($connection, $queue, $delay, $sleep, $tries); } else { $memory = $input->getOption('memory'); $timeout = $input->getOption('timeout'); - $this->worker->daemon($connector, $queue, $delay, $sleep, $tries, $memory, $timeout); + $this->worker->daemon($connection, $queue, $delay, $sleep, $tries, $memory, $timeout); } } @@ -82,15 +82,15 @@ public function execute(Input $input, Output $output) */ protected function listenForEvents() { - $this->app->event->listen(JobProcessing::class, function ($event) { + $this->app->event->listen(JobProcessing::class, function (JobProcessing $event) { $this->writeOutput($event->job, 'starting'); }); - $this->app->event->listen(JobProcessed::class, function ($event) { + $this->app->event->listen(JobProcessed::class, function (JobProcessed $event) { $this->writeOutput($event->job, 'success'); }); - $this->app->event->listen(JobFailed::class, function ($event) { + $this->app->event->listen(JobFailed::class, function (JobFailed $event) { $this->writeOutput($event->job, 'failed'); $this->logFailedJob($event); @@ -143,7 +143,7 @@ protected function writeStatus(Job $job, $status, $type) protected function logFailedJob(JobFailed $event) { $this->app['queue.failer']->log( - $event->connector, $event->job->getQueue(), + $event->connection, $event->job->getQueue(), $event->job->getRawBody(), $event->exception ); } diff --git a/src/queue/command/stubs/jobs.stub b/src/queue/command/stubs/jobs.stub index 6691c90..937d549 100644 --- a/src/queue/command/stubs/jobs.stub +++ b/src/queue/command/stubs/jobs.stub @@ -13,7 +13,7 @@ class CreateJobsTable extends Migrator ->addColumn(Column::tinyInteger('attempts')->setUnsigned()) ->addColumn(Column::unsignedInteger('reserved_at')->setNullable()) ->addColumn(Column::unsignedInteger('available_at')) - ->addColumn(Column::unsignedInteger('create_at')) + ->addColumn(Column::unsignedInteger('created_at')) ->addIndex('queue') ->create(); } diff --git a/src/queue/connector/Database.php b/src/queue/connector/Database.php index 6adc3e7..ba4f1da 100644 --- a/src/queue/connector/Database.php +++ b/src/queue/connector/Database.php @@ -82,6 +82,26 @@ public function later($delay, $job, $data = '', $queue = null) return $this->pushToDatabase($queue, $this->createPayload($job, $data), $delay); } + public function bulk($jobs, $data = '', $queue = null) + { + $queue = $this->getQueue($queue); + + $availableAt = $this->availableAt(); + + return $this->db->name($this->table)->insertAll(collect((array) $jobs)->map( + function ($job) use ($queue, $data, $availableAt) { + return [ + 'queue' => $queue, + 'attempts' => 0, + 'reserved_at' => null, + 'available_at' => $availableAt, + 'created_at' => $this->currentTime(), + 'payload' => $this->createPayload($job, $data), + ]; + } + )->all()); + } + /** * 重新发布任务 * @@ -126,10 +146,8 @@ public function pop($queue = null) $job = $this->markJobAsReserved($job); - return new DatabaseJob($this->app, $this, $job, $this->connectorName, $queue); + return new DatabaseJob($this->app, $this, $job, $this->connection, $queue); } - - return; }); } diff --git a/src/queue/connector/Redis.php b/src/queue/connector/Redis.php index 66812c5..063565a 100644 --- a/src/queue/connector/Redis.php +++ b/src/queue/connector/Redis.php @@ -120,7 +120,7 @@ public function pop($queue = null) [$job, $reserved] = $nextJob; if ($reserved) { - return new RedisJob($this->app, $this, $job, $reserved, $this->connectorName, $queue); + return new RedisJob($this->app, $this, $job, $reserved, $this->connection, $queue); } } diff --git a/src/queue/connector/Sync.php b/src/queue/connector/Sync.php index 4f1265c..3f4d420 100644 --- a/src/queue/connector/Sync.php +++ b/src/queue/connector/Sync.php @@ -32,14 +32,14 @@ public function push($job, $data = '', $queue = null) $queueJob = $this->resolveJob($this->createPayload($job, $data), $queue); try { - $this->triggerEvent(new JobProcessing($this->connectorName, $job)); + $this->triggerEvent(new JobProcessing($this->connection, $job)); $queueJob->fire(); - $this->triggerEvent(new JobProcessed($this->connectorName, $job)); + $this->triggerEvent(new JobProcessed($this->connection, $job)); } catch (Exception | Throwable $e) { - $this->triggerEvent(new JobFailed($this->connectorName, $job, $e)); + $this->triggerEvent(new JobFailed($this->connection, $job, $e)); throw $e; } @@ -59,7 +59,7 @@ public function pop($queue = null) protected function resolveJob($payload, $queue) { - return new SyncJob($this->app, $payload, $this->connectorName, $queue); + return new SyncJob($this->app, $payload, $this->connection, $queue); } public function pushRaw($payload, $queue = null, array $options = []) diff --git a/src/queue/event/JobFailed.php b/src/queue/event/JobFailed.php index 30b9369..9e3706a 100644 --- a/src/queue/event/JobFailed.php +++ b/src/queue/event/JobFailed.php @@ -7,7 +7,7 @@ class JobFailed { /** @var string */ - public $connector; + public $connection; /** @var Job */ public $job; @@ -15,10 +15,10 @@ class JobFailed /** @var \Exception */ public $exception; - public function __construct($connector, $job, $exception) + public function __construct($connection, $job, $exception) { - $this->connector = $connector; - $this->job = $job; - $this->exception = $exception; + $this->connection = $connection; + $this->job = $job; + $this->exception = $exception; } } diff --git a/src/queue/event/JobProcessed.php b/src/queue/event/JobProcessed.php index 6c9c799..9ae9fca 100644 --- a/src/queue/event/JobProcessed.php +++ b/src/queue/event/JobProcessed.php @@ -7,14 +7,14 @@ class JobProcessed { /** @var string */ - public $connector; + public $connection; /** @var Job */ public $job; - public function __construct($connector, $job) + public function __construct($connection, $job) { - $this->connector = $connector; - $this->job = $job; + $this->connection = $connection; + $this->job = $job; } } diff --git a/src/queue/event/JobProcessing.php b/src/queue/event/JobProcessing.php index 87a03c3..72ff3c5 100644 --- a/src/queue/event/JobProcessing.php +++ b/src/queue/event/JobProcessing.php @@ -7,14 +7,14 @@ class JobProcessing { /** @var string */ - public $connector; + public $connection; /** @var Job */ public $job; - public function __construct($connector, $job) + public function __construct($connection, $job) { - $this->connector = $connector; - $this->job = $job; + $this->connection = $connection; + $this->job = $job; } } diff --git a/src/queue/exception/MaxAttemptsExceededException.php b/src/queue/exception/MaxAttemptsExceededException.php new file mode 100644 index 0000000..fb8440b --- /dev/null +++ b/src/queue/exception/MaxAttemptsExceededException.php @@ -0,0 +1,10 @@ +getTable()->insertGetId(compact( - 'connector', 'queue', 'payload', 'exception', 'failed_at' + 'connection', 'queue', 'payload', 'exception', 'failed_at' )); } @@ -57,7 +57,7 @@ public function log($connector, $queue, $payload, $exception) */ public function all() { - return $this->getTable()->order('id', 'desc')->select()->all(); + return collect($this->getTable()->order('id', 'desc')->select())->all(); } /** diff --git a/src/queue/job/Database.php b/src/queue/job/Database.php index 4be7f30..bef756a 100644 --- a/src/queue/job/Database.php +++ b/src/queue/job/Database.php @@ -28,13 +28,13 @@ class Database extends Job */ protected $job; - public function __construct(App $app, DatabaseQueue $database, $job, $connector, $queue) + public function __construct(App $app, DatabaseQueue $database, $job, $connection, $queue) { - $this->app = $app; - $this->job = $job; - $this->queue = $queue; - $this->database = $database; - $this->connector = $connector; + $this->app = $app; + $this->job = $job; + $this->queue = $queue; + $this->database = $database; + $this->connection = $connection; } /** diff --git a/src/queue/job/Redis.php b/src/queue/job/Redis.php index 4afabe5..79b6e40 100644 --- a/src/queue/job/Redis.php +++ b/src/queue/job/Redis.php @@ -44,14 +44,14 @@ class Redis extends Job */ protected $reserved; - public function __construct(App $app, RedisQueue $redis, $job, $reserved, $connector, $queue) + public function __construct(App $app, RedisQueue $redis, $job, $reserved, $connection, $queue) { - $this->app = $app; - $this->job = $job; - $this->queue = $queue; - $this->connector = $connector; - $this->redis = $redis; - $this->reserved = $reserved; + $this->app = $app; + $this->job = $job; + $this->queue = $queue; + $this->connection = $connection; + $this->redis = $redis; + $this->reserved = $reserved; $this->decoded = $this->payload(); } diff --git a/src/queue/job/Sync.php b/src/queue/job/Sync.php index 84d002b..c2299d8 100644 --- a/src/queue/job/Sync.php +++ b/src/queue/job/Sync.php @@ -23,12 +23,12 @@ class Sync extends Job */ protected $payload; - public function __construct(App $app, $payload, $connector, $queue) + public function __construct(App $app, $payload, $connection, $queue) { - $this->app = $app; - $this->connector = $connector; - $this->queue = $queue; - $this->payload = $payload; + $this->app = $app; + $this->connection = $connection; + $this->queue = $queue; + $this->payload = $payload; } /** diff --git a/tests/DatabaseConnectorTest.php b/tests/DatabaseConnectorTest.php new file mode 100644 index 0000000..3856886 --- /dev/null +++ b/tests/DatabaseConnectorTest.php @@ -0,0 +1,123 @@ +db = m::mock(Db::class); + $this->connector = new Database($this->db, 'table', 'default'); + } + + public function testPushProperlyPushesJobOntoDatabase() + { + $this->db->shouldReceive('name')->with('table')->andReturn($query = m::mock(stdClass::class)); + + $query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) { + $this->assertEquals('default', $array['queue']); + $this->assertEquals(json_encode(['job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data']]), $array['payload']); + $this->assertEquals(0, $array['attempts']); + $this->assertNull($array['reserved_at']); + $this->assertInternalType('int', $array['available_at']); + }); + $this->connector->push('foo', ['data']); + } + + public function testDelayedPushProperlyPushesJobOntoDatabase() + { + $this->db->shouldReceive('name')->with('table')->andReturn($query = m::mock(stdClass::class)); + + $query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) { + $this->assertEquals('default', $array['queue']); + $this->assertEquals(json_encode(['job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data']]), $array['payload']); + $this->assertEquals(0, $array['attempts']); + $this->assertNull($array['reserved_at']); + $this->assertInternalType('int', $array['available_at']); + }); + + $this->connector->later(10, 'foo', ['data']); + } + + public function testFailureToCreatePayloadFromObject() + { + $this->expectException('InvalidArgumentException'); + + $job = new stdClass; + $job->invalid = "\xc3\x28"; + + $queue = $this->getMockForAbstractClass(Connector::class); + $class = new ReflectionClass(Connector::class); + + $createPayload = $class->getMethod('createPayload'); + $createPayload->setAccessible(true); + $createPayload->invokeArgs($queue, [ + $job, + 'queue-name', + ]); + } + + public function testFailureToCreatePayloadFromArray() + { + $this->expectException('InvalidArgumentException'); + + $queue = $this->getMockForAbstractClass(Connector::class); + $class = new ReflectionClass(Connector::class); + + $createPayload = $class->getMethod('createPayload'); + $createPayload->setAccessible(true); + $createPayload->invokeArgs($queue, [ + ["\xc3\x28"], + 'queue-name', + ]); + } + + public function testBulkBatchPushesOntoDatabase() + { + + $this->db->shouldReceive('name')->with('table')->andReturn($query = m::mock(stdClass::class)); + + Carbon::setTestNow( + $now = Carbon::now()->addSeconds() + ); + + $query->shouldReceive('insertAll')->once()->andReturnUsing(function ($records) use ($now) { + $this->assertEquals([ + [ + 'queue' => 'queue', + 'payload' => json_encode(['job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data']]), + 'attempts' => 0, + 'reserved_at' => null, + 'available_at' => $now->getTimestamp(), + 'created_at' => $now->getTimestamp(), + ], [ + 'queue' => 'queue', + 'payload' => json_encode(['job' => 'bar', 'maxTries' => null, 'timeout' => null, 'data' => ['data']]), + 'attempts' => 0, + 'reserved_at' => null, + 'available_at' => $now->getTimestamp(), + 'created_at' => $now->getTimestamp(), + ], + ], $records); + }); + + $this->connector->bulk(['foo', 'bar'], ['data'], 'queue'); + } + +} diff --git a/tests/ListenerTest.php b/tests/ListenerTest.php new file mode 100644 index 0000000..883b033 --- /dev/null +++ b/tests/ListenerTest.php @@ -0,0 +1,80 @@ +makePartial(); + $process->shouldReceive('run')->once(); + /** @var Listener|MockInterface $listener */ + $listener = m::mock(Listener::class)->makePartial(); + $listener->shouldReceive('memoryExceeded')->once()->with(1)->andReturn(false); + + $listener->runProcess($process, 1); + } + + public function testListenerStopsWhenMemoryIsExceeded() + { + /** @var Process|MockInterface $process */ + $process = m::mock(Process::class)->makePartial(); + $process->shouldReceive('run')->once(); + /** @var Listener|MockInterface $listener */ + $listener = m::mock(Listener::class)->makePartial(); + $listener->shouldReceive('memoryExceeded')->once()->with(1)->andReturn(true); + $listener->shouldReceive('stop')->once(); + + $listener->runProcess($process, 1); + } + + public function testMakeProcessCorrectlyFormatsCommandLine() + { + $listener = new Listener(__DIR__); + + $process = $listener->makeProcess('connection', 'queue', 1, 3, 0, 2, 3); + $escape = '\\' === DIRECTORY_SEPARATOR ? '"' : '\''; + + $this->assertInstanceOf(Process::class, $process); + $this->assertEquals(__DIR__, $process->getWorkingDirectory()); + $this->assertEquals(3, $process->getTimeout()); + $this->assertEquals($escape . PHP_BINARY . $escape . " {$escape}think{$escape} {$escape}queue:work{$escape} {$escape}connection{$escape} {$escape}--once{$escape} {$escape}--queue=queue{$escape} {$escape}--delay=1{$escape} {$escape}--memory=2{$escape} {$escape}--sleep=3{$escape} {$escape}--tries=0{$escape}", $process->getCommandLine()); + } + + public function testMakeProcessCorrectlyFormatsCommandLineWithAnEnvironmentSpecified() + { + $listener = new Listener(__DIR__); + + $process = $listener->makeProcess('connection', 'queue', 1, 3, 0, 2, 3); + $escape = '\\' === DIRECTORY_SEPARATOR ? '"' : '\''; + + $this->assertInstanceOf(Process::class, $process); + $this->assertEquals(__DIR__, $process->getWorkingDirectory()); + $this->assertEquals(3, $process->getTimeout()); + $this->assertEquals($escape . PHP_BINARY . $escape . " {$escape}think{$escape} {$escape}queue:work{$escape} {$escape}connection{$escape} {$escape}--once{$escape} {$escape}--queue=queue{$escape} {$escape}--delay=1{$escape} {$escape}--memory=2{$escape} {$escape}--sleep=3{$escape} {$escape}--tries=0{$escape}", $process->getCommandLine()); + } + + public function testMakeProcessCorrectlyFormatsCommandLineWhenTheConnectionIsNotSpecified() + { + $listener = new Listener(__DIR__); + + $process = $listener->makeProcess(null, 'queue', 1, 3, 0, 2, 3); + $escape = '\\' === DIRECTORY_SEPARATOR ? '"' : '\''; + + $this->assertInstanceOf(Process::class, $process); + $this->assertEquals(__DIR__, $process->getWorkingDirectory()); + $this->assertEquals(3, $process->getTimeout()); + $this->assertEquals($escape . PHP_BINARY . $escape . " {$escape}think{$escape} {$escape}queue:work{$escape} {$escape}--once{$escape} {$escape}--queue=queue{$escape} {$escape}--delay=1{$escape} {$escape}--memory=2{$escape} {$escape}--sleep=3{$escape} {$escape}--tries=0{$escape}", $process->getCommandLine()); + } +} diff --git a/tests/WorkerTest.php b/tests/WorkerTest.php index 13e4281..87eeacc 100644 --- a/tests/WorkerTest.php +++ b/tests/WorkerTest.php @@ -2,6 +2,7 @@ namespace think\test\queue; +use Carbon\Carbon; use Mockery as m; use Mockery\MockInterface; use RuntimeException; @@ -14,6 +15,7 @@ use think\queue\event\JobFailed; use think\queue\event\JobProcessed; use think\queue\event\JobProcessing; +use think\queue\exception\MaxAttemptsExceededException; class WorkerTest extends TestCase { @@ -163,6 +165,100 @@ public function testJobIsNotReleasedIfItHasExceededMaxAttempts() $this->event->shouldNotHaveReceived('trigger', [m::type(JobProcessed::class)]); } + public function testJobIsNotReleasedIfItHasExpired() + { + $e = new RuntimeException; + + $job = new WorkerFakeJob(function ($job) use ($e) { + // In normal use this would be incremented by being popped off the queue + $job->attempts++; + + throw $e; + }); + + $job->timeoutAt = Carbon::now()->addSeconds(1)->getTimestamp(); + + $job->attempts = 0; + + Carbon::setTestNow( + Carbon::now()->addSeconds(1) + ); + + $worker = $this->getWorker(['default' => [$job]]); + $worker->runNextJob('sync', 'default'); + + $this->assertNull($job->releaseAfter); + $this->assertTrue($job->deleted); + $this->assertEquals($e, $job->failedWith); + $this->handle->shouldHaveReceived('report')->with($e); + $this->event->shouldHaveReceived('trigger')->with(m::type(JobExceptionOccurred::class))->once(); + $this->event->shouldHaveReceived('trigger')->with(m::type(JobFailed::class))->once(); + $this->event->shouldNotHaveReceived('trigger', [m::type(JobProcessed::class)]); + } + + public function testJobIsFailedIfItHasAlreadyExceededMaxAttempts() + { + $job = new WorkerFakeJob(function ($job) { + $job->attempts++; + }); + + $job->attempts = 2; + + $worker = $this->getWorker(['default' => [$job]]); + $worker->runNextJob('sync', 'default', 0, 3, 1); + + $this->assertNull($job->releaseAfter); + $this->assertTrue($job->deleted); + $this->assertInstanceOf(MaxAttemptsExceededException::class, $job->failedWith); + $this->handle->shouldHaveReceived('report')->with(m::type(MaxAttemptsExceededException::class)); + $this->event->shouldHaveReceived('trigger')->with(m::type(JobExceptionOccurred::class))->once(); + $this->event->shouldHaveReceived('trigger')->with(m::type(JobFailed::class))->once(); + $this->event->shouldNotHaveReceived('trigger', [m::type(JobProcessed::class)]); + } + + public function testJobIsFailedIfItHasAlreadyExpired() + { + $job = new WorkerFakeJob(function ($job) { + $job->attempts++; + }); + + $job->timeoutAt = Carbon::now()->addSeconds(2)->getTimestamp(); + + $job->attempts = 1; + + Carbon::setTestNow( + Carbon::now()->addSeconds(3) + ); + + $worker = $this->getWorker(['default' => [$job]]); + $worker->runNextJob('sync', 'default'); + + $this->assertNull($job->releaseAfter); + $this->assertTrue($job->deleted); + $this->assertInstanceOf(MaxAttemptsExceededException::class, $job->failedWith); + $this->handle->shouldHaveReceived('report')->with(m::type(MaxAttemptsExceededException::class)); + $this->event->shouldHaveReceived('trigger')->with(m::type(JobExceptionOccurred::class))->once(); + $this->event->shouldHaveReceived('trigger')->with(m::type(JobFailed::class))->once(); + $this->event->shouldNotHaveReceived('trigger', [m::type(JobProcessed::class)]); + } + + public function testJobBasedMaxRetries() + { + $job = new WorkerFakeJob(function ($job) { + $job->attempts++; + }); + + $job->attempts = 2; + + $job->maxTries = 10; + + $worker = $this->getWorker(['default' => [$job]]); + $worker->runNextJob('sync', 'default', 0, 3, 1); + + $this->assertFalse($job->deleted); + $this->assertNull($job->failedWith); + } + protected function getWorker($jobs) { $sync = m::mock(Sync::class); @@ -311,6 +407,11 @@ public function timeout() { return time() + 60; } + + public function getName() + { + return 'WorkerFakeJob'; + } } class LoopBreakerException extends RuntimeException