Skip to content

Commit

Permalink
replace sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhineng committed Dec 24, 2024
1 parent 38e5369 commit d48ca6c
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 247 deletions.
7 changes: 7 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
parameters:
ignoreErrors:
-
message: '#^Parameter \#1 \$arguments of method Dew\\Acs\\MnsOpen\\QueueClient\:\:sendMessage\(\) expects array\{QueueName\: string, Message\: array\{MessageBody\: string, DelaySeconds\?\: int\}\}, array\{QueueName\: string, Message\: non\-empty\-array\<mixed\>\} given\.$#'
identifier: argument.type
count: 1
path: src/MnsQueue.php
1 change: 1 addition & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
includes:
- vendor/phpstan/phpstan-strict-rules/rules.neon
- phpstan-baseline.neon

parameters:
level: max
Expand Down
62 changes: 41 additions & 21 deletions src/MnsConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@

namespace Dew\MnsDriver;

use Dew\Mns\MnsClient;
use Dew\Mns\Versions\V20150606\Queue;
use Dew\Acs\MnsOpen\MnsOpenClient;
use Dew\Acs\MnsOpen\QueueClient;
use Illuminate\Queue\Connectors\ConnectorInterface;

/**
* @phpstan-type Config array{
* endpoint: string,
* key: string,
* secret: string,
* queue: string,
* http?: array<string, mixed>
* @phpstan-type TConfig array{
* key: string,
* secret: string,
* region: string,
* endpoint: string,
* queue: string,
* console_endpoint?: string
* }
*/
class MnsConnector implements ConnectorInterface
Expand All @@ -25,25 +26,44 @@ class MnsConnector implements ConnectorInterface
*/
public function connect(array $config)
{
/** @var Config $config */
$mns = new MnsClient($config['endpoint'], $config['key'], $config['secret']);

$mns->configure($this->withDefaultConfiguration($config['http'] ?? []));
/** @var TConfig $config */
return new MnsQueue(
$this->makeConsoleClient($config),
$this->makeQueueClient($config),
$config['queue']
);
}

return new MnsQueue(new Queue($mns), $config['queue']);
/**
* Make a MNS console client.
*
* @param TConfig $config
*/
private function makeConsoleClient(array $config): MnsOpenClient
{
return new MnsOpenClient(array_filter([
'credentials' => [
'key' => $config['key'],
'secret' => $config['secret'],
],
'region' => $config['region'],
'endpoint' => $config['console_endpoint'] ?? null,
], fn (mixed $value): bool => $value !== null));
}

/**
* Build a configuration with default one.
* Make a MNS queue client.
*
* @param array<string, mixed> $config
* @return array<string, mixed>
* @param TConfig $config
*/
protected function withDefaultConfiguration(array $config = []): array
private function makeQueueClient(array $config): QueueClient
{
// timeout: receiving messages could take up to 30 seconds.
return array_merge([
'timeout' => 60.0,
], $config);
return new QueueClient([
'credentials' => [
'key' => $config['key'],
'secret' => $config['secret'],
],
'endpoint' => $config['endpoint'],
]);
}
}
43 changes: 18 additions & 25 deletions src/MnsJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

namespace Dew\MnsDriver;

use Dew\Mns\Versions\V20150606\Queue;
use Dew\Mns\Versions\V20150606\Results\ReceiveMessageResult;
use Dew\Acs\MnsOpen\Models\ReceiveMessage;
use Dew\Acs\MnsOpen\QueueClient;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;
use Illuminate\Queue\Jobs\Job;
Expand All @@ -13,12 +13,12 @@ class MnsJob extends Job implements JobContract
/**
* Create a new job instance.
*
* @param \Dew\Mns\Versions\V20150606\Queue $mns
* @param \Dew\Acs\MnsOpen\QueueClient $mns
*/
public function __construct(
Container $container,
protected $mns,
protected ReceiveMessageResult $job,
protected ReceiveMessage $job,
string $connectionName,
string $queue
) {
Expand All @@ -37,15 +37,11 @@ public function release($delay = 0)
{
parent::release($delay);

$result = $this->mns->changeMessageVisibility(
$this->queue, (string) $this->job->receiptHandle(), $delay
);

if ($result->failed()) {
throw new MnsQueueException(sprintf('Release the job with error [%s] %s',
$result->errorCode(), $result->errorMessage()
));
}
$this->mns->changeMessageVisibility([
'QueueName' => $this->queue,
'ReceiptHandle' => $this->job->receiptHandle,
'VisibilityTimeout' => $delay,
]);
}

/**
Expand All @@ -57,13 +53,10 @@ public function delete()
{
parent::delete();

$result = $this->mns->deleteMessage($this->queue, (string) $this->job->receiptHandle());

if ($result->failed()) {
throw new MnsQueueException(sprintf('Delete the job with error [%s] %s',
$result->errorCode(), $result->errorMessage()
));
}
$this->mns->deleteMessage([
'QueueName' => $this->queue,
'ReceiptHandle' => $this->job->receiptHandle,
]);
}

/**
Expand All @@ -73,7 +66,7 @@ public function delete()
*/
public function attempts()
{
return (int) $this->job->dequeueCount();
return $this->job->dequeueCount;
}

/**
Expand All @@ -83,7 +76,7 @@ public function attempts()
*/
public function getJobId()
{
return (string) $this->job->messageId();
return $this->job->messageId;
}

/**
Expand All @@ -93,21 +86,21 @@ public function getJobId()
*/
public function getRawBody()
{
return (string) $this->job->messageBody();
return $this->job->messageBody;
}

/**
* Get the underlying MNS queue instance.
*/
public function getMns(): Queue
public function getMns(): QueueClient
{
return $this->mns;
}

/**
* Get the underlying raw MNS job.
*/
public function getMnsJob(): ReceiveMessageResult
public function getMnsJob(): ReceiveMessage
{
return $this->job;
}
Expand Down
Loading

0 comments on commit d48ca6c

Please sign in to comment.