Skip to content

Commit

Permalink
feat: support delay stamp
Browse files Browse the repository at this point in the history
  • Loading branch information
nikophil committed Oct 5, 2023
1 parent 68b535a commit 6b2b10e
Show file tree
Hide file tree
Showing 14 changed files with 318 additions and 29 deletions.
88 changes: 88 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,94 @@ class MyTest extends KernelTestCase // or WebTestCase
}
```
### Support of `DelayStamp`
Support of `DelayStamp` could be enabled per transport, within its dsn:
```yaml
# config/packages/messenger.yaml
when@test:
framework:
messenger:
transports:
async: test://?support_delay_stamp=true
```
> [!NOTE]
> Support of delay stamp was added in version 1.8.0.
#### Usage of a clock
> [!WARNING]
> Support of delay stamp needs an implementation of [PSR-20 Clock](https://www.php-fig.org/psr/psr-20/).
You can, for example use symfony's clock component:
```bash
composer require symfony/clock
```
When using Symfony's clock component, the service will be automatically configured.
Otherwise, you need to configure it manually:
```yaml
# config/services.yaml
services:
app.clock:
class: Some\Clock\Implementation
Psr\Clock\ClockInterface: '@app.clock'
```
#### Example of code supporting `DelayStamp`
> [!NOTE]
> This example uses `symfony/clock` component, but you can use any other implementation of `Psr\Clock\ClockInterface`.
```php
// Let's say somewhere in your app, you register some action that should occur in the future:
$bus->dispatch(new Enevelope(new TakeSomeAction1(), [DelayStamp::delayFor(new \DateInterval('P1D'))])); // will be handled in 1 day
$bus->dispatch(new Enevelope(new TakeSomeAction2(), [DelayStamp::delayFor(new \DateInterval('P3D'))])); // will be handled in 3 days
// In your test, you can check that the action is not yet performed:
class TestDelayedActions extends KernelTestCase
{
use InteractsWithMessenger;
use ClockSensitiveTrait;
public function testDelayedActions(): void
{
// 1. mock the clock, in order to perform sleeps
$clock = self::mockTime();
// 2. trigger the action that will dispatch the two messages
// ...
// 3. assert nothing happens yet
$this->transport('async')->process();
$this->transport('async')->queue()->assertCount(2);
$this->transport('async')->acknowledged()->assertCount(0);
// 4. sleep, process queue, and assert some messages have been handled
$clock->sleep('1 day');
$this->transport('async')->process();
$this->transport('async')->acknowledged()->assertContains(TakeSomeAction1::class);
$this->asssertTakeSomeAction1IsHandled();
// TakeSomeAction2 is still in the queue
$this->transport('async')->queue()->assertCount(1);
$clock->sleep('2 days');
$this->transport('async')->process();
$this->transport('async')->acknowledged()->assertContains(TakeSomeAction2::class);
$this->asssertTakeSomeAction2IsHandled();
}
}
```
## Bus
In addition to transport testing you also can make assertions on the bus. You can test message
Expand Down
4 changes: 4 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
"phpstan/phpstan": "^1.4",
"phpunit/phpunit": "^9.5.0",
"symfony/browser-kit": "^5.4|^6.0",
"symfony/clock": "^6.2",
"symfony/phpunit-bridge": "^5.4|^6.0",
"symfony/yaml": "^5.4|^6.0"
},
"suggest": {
"symfony/clock": "A PSR-20 clock implementation in order to support DelayStamp."
},
"config": {
"preferred-install": "dist",
"sort-packages": true
Expand Down
27 changes: 27 additions & 0 deletions src/Stamp/AvailableAtStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Zenstruck\Messenger\Test\Stamp;

use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;

final class AvailableAtStamp implements StampInterface
{
public function __construct(private \DateTimeImmutable $availableAt)
{
}

public static function fromDelayStamp(DelayStamp $delayStamp, \DateTimeImmutable $now): self
{
return new self(
$now->modify(sprintf('+%d seconds', $delayStamp->getDelay() / 1000))
);
}

public function getAvailableAt(): \DateTimeImmutable
{
return $this->availableAt;
}
}
50 changes: 47 additions & 3 deletions src/Transport/TestTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@

namespace Zenstruck\Messenger\Test\Transport;

use Psr\Clock\ClockInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Worker;
use Zenstruck\Assert;
use Zenstruck\Messenger\Test\Stamp\AvailableAtStamp;

/**
* @author Kevin Bond <kevinbond@gmail.com>
Expand All @@ -33,12 +36,14 @@ final class TestTransport implements TransportInterface
'catch_exceptions' => true,
'test_serialization' => true,
'disable_retries' => true,
'support_delay_stamp' => false,
];

private string $name;
private EventDispatcherInterface $dispatcher;
private MessageBusInterface $bus;
private SerializerInterface $serializer;
private ClockInterface|null $clock;

/** @var array<string, bool> */
private static array $intercept = [];
Expand All @@ -52,6 +57,9 @@ final class TestTransport implements TransportInterface
/** @var array<string, bool> */
private static array $disableRetries = [];

/** @var array<string, bool> */
private static array $supportDelayStamp = [];

/** @var array<string, Envelope[]> */
private static array $dispatched = [];

Expand All @@ -72,19 +80,27 @@ final class TestTransport implements TransportInterface
*
* @param array<string,bool> $options
*/
public function __construct(string $name, MessageBusInterface $bus, EventDispatcherInterface $dispatcher, SerializerInterface $serializer, array $options = [])
public function __construct(string $name, MessageBusInterface $bus, EventDispatcherInterface $dispatcher, SerializerInterface $serializer, ClockInterface|null $clock = null, array $options = [])
{
$options = \array_merge(self::DEFAULT_OPTIONS, $options);

$this->name = $name;
$this->dispatcher = $dispatcher;
$this->bus = $bus;
$this->serializer = $serializer;
$this->clock = $clock;

self::$intercept[$name] ??= $options['intercept'];
self::$catchExceptions[$name] ??= $options['catch_exceptions'];
self::$testSerialization[$name] ??= $options['test_serialization'];
self::$disableRetries[$name] ??= $options['disable_retries'];
self::$supportDelayStamp[$name] ??= $options['support_delay_stamp'];

if (!self::$supportDelayStamp[$name]) {
trigger_deprecation('zenstruck/messenger-test', '1.8.0', 'Not supporting DelayStamp is deprecated, support will be removed in 2.0.');
} elseif(!$this->clock) {
throw new \InvalidArgumentException(sprintf('A service aliased "%s" must be available in order to support DelayStamp. You can install for instance symfony/clock (composer require symfony/clock).', ClockInterface::class));
}
}

/**
Expand Down Expand Up @@ -228,7 +244,23 @@ public function get(): iterable
return [];
}

return [\array_shift(self::$queue[$this->name])];
if (!$this->supportsDelayStamp()) {
return [\array_shift(self::$queue[$this->name])];
}

$now = $this->clock->now();

foreach (self::$queue[$this->name] as $i => $envelope) {
if (($availableAtStamp = $envelope->last(AvailableAtStamp::class)) && $now < $availableAtStamp->getAvailableAt()) {
continue;
}

unset(self::$queue[$this->name][$i]);

return [$envelope];
}

return [];
}

/**
Expand Down Expand Up @@ -263,6 +295,10 @@ public function send($what): Envelope

$envelope = Envelope::wrap($what);

if ($this->supportsDelayStamp() && $delayStamp = $envelope->last(DelayStamp::class)) {
$envelope = $envelope->with(AvailableAtStamp::fromDelayStamp($delayStamp, $this->clock->now()));
}

if ($this->isRetriesDisabled() && $envelope->last(RedeliveryStamp::class)) {
// message is being retried, don't process
return $envelope;
Expand Down Expand Up @@ -303,7 +339,7 @@ public static function resetAll(): void

public static function initialize(): void
{
self::$intercept = self::$catchExceptions = self::$testSerialization = self::$disableRetries = [];
self::$intercept = self::$catchExceptions = self::$testSerialization = self::$disableRetries = self::$supportDelayStamp = [];
}

public static function enableMessagesCollection(): void
Expand Down Expand Up @@ -349,6 +385,14 @@ private function isRetriesDisabled(): bool
return self::$disableRetries[$this->name];
}

/**
* @phpstan-assert-if-true !null $this->clock
*/
private function supportsDelayStamp(): bool
{
return $this->clock && self::$supportDelayStamp[$this->name];
}

private function hasMessagesToProcess(): bool
{
return !empty(self::$queue[$this->name] ?? []);
Expand Down
11 changes: 4 additions & 7 deletions src/Transport/TestTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Zenstruck\Messenger\Test\Transport;

use Psr\Clock\ClockInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -24,18 +25,13 @@
*/
final class TestTransportFactory implements TransportFactoryInterface
{
private MessageBusInterface $bus;
private EventDispatcherInterface $dispatcher;

public function __construct(MessageBusInterface $bus, EventDispatcherInterface $dispatcher)
public function __construct(private MessageBusInterface $bus, private EventDispatcherInterface $dispatcher, private ClockInterface|null $clock = null)
{
$this->bus = $bus;
$this->dispatcher = $dispatcher;
}

public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface // @phpstan-ignore-line
{
return new TestTransport($options['transport_name'], $this->bus, $this->dispatcher, $serializer, $this->parseDsn($dsn));
return new TestTransport($options['transport_name'], $this->bus, $this->dispatcher, $serializer, $this->clock, $this->parseDsn($dsn));
}

public function supports(string $dsn, array $options): bool // @phpstan-ignore-line
Expand All @@ -59,6 +55,7 @@ private function parseDsn(string $dsn): array
'catch_exceptions' => \filter_var($query['catch_exceptions'] ?? true, \FILTER_VALIDATE_BOOLEAN),
'test_serialization' => \filter_var($query['test_serialization'] ?? true, \FILTER_VALIDATE_BOOLEAN),
'disable_retries' => \filter_var($query['disable_retries'] ?? true, \FILTER_VALIDATE_BOOLEAN),
'support_delay_stamp' => \filter_var($query['support_delay_stamp'] ?? true, \FILTER_VALIDATE_BOOLEAN),
];
}
}
8 changes: 7 additions & 1 deletion src/ZenstruckMessengerTestBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

namespace Zenstruck\Messenger\Test;

use Psr\Clock\ClockInterface;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\Extension\ExtensionInterface;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\HttpKernel\Bundle\Bundle;
Expand All @@ -30,7 +32,11 @@ final class ZenstruckMessengerTestBundle extends Bundle implements CompilerPassI
public function build(ContainerBuilder $container): void
{
$container->register('zenstruck_messenger_test.transport_factory', TestTransportFactory::class)
->setArguments([new Reference('messenger.routable_message_bus'), new Reference('event_dispatcher')])
->setArguments([
new Reference('messenger.routable_message_bus'),
new Reference('event_dispatcher'),
new Reference(ClockInterface::class, invalidBehavior: ContainerInterface::NULL_ON_INVALID_REFERENCE),
])
->addTag('messenger.transport_factory')
;

Expand Down
Loading

0 comments on commit 6b2b10e

Please sign in to comment.