Skip to content

Commit cb084ea

Browse files
authored
Merge pull request #766 from greblov/759-chenge-command-exit-status
[consumption] Add ability to change process exit status from within queue consumer extension
2 parents 5785c92 + cd1ac76 commit cb084ea

File tree

12 files changed

+355
-16
lines changed

12 files changed

+355
-16
lines changed

pkg/enqueue/Consumption/Context/End.php

+18-2
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,23 @@ final class End
2727
*/
2828
private $logger;
2929

30-
public function __construct(Context $context, int $startTime, int $endTime, LoggerInterface $logger)
31-
{
30+
/**
31+
* @var int
32+
*/
33+
private $exitStatus;
34+
35+
public function __construct(
36+
Context $context,
37+
int $startTime,
38+
int $endTime,
39+
LoggerInterface $logger,
40+
?int $exitStatus = null
41+
) {
3242
$this->context = $context;
3343
$this->logger = $logger;
3444
$this->startTime = $startTime;
3545
$this->endTime = $endTime;
46+
$this->exitStatus = $exitStatus;
3647
}
3748

3849
public function getContext(): Context
@@ -60,4 +71,9 @@ public function getEndTime(): int
6071
{
6172
return $this->startTime;
6273
}
74+
75+
public function getExitStatus(): ?int
76+
{
77+
return $this->exitStatus;
78+
}
6379
}

pkg/enqueue/Consumption/Context/PostConsume.php

+12-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ final class PostConsume
4343
*/
4444
private $executionInterrupted;
4545

46+
/**
47+
* @var int
48+
*/
49+
private $exitStatus;
50+
4651
public function __construct(Context $context, SubscriptionConsumer $subscriptionConsumer, int $receivedMessagesCount, int $cycle, int $startTime, LoggerInterface $logger)
4752
{
4853
$this->context = $context;
@@ -85,13 +90,19 @@ public function getLogger(): LoggerInterface
8590
return $this->logger;
8691
}
8792

93+
public function getExitStatus(): ?int
94+
{
95+
return $this->exitStatus;
96+
}
97+
8898
public function isExecutionInterrupted(): bool
8999
{
90100
return $this->executionInterrupted;
91101
}
92102

93-
public function interruptExecution(): void
103+
public function interruptExecution(?int $exitStatus = null): void
94104
{
105+
$this->exitStatus = $exitStatus;
95106
$this->executionInterrupted = true;
96107
}
97108
}

pkg/enqueue/Consumption/Context/PostMessageReceived.php

+13-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ final class PostMessageReceived
4545
*/
4646
private $executionInterrupted;
4747

48+
/**
49+
* @var int
50+
*/
51+
private $exitStatus;
52+
4853
public function __construct(
4954
Context $context,
5055
Consumer $consumer,
@@ -89,20 +94,26 @@ public function getReceivedAt(): int
8994
}
9095

9196
/**
92-
* @return Result|null|object|string
97+
* @return Result|object|string|null
9398
*/
9499
public function getResult()
95100
{
96101
return $this->result;
97102
}
98103

104+
public function getExitStatus(): ?int
105+
{
106+
return $this->exitStatus;
107+
}
108+
99109
public function isExecutionInterrupted(): bool
100110
{
101111
return $this->executionInterrupted;
102112
}
103113

104-
public function interruptExecution(): void
114+
public function interruptExecution(?int $exitStatus = null): void
105115
{
116+
$this->exitStatus = $exitStatus;
106117
$this->executionInterrupted = true;
107118
}
108119
}

pkg/enqueue/Consumption/Context/PreConsume.php

+12-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ final class PreConsume
4343
*/
4444
private $executionInterrupted;
4545

46+
/**
47+
* @var int
48+
*/
49+
private $exitStatus;
50+
4651
public function __construct(Context $context, SubscriptionConsumer $subscriptionConsumer, LoggerInterface $logger, int $cycle, int $receiveTimeout, int $startTime)
4752
{
4853
$this->context = $context;
@@ -85,13 +90,19 @@ public function getStartTime(): int
8590
return $this->startTime;
8691
}
8792

93+
public function getExitStatus(): ?int
94+
{
95+
return $this->exitStatus;
96+
}
97+
8898
public function isExecutionInterrupted(): bool
8999
{
90100
return $this->executionInterrupted;
91101
}
92102

93-
public function interruptExecution(): void
103+
public function interruptExecution(?int $exitStatus = null): void
94104
{
105+
$this->exitStatus = $exitStatus;
95106
$this->executionInterrupted = true;
96107
}
97108
}

pkg/enqueue/Consumption/Context/Start.php

+12-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ final class Start
3838
*/
3939
private $executionInterrupted;
4040

41+
/**
42+
* @var int
43+
*/
44+
private $exitStatus;
45+
4146
/**
4247
* @param BoundProcessor[] $processors
4348
*/
@@ -105,13 +110,19 @@ public function changeBoundProcessors(array $processors): void
105110
});
106111
}
107112

113+
public function getExitStatus(): ?int
114+
{
115+
return $this->exitStatus;
116+
}
117+
108118
public function isExecutionInterrupted(): bool
109119
{
110120
return $this->executionInterrupted;
111121
}
112122

113-
public function interruptExecution(): void
123+
public function interruptExecution(?int $exitStatus = null): void
114124
{
125+
$this->exitStatus = $exitStatus;
115126
$this->executionInterrupted = true;
116127
}
117128
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption\Extension;
4+
5+
use Enqueue\Consumption\Context\End;
6+
use Enqueue\Consumption\EndExtensionInterface;
7+
8+
class ExitStatusExtension implements EndExtensionInterface
9+
{
10+
/**
11+
* @var int
12+
*/
13+
private $exitStatus;
14+
15+
public function onEnd(End $context): void
16+
{
17+
$this->exitStatus = $context->getExitStatus();
18+
}
19+
20+
public function getExitStatus(): ?int
21+
{
22+
return $this->exitStatus;
23+
}
24+
}

pkg/enqueue/Consumption/QueueConsumer.php

+6-5
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
147147
$extension->onStart($start);
148148

149149
if ($start->isExecutionInterrupted()) {
150-
$this->onEnd($extension, $startTime);
150+
$this->onEnd($extension, $startTime, $start->getExitStatus());
151151

152152
return;
153153
}
@@ -256,7 +256,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
256256
$extension->onPreConsume($preConsume);
257257

258258
if ($preConsume->isExecutionInterrupted()) {
259-
$this->onEnd($extension, $startTime, $subscriptionConsumer);
259+
$this->onEnd($extension, $startTime, $preConsume->getExitStatus(), $subscriptionConsumer);
260260

261261
return;
262262
}
@@ -267,7 +267,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
267267
$extension->onPostConsume($postConsume);
268268

269269
if ($interruptExecution || $postConsume->isExecutionInterrupted()) {
270-
$this->onEnd($extension, $startTime, $subscriptionConsumer);
270+
$this->onEnd($extension, $startTime, $postConsume->getExitStatus(), $subscriptionConsumer);
271271

272272
return;
273273
}
@@ -286,11 +286,12 @@ public function setFallbackSubscriptionConsumer(SubscriptionConsumer $fallbackSu
286286
$this->fallbackSubscriptionConsumer = $fallbackSubscriptionConsumer;
287287
}
288288

289-
private function onEnd(ExtensionInterface $extension, int $startTime, SubscriptionConsumer $subscriptionConsumer = null): void
289+
private function onEnd(ExtensionInterface $extension, int $startTime, ?int $exitStatus = null, SubscriptionConsumer $subscriptionConsumer = null): void
290290
{
291291
$endTime = (int) (microtime(true) * 1000);
292292

293-
$extension->onEnd(new End($this->interopContext, $startTime, $endTime, $this->logger));
293+
$endContext = new End($this->interopContext, $startTime, $endTime, $this->logger, $exitStatus);
294+
$extension->onEnd($endContext);
294295

295296
if ($subscriptionConsumer) {
296297
$subscriptionConsumer->unsubscribeAll();

pkg/enqueue/Symfony/Client/ConsumeCommand.php

+6-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Client\DriverInterface;
66
use Enqueue\Consumption\ChainExtension;
7+
use Enqueue\Consumption\Extension\ExitStatusExtension;
78
use Enqueue\Consumption\Extension\LoggerExtension;
89
use Enqueue\Consumption\ExtensionInterface;
910
use Enqueue\Consumption\QueueConsumerInterface;
@@ -143,9 +144,12 @@ protected function execute(InputInterface $input, OutputInterface $output): ?int
143144
$consumer->bind($queue, $processor);
144145
}
145146

146-
$consumer->consume($this->getRuntimeExtensions($input, $output));
147+
$runtimeExtensionChain = $this->getRuntimeExtensions($input, $output);
148+
$exitStatusExtension = new ExitStatusExtension();
147149

148-
return null;
150+
$consumer->consume(new ChainExtension([$runtimeExtensionChain, $exitStatusExtension]));
151+
152+
return $exitStatusExtension->getExitStatus();
149153
}
150154

151155
protected function getRuntimeExtensions(InputInterface $input, OutputInterface $output): ExtensionInterface

pkg/enqueue/Symfony/Consumption/ConsumeCommand.php

+5-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Symfony\Consumption;
44

55
use Enqueue\Consumption\ChainExtension;
6+
use Enqueue\Consumption\Extension\ExitStatusExtension;
67
use Enqueue\Consumption\QueueConsumerInterface;
78
use Psr\Container\ContainerInterface;
89
use Psr\Container\NotFoundExceptionInterface;
@@ -75,9 +76,12 @@ protected function execute(InputInterface $input, OutputInterface $output): ?int
7576
array_unshift($extensions, $loggerExtension);
7677
}
7778

79+
$exitStatusExtension = new ExitStatusExtension();
80+
array_unshift($extensions, $exitStatusExtension);
81+
7882
$consumer->consume(new ChainExtension($extensions));
7983

80-
return null;
84+
return $exitStatusExtension->getExitStatus();
8185
}
8286

8387
private function getQueueConsumer(string $name): QueueConsumerInterface

pkg/enqueue/Tests/Consumption/QueueConsumerTest.php

+25-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Enqueue\Consumption\Context\ProcessorException;
1717
use Enqueue\Consumption\Context\Start;
1818
use Enqueue\Consumption\Exception\InvalidArgumentException;
19+
use Enqueue\Consumption\Extension\ExitStatusExtension;
1920
use Enqueue\Consumption\ExtensionInterface;
2021
use Enqueue\Consumption\QueueConsumer;
2122
use Enqueue\Consumption\Result;
@@ -1429,6 +1430,29 @@ public function testShouldCallProcessorAsMessageComeAlong()
14291430
$this->assertSame($fooConsumerStub, $actualContexts[2]->getConsumer());
14301431
}
14311432

1433+
public function testCaptureExitStatus()
1434+
{
1435+
$testExitCode = 5;
1436+
1437+
$stubExtension = $this->createExtension();
1438+
1439+
$stubExtension
1440+
->expects($this->once())
1441+
->method('onStart')
1442+
->with($this->isInstanceOf(Start::class))
1443+
->willReturnCallback(function (Start $context) use ($testExitCode) {
1444+
$context->interruptExecution($testExitCode);
1445+
})
1446+
;
1447+
1448+
$exitExtension = new ExitStatusExtension();
1449+
1450+
$consumer = new QueueConsumer($this->createContextStub(), $stubExtension);
1451+
$consumer->consume(new ChainExtension([$exitExtension]));
1452+
1453+
$this->assertEquals($testExitCode, $exitExtension->getExitStatus());
1454+
}
1455+
14321456
/**
14331457
* @return \PHPUnit_Framework_MockObject_MockObject
14341458
*/
@@ -1508,7 +1532,7 @@ private function createExtension()
15081532
}
15091533

15101534
/**
1511-
* @param null|mixed $queue
1535+
* @param mixed|null $queue
15121536
*
15131537
* @return \PHPUnit_Framework_MockObject_MockObject|Consumer
15141538
*/

0 commit comments

Comments
 (0)