Skip to content

Commit

Permalink
完善
Browse files Browse the repository at this point in the history
  • Loading branch information
yunwuxin committed Jun 6, 2019
1 parent a8b0201 commit c34b983
Show file tree
Hide file tree
Showing 26 changed files with 463 additions and 114 deletions.
19 changes: 16 additions & 3 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@
namespace think;

use InvalidArgumentException;
use think\cache\driver\Redis;
use think\helper\Str;
use think\queue\Connector;
use think\queue\connector\Database;

/**
* Class Queue
* @package think\queue
*
* @method Connector driver($driver = null)
* @mixin Database
* @mixin Redis
*/
class Queue extends Factory
{
Expand All @@ -33,7 +37,7 @@ class Queue extends Factory
*/
protected function getConfig($name)
{
return $this->app->config->get("queue.connectors.{$name}", ['driver' => 'sync']);
return $this->app->config->get("queue.connections.{$name}", ['driver' => 'sync']);
}

protected function createDriver($name)
Expand All @@ -47,18 +51,27 @@ protected function createDriver($name)
$driver = $this->app->invokeClass($class, [$this->getConfig($driver)]);

return $driver->setApp($this->app)
->setConnectorName($name);
->setConnection($name);
}

throw new InvalidArgumentException("Driver [$driver] not supported.");
}

/**
* @param null|string $name
* @return Connector
*/
public function connection($name = null)
{
return $this->driver($name);
}

/**
* 默认驱动
* @return string
*/
public function getDefaultDriver()
{
return $this->app->config->get('queue.default', 'sync');
return $this->app->config->get('queue.default');
}
}
6 changes: 3 additions & 3 deletions src/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
// +----------------------------------------------------------------------

return [
'default' => 'sync',
'connectors' => [
'default' => 'sync',
'connections' => [
'sync' => [
'driver' => 'sync',
],
Expand All @@ -31,7 +31,7 @@
'persistent' => false,
],
],
'failed' => [
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
Expand Down
10 changes: 5 additions & 5 deletions src/queue/Connector.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ abstract class Connector
*
* @var string
*/
protected $connectorName;
protected $connection;

protected $options = [];

Expand Down Expand Up @@ -143,9 +143,9 @@ public function setApp(App $app)
*
* @return string
*/
public function getConnectorName()
public function getConnection()
{
return $this->connectorName;
return $this->connection;
}

/**
Expand All @@ -154,9 +154,9 @@ public function getConnectorName()
* @param string $name
* @return $this
*/
public function setConnectorName($name)
public function setConnection($name)
{
$this->connectorName = $name;
$this->connection = $name;

return $this;
}
Expand Down
6 changes: 3 additions & 3 deletions src/queue/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ abstract class Job
/**
* The name of the connection the job belongs to.
*/
protected $connector;
protected $connection;

/**
* Indicates if the job has been deleted.
Expand Down Expand Up @@ -266,9 +266,9 @@ public function getName()
*
* @return string
*/
public function getConnector()
public function getConnection()
{
return $this->connector;
return $this->connection;
}

/**
Expand Down
12 changes: 6 additions & 6 deletions src/queue/Listener.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected function phpBinary()
}

/**
* @param string $connector
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $sleep
Expand All @@ -67,17 +67,17 @@ protected function phpBinary()
* @param int $timeout
* @return void
*/
public function listen($connector, $queue, $delay = 0, $sleep = 3, $maxTries = 0, $memory = 128, $timeout = 60)
public function listen($connection, $queue, $delay = 0, $sleep = 3, $maxTries = 0, $memory = 128, $timeout = 60)
{
$process = $this->makeProcess($connector, $queue, $delay, $sleep, $maxTries, $memory, $timeout);
$process = $this->makeProcess($connection, $queue, $delay, $sleep, $maxTries, $memory, $timeout);

while (true) {
$this->runProcess($process, $memory);
}
}

/**
* @param string $connector
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $sleep
Expand All @@ -86,13 +86,13 @@ public function listen($connector, $queue, $delay = 0, $sleep = 3, $maxTries = 0
* @param int $timeout
* @return Process
*/
public function makeProcess($connector, $queue, $delay, $sleep, $maxTries, $memory, $timeout)
public function makeProcess($connection, $queue, $delay, $sleep, $maxTries, $memory, $timeout)
{
$command = array_filter([
$this->phpBinary(),
'think',
'queue:work',
$connector,
$connection,
'--once',
"--queue={$queue}",
"--delay={$delay}",
Expand Down
67 changes: 34 additions & 33 deletions src/queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace think\queue;

use Carbon\Carbon;
use Exception;
use RuntimeException;
use think\Cache;
Expand All @@ -22,6 +23,7 @@
use think\queue\event\JobProcessed;
use think\queue\event\JobProcessing;
use think\queue\event\WorkerStopping;
use think\queue\exception\MaxAttemptsExceededException;
use Throwable;

class Worker
Expand Down Expand Up @@ -59,15 +61,15 @@ public function __construct(Queue $queue, Event $event, Handle $handle, Cache $c
}

/**
* @param string $connector
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $sleep
* @param int $maxTries
* @param int $memory
* @param int $timeout
*/
public function daemon($connector, $queue, $delay = 0, $sleep = 3, $maxTries = 0, $memory = 128, $timeout = 60)
public function daemon($connection, $queue, $delay = 0, $sleep = 3, $maxTries = 0, $memory = 128, $timeout = 60)
{
if ($this->supportsAsyncSignals()) {
$this->listenForSignals();
Expand All @@ -78,15 +80,15 @@ public function daemon($connector, $queue, $delay = 0, $sleep = 3, $maxTries = 0
while (true) {

$job = $this->getNextJob(
$this->queue->driver($connector), $queue
$this->queue->connection($connection), $queue
);

if ($this->supportsAsyncSignals()) {
$this->registerTimeoutHandler($job, $timeout);
}

if ($job) {
$this->runJob($job, $connector, $maxTries, $delay);
$this->runJob($job, $connection, $maxTries, $delay);
} else {
$this->sleep($sleep);
}
Expand Down Expand Up @@ -231,39 +233,38 @@ protected function listenForSignals()

/**
* 执行下个任务
* @param string $connectorName
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $sleep
* @param int $maxTries
* @return void
* @throws Exception
*/
public function runNextJob($connectorName, $queue, $delay = 0, $sleep = 3, $maxTries = 0)
public function runNextJob($connection, $queue, $delay = 0, $sleep = 3, $maxTries = 0)
{

$job = $this->getNextJob($this->queue->driver($connectorName), $queue);
$job = $this->getNextJob($this->queue->connection($connection), $queue);

if ($job) {
$this->runJob($job, $connectorName, $maxTries, $delay);
return;
$this->runJob($job, $connection, $maxTries, $delay);
} else {
$this->sleep($sleep);
}

$this->sleep($sleep);
}

/**
* 执行任务
* @param Job $job
* @param string $connectorName
* @param string $connection
* @param int $maxTries
* @param int $delay
* @return void
*/
protected function runJob($job, $connectorName, $maxTries, $delay)
protected function runJob($job, $connection, $maxTries, $delay)
{
try {
$this->process($connectorName, $job, $maxTries, $delay);
$this->process($connection, $job, $maxTries, $delay);
} catch (Exception | Throwable $e) {
$this->handle->report($e);
}
Expand Down Expand Up @@ -291,32 +292,32 @@ protected function getNextJob($connector, $queue)

/**
* Process a given job from the queue.
* @param string $connector
* @param string $connection
* @param Job $job
* @param int $maxTries
* @param int $delay
* @return void
* @throws Exception
*/
public function process($connector, $job, $maxTries = 0, $delay = 0)
public function process($connection, $job, $maxTries = 0, $delay = 0)
{
try {
$this->event->trigger(new JobProcessing($connector, $job));
$this->event->trigger(new JobProcessing($connection, $job));

$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connector, $job, (int) $maxTries
$connection, $job, (int) $maxTries
);

$job->fire();

$this->event->trigger(new JobProcessed($connector, $job));
$this->event->trigger(new JobProcessed($connection, $job));
} catch (Exception | Throwable $e) {
try {
if (!$job->hasFailed()) {
$this->markJobAsFailedIfWillExceedMaxAttempts($connector, $job, (int) $maxTries, $e);
$this->markJobAsFailedIfWillExceedMaxAttempts($connection, $job, (int) $maxTries, $e);
}

$this->event->trigger(new JobExceptionOccurred($connector, $job, $e));
$this->event->trigger(new JobExceptionOccurred($connection, $job, $e));
} finally {
if (!$job->isDeleted() && !$job->isReleased() && !$job->hasFailed()) {
$job->release($delay);
Expand All @@ -328,56 +329,56 @@ public function process($connector, $job, $maxTries = 0, $delay = 0)
}

/**
* @param string $connector
* @param string $connection
* @param Job $job
* @param int $maxTries
*/
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connector, $job, $maxTries)
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connection, $job, $maxTries)
{
$maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

$timeoutAt = $job->timeoutAt();

if ($timeoutAt && time() <= $timeoutAt) {
if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) {
return;
}

if (!$timeoutAt && (0 === $maxTries || $job->attempts() <= $maxTries)) {
return;
}

$this->failJob($connector, $job, $e = new RuntimeException(
$this->failJob($connection, $job, $e = new MaxAttemptsExceededException(
$job->getName() . ' has been attempted too many times or run too long. The job may have previously timed out.'
));

throw $e;
}

/**
* @param string $connector
* @param string $connection
* @param Job $job
* @param int $maxTries
* @param Exception $e
*/
protected function markJobAsFailedIfWillExceedMaxAttempts($connector, $job, $maxTries, $e)
protected function markJobAsFailedIfWillExceedMaxAttempts($connection, $job, $maxTries, $e)
{
$maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

if ($job->timeoutAt() && $job->timeoutAt() <= time()) {
$this->failJob($connector, $job, $e);
if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
$this->failJob($connection, $job, $e);
}

if ($maxTries > 0 && $job->attempts() >= $maxTries) {
$this->failJob($connector, $job, $e);
$this->failJob($connection, $job, $e);
}
}

/**
* @param string $connector
* @param string $connection
* @param Job $job
* @param Exception $e
*/
protected function failJob($connector, $job, $e)
protected function failJob($connection, $job, $e)
{
$job->markAsFailed();

Expand All @@ -391,7 +392,7 @@ protected function failJob($connector, $job, $e)
$job->failed($e);
} finally {
$this->event->trigger(new JobFailed(
$connector, $job, $e ?: new RuntimeException('ManuallyFailed')
$connection, $job, $e ?: new RuntimeException('ManuallyFailed')
));
}
}
Expand Down
Loading

0 comments on commit c34b983

Please sign in to comment.