diff --git a/README.md b/README.md index e46a5f4..fce7620 100644 --- a/README.md +++ b/README.md @@ -357,6 +357,94 @@ class MyTest extends KernelTestCase // or WebTestCase } ``` +### Support of `DelayStamp` + +Support of `DelayStamp` could be enabled per transport, within its dsn: + +```yaml +# config/packages/messenger.yaml + +when@test: + framework: + messenger: + transports: + async: test://?support_delay_stamp=true +``` + +> [!NOTE] +> Support of delay stamp was added in version 1.8.0. + +#### Usage of a clock + +> [!WARNING] +> Support of delay stamp needs an implementation of [PSR-20 Clock](https://www.php-fig.org/psr/psr-20/). + +You can, for example use symfony's clock component: +```bash +composer require symfony/clock +``` + +When using Symfony's clock component, the service will be automatically configured. +Otherwise, you need to configure it manually: + +```yaml +# config/services.yaml +services: + app.clock: + class: Some\Clock\Implementation + Psr\Clock\ClockInterface: '@app.clock' +``` + +#### Example of code supporting `DelayStamp` + +> [!NOTE] +> This example uses `symfony/clock` component, but you can use any other implementation of `Psr\Clock\ClockInterface`. + +```php + +// Let's say somewhere in your app, you register some action that should occur in the future: + +$bus->dispatch(new Enevelope(new TakeSomeAction1(), [DelayStamp::delayFor(new \DateInterval('P1D'))])); // will be handled in 1 day +$bus->dispatch(new Enevelope(new TakeSomeAction2(), [DelayStamp::delayFor(new \DateInterval('P3D'))])); // will be handled in 3 days + +// In your test, you can check that the action is not yet performed: + +class TestDelayedActions extends KernelTestCase +{ + use InteractsWithMessenger; + use ClockSensitiveTrait; + + public function testDelayedActions(): void + { + // 1. mock the clock, in order to perform sleeps + $clock = self::mockTime(); + + // 2. trigger the action that will dispatch the two messages + + // ... + + // 3. assert nothing happens yet + $this->transport('async')->process(); + $this->transport('async')->queue()->assertCount(2); + $this->transport('async')->acknowledged()->assertCount(0); + + // 4. sleep, process queue, and assert some messages have been handled + $clock->sleep('1 day'); + $this->transport('async')->process(); + $this->transport('async')->acknowledged()->assertContains(TakeSomeAction1::class); + $this->asssertTakeSomeAction1IsHandled(); + + // TakeSomeAction2 is still in the queue + $this->transport('async')->queue()->assertCount(1); + + $clock->sleep('2 days'); + $this->transport('async')->process(); + $this->transport('async')->acknowledged()->assertContains(TakeSomeAction2::class); + $this->asssertTakeSomeAction2IsHandled(); + } +} +``` + ## Bus In addition to transport testing you also can make assertions on the bus. You can test message diff --git a/composer.json b/composer.json index 31d7fa2..afa26d6 100644 --- a/composer.json +++ b/composer.json @@ -22,9 +22,13 @@ "phpstan/phpstan": "^1.4", "phpunit/phpunit": "^9.5.0", "symfony/browser-kit": "^5.4|^6.0", + "symfony/clock": "^6.2", "symfony/phpunit-bridge": "^5.4|^6.0", "symfony/yaml": "^5.4|^6.0" }, + "suggest": { + "symfony/clock": "A PSR-20 clock implementation in order to support DelayStamp." + }, "config": { "preferred-install": "dist", "sort-packages": true diff --git a/src/Stamp/AvailableAtStamp.php b/src/Stamp/AvailableAtStamp.php new file mode 100644 index 0000000..83de8be --- /dev/null +++ b/src/Stamp/AvailableAtStamp.php @@ -0,0 +1,27 @@ +modify(sprintf('+%d seconds', $delayStamp->getDelay() / 1000)) + ); + } + + public function getAvailableAt(): \DateTimeImmutable + { + return $this->availableAt; + } +} diff --git a/src/Transport/TestTransport.php b/src/Transport/TestTransport.php index 45b4d15..ea133aa 100644 --- a/src/Transport/TestTransport.php +++ b/src/Transport/TestTransport.php @@ -11,17 +11,20 @@ namespace Zenstruck\Messenger\Test\Transport; +use Psr\Clock\ClockInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerRunningEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\Messenger\Worker; use Zenstruck\Assert; +use Zenstruck\Messenger\Test\Stamp\AvailableAtStamp; /** * @author Kevin Bond @@ -33,12 +36,14 @@ final class TestTransport implements TransportInterface 'catch_exceptions' => true, 'test_serialization' => true, 'disable_retries' => true, + 'support_delay_stamp' => false, ]; private string $name; private EventDispatcherInterface $dispatcher; private MessageBusInterface $bus; private SerializerInterface $serializer; + private ClockInterface|null $clock; /** @var array */ private static array $intercept = []; @@ -52,6 +57,9 @@ final class TestTransport implements TransportInterface /** @var array */ private static array $disableRetries = []; + /** @var array */ + private static array $supportDelayStamp = []; + /** @var array */ private static array $dispatched = []; @@ -72,7 +80,7 @@ final class TestTransport implements TransportInterface * * @param array $options */ - public function __construct(string $name, MessageBusInterface $bus, EventDispatcherInterface $dispatcher, SerializerInterface $serializer, array $options = []) + public function __construct(string $name, MessageBusInterface $bus, EventDispatcherInterface $dispatcher, SerializerInterface $serializer, ClockInterface|null $clock = null, array $options = []) { $options = \array_merge(self::DEFAULT_OPTIONS, $options); @@ -80,11 +88,19 @@ public function __construct(string $name, MessageBusInterface $bus, EventDispatc $this->dispatcher = $dispatcher; $this->bus = $bus; $this->serializer = $serializer; + $this->clock = $clock; self::$intercept[$name] ??= $options['intercept']; self::$catchExceptions[$name] ??= $options['catch_exceptions']; self::$testSerialization[$name] ??= $options['test_serialization']; self::$disableRetries[$name] ??= $options['disable_retries']; + self::$supportDelayStamp[$name] ??= $options['support_delay_stamp']; + + if (!self::$supportDelayStamp[$name]) { + trigger_deprecation('zenstruck/messenger-test', '1.8.0', 'Not supporting DelayStamp is deprecated, support will be removed in 2.0.'); + } elseif(!$this->clock) { + throw new \InvalidArgumentException(sprintf('A service aliased "%s" must be available in order to support DelayStamp. You can install for instance symfony/clock (composer require symfony/clock).', ClockInterface::class)); + } } /** @@ -228,7 +244,23 @@ public function get(): iterable return []; } - return [\array_shift(self::$queue[$this->name])]; + if (!$this->supportsDelayStamp()) { + return [\array_shift(self::$queue[$this->name])]; + } + + $now = $this->clock->now(); + + foreach (self::$queue[$this->name] as $i => $envelope) { + if (($availableAtStamp = $envelope->last(AvailableAtStamp::class)) && $now < $availableAtStamp->getAvailableAt()) { + continue; + } + + unset(self::$queue[$this->name][$i]); + + return [$envelope]; + } + + return []; } /** @@ -263,6 +295,10 @@ public function send($what): Envelope $envelope = Envelope::wrap($what); + if ($this->supportsDelayStamp() && $delayStamp = $envelope->last(DelayStamp::class)) { + $envelope = $envelope->with(AvailableAtStamp::fromDelayStamp($delayStamp, $this->clock->now())); + } + if ($this->isRetriesDisabled() && $envelope->last(RedeliveryStamp::class)) { // message is being retried, don't process return $envelope; @@ -303,7 +339,7 @@ public static function resetAll(): void public static function initialize(): void { - self::$intercept = self::$catchExceptions = self::$testSerialization = self::$disableRetries = []; + self::$intercept = self::$catchExceptions = self::$testSerialization = self::$disableRetries = self::$supportDelayStamp = []; } public static function enableMessagesCollection(): void @@ -349,6 +385,14 @@ private function isRetriesDisabled(): bool return self::$disableRetries[$this->name]; } + /** + * @phpstan-assert-if-true !null $this->clock + */ + private function supportsDelayStamp(): bool + { + return $this->clock && self::$supportDelayStamp[$this->name]; + } + private function hasMessagesToProcess(): bool { return !empty(self::$queue[$this->name] ?? []); diff --git a/src/Transport/TestTransportFactory.php b/src/Transport/TestTransportFactory.php index 6667712..ac3a51b 100644 --- a/src/Transport/TestTransportFactory.php +++ b/src/Transport/TestTransportFactory.php @@ -11,6 +11,7 @@ namespace Zenstruck\Messenger\Test\Transport; +use Psr\Clock\ClockInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -24,18 +25,13 @@ */ final class TestTransportFactory implements TransportFactoryInterface { - private MessageBusInterface $bus; - private EventDispatcherInterface $dispatcher; - - public function __construct(MessageBusInterface $bus, EventDispatcherInterface $dispatcher) + public function __construct(private MessageBusInterface $bus, private EventDispatcherInterface $dispatcher, private ClockInterface|null $clock = null) { - $this->bus = $bus; - $this->dispatcher = $dispatcher; } public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface // @phpstan-ignore-line { - return new TestTransport($options['transport_name'], $this->bus, $this->dispatcher, $serializer, $this->parseDsn($dsn)); + return new TestTransport($options['transport_name'], $this->bus, $this->dispatcher, $serializer, $this->clock, $this->parseDsn($dsn)); } public function supports(string $dsn, array $options): bool // @phpstan-ignore-line @@ -59,6 +55,7 @@ private function parseDsn(string $dsn): array 'catch_exceptions' => \filter_var($query['catch_exceptions'] ?? true, \FILTER_VALIDATE_BOOLEAN), 'test_serialization' => \filter_var($query['test_serialization'] ?? true, \FILTER_VALIDATE_BOOLEAN), 'disable_retries' => \filter_var($query['disable_retries'] ?? true, \FILTER_VALIDATE_BOOLEAN), + 'support_delay_stamp' => \filter_var($query['support_delay_stamp'] ?? true, \FILTER_VALIDATE_BOOLEAN), ]; } } diff --git a/src/ZenstruckMessengerTestBundle.php b/src/ZenstruckMessengerTestBundle.php index 0291124..20edcfe 100644 --- a/src/ZenstruckMessengerTestBundle.php +++ b/src/ZenstruckMessengerTestBundle.php @@ -11,8 +11,10 @@ namespace Zenstruck\Messenger\Test; +use Psr\Clock\ClockInterface; use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; use Symfony\Component\DependencyInjection\ContainerBuilder; +use Symfony\Component\DependencyInjection\ContainerInterface; use Symfony\Component\DependencyInjection\Extension\ExtensionInterface; use Symfony\Component\DependencyInjection\Reference; use Symfony\Component\HttpKernel\Bundle\Bundle; @@ -30,7 +32,11 @@ final class ZenstruckMessengerTestBundle extends Bundle implements CompilerPassI public function build(ContainerBuilder $container): void { $container->register('zenstruck_messenger_test.transport_factory', TestTransportFactory::class) - ->setArguments([new Reference('messenger.routable_message_bus'), new Reference('event_dispatcher')]) + ->setArguments([ + new Reference('messenger.routable_message_bus'), + new Reference('event_dispatcher'), + new Reference(ClockInterface::class, invalidBehavior: ContainerInterface::NULL_ON_INVALID_REFERENCE), + ]) ->addTag('messenger.transport_factory') ; diff --git a/tests/DelayStampTestTest.php b/tests/DelayStampTestTest.php new file mode 100644 index 0000000..f2cd2ab --- /dev/null +++ b/tests/DelayStampTestTest.php @@ -0,0 +1,95 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Zenstruck\Messenger\Test\Tests; + +use Symfony\Bundle\FrameworkBundle\Test\WebTestCase; +use Symfony\Component\Clock\Test\ClockSensitiveTrait; +use Symfony\Component\HttpKernel\KernelInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\DelayStamp; +use Zenstruck\Messenger\Test\InteractsWithMessenger; +use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA; +use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageB; +use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageC; + +/** + * @author Nicolas PHILIPPE + */ +final class DelayStampTestTest extends WebTestCase +{ + use InteractsWithMessenger; + use ClockSensitiveTrait; + + /** + * @test + * @group legacy + */ + public function it_handles_messages_sequentially_without_delay_stamp_support(): void + { + self::bootKernel(['environment' => 'delay_stamp_disabled']); + + $transport = $this->transport('async'); + $transport->send(new Envelope(new MessageA(), [new DelayStamp(10_000)])); + $transport->send(new Envelope(new MessageB())); + $transport->send(new Envelope(new MessageC(), [new DelayStamp(5_000)])); + + $transport->acknowledged()->assertCount(0); + + $transport->process(1)->acknowledged()->assertCount(1)->assertContains(MessageA::class); + $transport->process(1)->acknowledged()->assertCount(2)->assertContains(MessageB::class); + $transport->process(1)->acknowledged()->assertCount(3)->assertContains(MessageC::class); + } + + /** + * @test + */ + public function it_only_handles_message_without_delay_stamp_if_clock_not_mocked(): void + { + $transport = $this->transport('async'); + $transport->send(new Envelope(new MessageA(), [new DelayStamp(10_000)])); + $transport->send(new Envelope(new MessageB())); + $transport->send(new Envelope(new MessageC(), [new DelayStamp(5_000)])); + + $transport->acknowledged()->assertCount(0); + + $transport->process(1)->acknowledged()->assertCount(1)->assertContains(MessageB::class); + $transport->process()->acknowledged()->assertCount(1); + } + + /** + * @test + */ + public function it_handles_messages_depending_on_delay_stamp(): void + { + $clock = self::mockTime(); + + $transport = $this->transport('async'); + $transport->send(new Envelope(new MessageA(), [new DelayStamp(10_000)])); + $transport->send(new Envelope(new MessageB())); + $transport->send(new Envelope(new MessageC(), [new DelayStamp(5_000)])); + + $transport->acknowledged()->assertCount(0); + + $transport->process()->acknowledged()->assertCount(1)->assertContains(MessageB::class); + + $clock->sleep(5); + $transport->process()->acknowledged()->assertCount(2)->assertContains(MessageC::class); + + $clock->sleep(5); + $transport->process()->acknowledged()->assertCount(3)->assertContains(MessageA::class); + } + + protected static function bootKernel(array $options = []): KernelInterface // @phpstan-ignore-line + { + return parent::bootKernel(\array_merge(['environment' => 'single_transport'], $options)); + } +} diff --git a/tests/Fixture/config/default_sync_transport.yaml b/tests/Fixture/config/default_sync_transport.yaml index ee9175d..dfb083b 100644 --- a/tests/Fixture/config/default_sync_transport.yaml +++ b/tests/Fixture/config/default_sync_transport.yaml @@ -5,4 +5,4 @@ framework: messenger: transports: sync: - dsn: sync:// + dsn: sync://?support_delay_stamp=true diff --git a/tests/Fixture/config/delay_stamp_disabled.yaml b/tests/Fixture/config/delay_stamp_disabled.yaml new file mode 100644 index 0000000..ab6e15f --- /dev/null +++ b/tests/Fixture/config/delay_stamp_disabled.yaml @@ -0,0 +1,12 @@ +imports: + - { resource: test.yaml } + +framework: + messenger: + transports: + async: + dsn: test://?disable_retries=false&support_delay_stamp=false + routing: + Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA: [async] + Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageB: [async] + Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageC: [async] diff --git a/tests/Fixture/config/multi_bus.yaml b/tests/Fixture/config/multi_bus.yaml index 7d49e72..5ca9b7a 100644 --- a/tests/Fixture/config/multi_bus.yaml +++ b/tests/Fixture/config/multi_bus.yaml @@ -13,7 +13,7 @@ framework: messenger: transports: async: - dsn: test://?disable_retries=false + dsn: test://?disable_retries=false&support_delay_stamp=true routing: Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA: async default_bus: bus_c diff --git a/tests/Fixture/config/multi_transport.yaml b/tests/Fixture/config/multi_transport.yaml index 83f446c..704cb27 100644 --- a/tests/Fixture/config/multi_transport.yaml +++ b/tests/Fixture/config/multi_transport.yaml @@ -5,12 +5,12 @@ framework: messenger: transports: async1: - dsn: test:// + dsn: test://?support_delay_stamp=true async2: - dsn: test://?intercept=false&catch_exceptions=false&test_serialization=false + dsn: test://?intercept=false&catch_exceptions=false&test_serialization=false&support_delay_stamp=true async3: in-memory:// async4: - dsn: test://?disable_retries=false + dsn: test://?disable_retries=false&support_delay_stamp=true routing: Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA: [async1, async4] Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageB: [async2] diff --git a/tests/Fixture/config/single_transport.yaml b/tests/Fixture/config/single_transport.yaml index 3c6b212..ccb7686 100644 --- a/tests/Fixture/config/single_transport.yaml +++ b/tests/Fixture/config/single_transport.yaml @@ -5,7 +5,7 @@ framework: messenger: transports: async: - dsn: test:// + dsn: test://?support_delay_stamp=true routing: Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA: [async] Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageB: [async] diff --git a/tests/InteractsWithBusTest.php b/tests/InteractsWithBusTest.php index d30ce72..15f6622 100644 --- a/tests/InteractsWithBusTest.php +++ b/tests/InteractsWithBusTest.php @@ -62,16 +62,10 @@ public function interacts_with_specified_bus(): void $this->bus('bus_b')->dispatched()->assertEmpty(); $this->bus('bus_c')->dispatched()->assertEmpty(); - self::getContainer()->get('bus_a')->dispatch(new MessageA(fail: true)); + self::getContainer()->get('bus_a')->dispatch(new MessageA()); self::getContainer()->get('bus_b')->dispatch(new MessageB()); self::getContainer()->get('bus_c')->dispatch(new MessageC()); - $this->transport() - ->process() - ->rejected() - ->assertContains(MessageA::class, 4) - ; - $this->bus('bus_a')->dispatched()->assertCount(1); $this->bus('bus_b')->dispatched()->assertCount(1); $this->bus('bus_c')->dispatched()->assertCount(1); diff --git a/tests/InteractsWithMessengerTest.php b/tests/InteractsWithMessengerTest.php index 790b790..1dbe2de 100644 --- a/tests/InteractsWithMessengerTest.php +++ b/tests/InteractsWithMessengerTest.php @@ -13,6 +13,7 @@ use PHPUnit\Framework\AssertionFailedError; use Symfony\Bundle\FrameworkBundle\Test\WebTestCase; +use Symfony\Component\Clock\Test\ClockSensitiveTrait; use Symfony\Component\HttpKernel\KernelInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; @@ -42,6 +43,7 @@ final class InteractsWithMessengerTest extends WebTestCase { use InteractsWithMessenger; + use ClockSensitiveTrait; /** * @test @@ -870,15 +872,35 @@ public function serialization_problem_assertions(): void */ public function can_enable_retries(): void { + $clock = self::mockTime(); + self::bootKernel(['environment' => 'multi_transport']); self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true)); - $this->transport('async4') - ->process() - ->rejected() - ->assertContains(MessageA::class, 4) - ; + $this->transport('async4')->process()->rejected()->assertContains(MessageA::class, 1); + + $clock->sleep(1); + $this->transport('async4')->process()->rejected()->assertContains(MessageA::class, 2); + + $clock->sleep(2); + $this->transport('async4')->process()->rejected()->assertContains(MessageA::class, 3); + + $clock->sleep(4); + $this->transport('async4')->process()->rejected()->assertContains(MessageA::class, 4); + } + + /** + * @test + * @group legacy + */ + public function can_enable_retries_without_delay_stamp(): void + { + self::bootKernel(['environment' => 'delay_stamp_disabled']); + + self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true)); + + $this->transport('async')->process()->rejected()->assertContains(MessageA::class, 4); } /**