From 91096ef01c8d5509516616ea091c289c7b81d912 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Zaj=C4=85c?= Date: Thu, 1 Jun 2023 22:16:48 +0200 Subject: [PATCH] remove headers once they are propagated (#134) * add test scenario using Ecotone Lite * remove the headers once they are propagated in case of asynchronous projection when first event has a header which does not exist in the next one, header will remain * Add header propagation test for multiple events * add asynchronous passing test * add asynchronous passing test * Add new tests and solution to protect from propagated headers * removing already propagated headers works for async handlers. need to find a way to keep them when message is handled synchronously * Stream based source * enable testing async scenarios --------- Co-authored-by: Dariusz Gafka Co-authored-by: Dariusz Gafka --- .../Ecotone/src/Messaging/MessageHeaders.php | 24 ++++ .../MessageHeadersPropagatorInterceptor.php | 19 ++- .../src/Modelling/SaveAggregateService.php | 6 +- .../MessagingTestSupportFrameworkTest.php | 2 +- .../MetadataPropagating/OrderService.php | 17 ++- .../OrderService.php | 69 +++++++++++ .../OrderWasPlaced.php | 7 ++ .../Unit/MetadataPropagatingTest.php | 61 ++++++++++ .../src/ChannelProjectionExecutor.php | 2 + .../NotificationService.php | 38 ++++++ .../Order.php | 32 +++++ .../OrderCreated.php | 15 +++ .../OrderEventsConverter.php | 34 ++++++ .../OrderProjection.php | 56 +++++++++ .../ProductAddedToOrder.php | 15 +++ .../AggregateAndProjectionTriggerTest.php | 43 +++++++ .../ProjectionMetadataPropagationTest.php | 113 ++++++++++++++++++ 17 files changed, 533 insertions(+), 20 deletions(-) create mode 100644 packages/Ecotone/tests/Modelling/Fixture/MetadataPropagatingWithDoubleEventHandlers/OrderService.php create mode 100644 packages/Ecotone/tests/Modelling/Fixture/MetadataPropagatingWithDoubleEventHandlers/OrderWasPlaced.php create mode 100644 packages/Ecotone/tests/Modelling/Unit/MetadataPropagatingTest.php create mode 100644 packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/NotificationService.php create mode 100644 packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/Order.php create mode 100644 packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderCreated.php create mode 100644 packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderEventsConverter.php create mode 100644 packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderProjection.php create mode 100644 packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/ProductAddedToOrder.php create mode 100644 packages/PdoEventSourcing/tests/Integration/AggregateAndProjectionTriggerTest.php create mode 100644 packages/PdoEventSourcing/tests/Integration/ProjectionMetadataPropagationTest.php diff --git a/packages/Ecotone/src/Messaging/MessageHeaders.php b/packages/Ecotone/src/Messaging/MessageHeaders.php index cacc6ba97..691a389cf 100644 --- a/packages/Ecotone/src/Messaging/MessageHeaders.php +++ b/packages/Ecotone/src/Messaging/MessageHeaders.php @@ -3,6 +3,7 @@ namespace Ecotone\Messaging; use Ecotone\Messaging\Conversion\MediaType; +use Ecotone\Messaging\Gateway\MessagingEntrypoint; use Ecotone\Messaging\Handler\TypeDescriptor; use Ecotone\Modelling\AggregateMessage; @@ -105,6 +106,8 @@ final class MessageHeaders */ public const REVISION = 'revision'; + public const STREAM_BASED_SOURCED = 'streamBasedSourced'; + private array $headers; /** @@ -160,9 +163,20 @@ public static function getFrameworksHeaderNames(): array self::POLLED_CHANNEL_NAME, self::REPLY_CONTENT_TYPE, self::CONSUMER_ENDPOINT_ID, + self::STREAM_BASED_SOURCED, + MessagingEntrypoint::ENTRYPOINT, ]; } + public static function unsetFrameworkKeys(array $metadata): array + { + foreach (self::getFrameworksHeaderNames() as $frameworksHeaderName) { + unset($metadata[$frameworksHeaderName]); + } + + return $metadata; + } + public static function unsetTransportMessageKeys(array $metadata): array { unset($metadata[self::MESSAGE_ID]); @@ -233,6 +247,16 @@ public static function unsetAggregateKeys(array $metadata): array return $metadata; } + public static function unsetNonUserKeys(array $metadata): array + { + $metadata = self::unsetEnqueueMetadata($metadata); + $metadata = self::unsetDistributionKeys($metadata); + $metadata = self::unsetAsyncKeys($metadata); + $metadata = self::unsetBusKeys($metadata); + + return self::unsetAggregateKeys($metadata); + } + /** * @param string $headerRegex e.g. ecotone-domain-* * diff --git a/packages/Ecotone/src/Modelling/MessageHandling/MetadataPropagator/MessageHeadersPropagatorInterceptor.php b/packages/Ecotone/src/Modelling/MessageHandling/MetadataPropagator/MessageHeadersPropagatorInterceptor.php index ceb5af25d..bf37a2d31 100644 --- a/packages/Ecotone/src/Modelling/MessageHandling/MetadataPropagator/MessageHeadersPropagatorInterceptor.php +++ b/packages/Ecotone/src/Modelling/MessageHandling/MetadataPropagator/MessageHeadersPropagatorInterceptor.php @@ -5,7 +5,6 @@ use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation; use Ecotone\Messaging\Message; use Ecotone\Messaging\MessageHeaders; -use Throwable; class MessageHeadersPropagatorInterceptor { @@ -14,23 +13,15 @@ class MessageHeadersPropagatorInterceptor public function storeHeaders(MethodInvocation $methodInvocation, Message $message) { $headers = $message->getHeaders()->headers(); - foreach (MessageHeaders::getFrameworksHeaderNames() as $frameworksHeaderName) { - unset($headers[$frameworksHeaderName]); - } - if (isset($headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION])) { - unset($headers[$headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION]]); - } - unset($headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION]); + $headers = MessageHeaders::unsetFrameworkKeys($headers); + $headers = MessageHeaders::unsetNonUserKeys($headers); $this->currentlyPropagatedHeaders[] = $headers; try { $reply = $methodInvocation->proceed(); + } finally { array_shift($this->currentlyPropagatedHeaders); - } catch (Throwable $exception) { - array_shift($this->currentlyPropagatedHeaders); - - throw $exception; } return $reply; @@ -38,6 +29,10 @@ public function storeHeaders(MethodInvocation $methodInvocation, Message $messag public function propagateHeaders(array $headers): array { + if (array_key_exists(MessageHeaders::STREAM_BASED_SOURCED, $headers) && $headers[MessageHeaders::STREAM_BASED_SOURCED]) { + return $headers; + } + return array_merge($this->getLastHeaders(), $headers); } diff --git a/packages/Ecotone/src/Modelling/SaveAggregateService.php b/packages/Ecotone/src/Modelling/SaveAggregateService.php index 913ba60a5..34f43c469 100644 --- a/packages/Ecotone/src/Modelling/SaveAggregateService.php +++ b/packages/Ecotone/src/Modelling/SaveAggregateService.php @@ -75,11 +75,7 @@ public function __construct( public function save(Message $message, array $metadata): Message { - $metadata = MessageHeaders::unsetEnqueueMetadata($metadata); - $metadata = MessageHeaders::unsetDistributionKeys($metadata); - $metadata = MessageHeaders::unsetAsyncKeys($metadata); - $metadata = MessageHeaders::unsetBusKeys($metadata); - $metadata = MessageHeaders::unsetAggregateKeys($metadata); + $metadata = MessageHeaders::unsetNonUserKeys($metadata); $aggregate = $message->getHeaders()->get(AggregateMessage::AGGREGATE_OBJECT); $events = []; diff --git a/packages/Ecotone/tests/Lite/Test/MessagingTestSupportFrameworkTest.php b/packages/Ecotone/tests/Lite/Test/MessagingTestSupportFrameworkTest.php index dd4682ea6..38f81457e 100644 --- a/packages/Ecotone/tests/Lite/Test/MessagingTestSupportFrameworkTest.php +++ b/packages/Ecotone/tests/Lite/Test/MessagingTestSupportFrameworkTest.php @@ -294,7 +294,7 @@ public function test_collecting_sent_commands() $testSupportGateway = $ecotoneTestSupport->getMessagingTestSupport(); - $this->assertEquals([[]], $testSupportGateway->getRecordedCommands()); + $this->assertEquals([[], []], $testSupportGateway->getRecordedCommands()); $this->assertEmpty($testSupportGateway->getRecordedCommands()); } diff --git a/packages/Ecotone/tests/Modelling/Fixture/MetadataPropagating/OrderService.php b/packages/Ecotone/tests/Modelling/Fixture/MetadataPropagating/OrderService.php index f45efeda8..200e6e469 100644 --- a/packages/Ecotone/tests/Modelling/Fixture/MetadataPropagating/OrderService.php +++ b/packages/Ecotone/tests/Modelling/Fixture/MetadataPropagating/OrderService.php @@ -2,6 +2,7 @@ namespace Test\Ecotone\Modelling\Fixture\MetadataPropagating; +use Ecotone\Messaging\Attribute\Asynchronous; use Ecotone\Messaging\Conversion\MediaType; use Ecotone\Modelling\Attribute\CommandHandler; use Ecotone\Modelling\Attribute\EventHandler; @@ -29,7 +30,13 @@ public function failAction(): void } #[EventHandler] - public function notify(OrderWasPlaced $event, array $headers, CommandBus $commandBus): void + public function notifyOne(OrderWasPlaced $event, array $headers, CommandBus $commandBus): void + { + $commandBus->sendWithRouting('sendNotification', [], MediaType::APPLICATION_X_PHP_ARRAY, $this->notifyWithCustomHeaders); + } + + #[EventHandler] + public function notifyTwo(OrderWasPlaced $event, array $headers, CommandBus $commandBus): void { $commandBus->sendWithRouting('sendNotification', [], MediaType::APPLICATION_X_PHP_ARRAY, $this->notifyWithCustomHeaders); } @@ -43,11 +50,17 @@ public function notifyWithCustomerHeaders(array $payload, array $headers): void #[CommandHandler('sendNotification')] public function sendNotification($command, array $headers): void { - $this->notificationHeaders = $headers; + $this->notificationHeaders[] = $headers; } #[QueryHandler('getNotificationHeaders')] public function getNotificationHeaders(): array + { + return \end($this->notificationHeaders); + } + + #[QueryHandler('getAllNotificationHeaders')] + public function getAllNotificationHeaders(): array { return $this->notificationHeaders; } diff --git a/packages/Ecotone/tests/Modelling/Fixture/MetadataPropagatingWithDoubleEventHandlers/OrderService.php b/packages/Ecotone/tests/Modelling/Fixture/MetadataPropagatingWithDoubleEventHandlers/OrderService.php new file mode 100644 index 000000000..1e10b59f2 --- /dev/null +++ b/packages/Ecotone/tests/Modelling/Fixture/MetadataPropagatingWithDoubleEventHandlers/OrderService.php @@ -0,0 +1,69 @@ +publish(new OrderWasPlaced()); + } + + #[CommandHandler('failAction')] + public function failAction(): void + { + throw new InvalidArgumentException('failed action'); + } + + #[Asynchronous("orders")] + #[EventHandler(endpointId: 'notifyOne')] + public function notifyOne(OrderWasPlaced $event, array $headers, CommandBus $commandBus): void + { + $commandBus->sendWithRouting('sendNotification', [], MediaType::APPLICATION_X_PHP_ARRAY, $this->notifyWithCustomHeaders); + } + + #[Asynchronous('orders')] + #[EventHandler(endpointId: 'notifyTwo')] + public function notifyTwo(OrderWasPlaced $event, array $headers, CommandBus $commandBus): void + { + $commandBus->sendWithRouting('sendNotification', [], MediaType::APPLICATION_X_PHP_ARRAY, $this->notifyWithCustomHeaders); + } + + #[CommandHandler('setCustomNotificationHeaders')] + public function notifyWithCustomerHeaders(array $payload, array $headers): void + { + $this->notifyWithCustomHeaders = $headers; + } + + #[CommandHandler('sendNotification')] + public function sendNotification($command, array $headers): void + { + $this->notificationHeaders[] = $headers; + } + + #[QueryHandler('getNotificationHeaders')] + public function getNotificationHeaders(): array + { + return \end($this->notificationHeaders); + } + + #[QueryHandler('getAllNotificationHeaders')] + public function getAllNotificationHeaders(): array + { + return $this->notificationHeaders; + } +} diff --git a/packages/Ecotone/tests/Modelling/Fixture/MetadataPropagatingWithDoubleEventHandlers/OrderWasPlaced.php b/packages/Ecotone/tests/Modelling/Fixture/MetadataPropagatingWithDoubleEventHandlers/OrderWasPlaced.php new file mode 100644 index 000000000..7332e4d97 --- /dev/null +++ b/packages/Ecotone/tests/Modelling/Fixture/MetadataPropagatingWithDoubleEventHandlers/OrderWasPlaced.php @@ -0,0 +1,7 @@ +sendCommandWithRoutingKey( + 'placeOrder', + metadata: [ + 'userId' => '123' + ] + ); + + $notifications = $ecotoneTestSupport->sendQueryWithRouting('getAllNotificationHeaders'); + $this->assertCount(2, $notifications); + $this->assertEquals('123', $notifications[0]['userId']); + $this->assertEquals('123', $notifications[1]['userId']); + } + + public function test_propagating_headers_to_all_published_asynchronous_event_handlers(): void + { + $ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting( + classesToResolve: [OrderService::class], + containerOrAvailableServices: [new OrderService()], + configuration: ServiceConfiguration::createWithAsynchronicityOnly() + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel('orders') + ]) + ); + + $ecotoneTestSupport->sendCommandWithRoutingKey( + 'placeOrder', + metadata: [ + 'userId' => '123' + ] + ); + + $ecotoneTestSupport->run('orders', ExecutionPollingMetadata::createWithTestingSetup(2)); + $notifications = $ecotoneTestSupport->sendQueryWithRouting('getAllNotificationHeaders'); + + $this->assertCount(2, $notifications); + $this->assertEquals('123', $notifications[0]['userId']); + $this->assertEquals('123', $notifications[1]['userId']); + } +} diff --git a/packages/PdoEventSourcing/src/ChannelProjectionExecutor.php b/packages/PdoEventSourcing/src/ChannelProjectionExecutor.php index b56547c37..e9d9b21d5 100644 --- a/packages/PdoEventSourcing/src/ChannelProjectionExecutor.php +++ b/packages/PdoEventSourcing/src/ChannelProjectionExecutor.php @@ -7,6 +7,7 @@ use Ecotone\Messaging\Conversion\MediaType; use Ecotone\Messaging\Gateway\MessagingEntrypoint; use Ecotone\Messaging\Handler\TypeDescriptor; +use Ecotone\Messaging\MessageHeaders; use Ecotone\Modelling\Event; final class ChannelProjectionExecutor implements ProjectionExecutor @@ -31,6 +32,7 @@ public function executeWith(string $eventName, Event $event, ?array $state = nul ProjectionEventHandler::PROJECTION_IS_REBUILDING => $this->projectionStatus == ProjectionStatus::REBUILDING(), ProjectionEventHandler::PROJECTION_NAME => $this->projectionSetupConfiguration->getProjectionName(), ProjectionEventHandler::PROJECTION_IS_POLLING => true, + MessageHeaders::STREAM_BASED_SOURCED => true ] ), $projectionEventHandler->getSynchronousRequestChannelName() diff --git a/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/NotificationService.php b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/NotificationService.php new file mode 100644 index 000000000..1852b816e --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/NotificationService.php @@ -0,0 +1,38 @@ +fooCounter++; + } + } + + #[Asynchronous(OrderProjection::CHANNEL)] + #[EventHandler(endpointId: 'notification_service.another_order_created')] + public function another(OrderCreated $event, array $metadata): void + { + if (array_key_exists('foo', $metadata)) { + $this->fooCounter++; + } + } + + #[QueryHandler("getNotificationCountWithFoo")] + public function getNotificationCountWithFoo(): int + { + return $this->fooCounter; + } +} diff --git a/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/Order.php b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/Order.php new file mode 100644 index 000000000..b6f1f0e20 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/Order.php @@ -0,0 +1,32 @@ +id = $event->id; + } +} diff --git a/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderCreated.php b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderCreated.php new file mode 100644 index 000000000..e08152427 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderCreated.php @@ -0,0 +1,15 @@ + $event->id]; + } + + #[Converter] + public function convertToOrderCreated(array $payload): OrderCreated + { + return new OrderCreated($payload['id']); + } + + #[Converter] + public function convertFromProductAddedToOrder(ProductAddedToOrder $event): array + { + return ['id' => $event->id]; + } + + #[Converter] + public function convertToProductAddedToOrder(array $payload): ProductAddedToOrder + { + return new ProductAddedToOrder($payload['id']); + } +} diff --git a/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderProjection.php b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderProjection.php new file mode 100644 index 000000000..37319eb69 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/OrderProjection.php @@ -0,0 +1,56 @@ +connection->executeStatement(sprintf('CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY, foo INT DEFAULT 0)', self::TABLE)); + } + + #[EventHandler(listenTo: 'order.created', endpointId: 'foo_orders.order_created')] + public function whenOrderCreated(OrderCreated $event, array $metadata): void + { + $data = ['id' => $event->id]; + if (array_key_exists('foo', $metadata)) { + $data['foo'] = 1; + } + + $this->connection->insert(self::TABLE, $data); + } + + #[EventHandler(listenTo: 'order.product_added', endpointId: 'foo_orders.product_added')] + public function whenProductAddedToOrder(ProductAddedToOrder $event, array $metadata): void + { + if (array_key_exists('foo', $metadata)) { + $this->connection->executeStatement(sprintf('UPDATE %s SET foo = foo + 1 WHERE id = ?', self::TABLE), [$event->id]); + } + } + + #[QueryHandler(routingKey: 'foo_orders.count')] + public function fooOrdersCount(): int + { + return (int) $this->connection->fetchOne(sprintf('SELECT SUM(foo) FROM %s', self::TABLE)); + } +} diff --git a/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/ProductAddedToOrder.php b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/ProductAddedToOrder.php new file mode 100644 index 000000000..e4e178d74 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Fixture/MetadataPropagationWithAsyncProjection/ProductAddedToOrder.php @@ -0,0 +1,15 @@ + $this->getConnectionFactory()], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::DBAL_PACKAGE])) + ->withNamespaces(['Test\Ecotone\EventSourcing\Fixture\Ticket', 'Test\Ecotone\EventSourcing\Fixture\TicketProjectionState']), + pathToRootCatalog: __DIR__ . "/../../", + addEventSourcedRepository: false + ); + + $this->assertEquals( + 1, + $ecotoneLite + ->sendCommand(new RegisterTicket('123', 'johny', 'alert')) + ->sendCommand(new CloseTicket('123')) + ->getGateway(CounterStateGateway::class) + ->fetchState() + ->closedTicketCount + ); + } +} \ No newline at end of file diff --git a/packages/PdoEventSourcing/tests/Integration/ProjectionMetadataPropagationTest.php b/packages/PdoEventSourcing/tests/Integration/ProjectionMetadataPropagationTest.php new file mode 100644 index 000000000..348bc1697 --- /dev/null +++ b/packages/PdoEventSourcing/tests/Integration/ProjectionMetadataPropagationTest.php @@ -0,0 +1,113 @@ +markTestSkipped('This fails and it seems it reproduces: https://github.com/ecotoneframework/ecotone-dev/issues/104'); + + $ecotoneLite = $this->getBootstrapFlowTesting( + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::DBAL_PACKAGE])) + ->withNamespaces(['Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection']) + ); + + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 1, metadata: ['foo' => 'bar', 'eventId' => 1]); + self::assertEquals(expected: 2, actual: $ecotoneLite->sendQueryWithRouting('foo_orders.count')); + self::assertEquals(expected: 2, actual: $ecotoneLite->sendQueryWithRouting('getNotificationCountWithFoo')); + + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 2, metadata: ['eventId' => 2]); + self::assertEquals(expected: 2, actual: $ecotoneLite->sendQueryWithRouting('foo_orders.count')); + self::assertEquals(expected: 2, actual: $ecotoneLite->sendQueryWithRouting('getNotificationCountWithFoo')); + + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 3, metadata: ['foo' => 'baz', 'eventId' => 3]); + self::assertEquals(expected: 4, actual: $ecotoneLite->sendQueryWithRouting('foo_orders.count')); + self::assertEquals(expected: 4, actual: $ecotoneLite->sendQueryWithRouting('getNotificationCountWithFoo')); + } + + public function test_metadata_propagation_with_async_projection_when_catching_up(): void + { + $ecotoneLite = $this->getBootstrapFlowTesting( + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces(['Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection']) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(OrderProjection::CHANNEL) + ]) + ); + + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 1, metadata: ['foo' => 'bar', 'eventId' => 1]); + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 2, metadata: ['eventId' => 2]); + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 3, metadata: ['foo' => 'baz', 'eventId' => 3]); + + $ecotoneLite->run(name: OrderProjection::CHANNEL, executionPollingMetadata: ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 10, maxExecutionTimeInMilliseconds: 1000)); + + self::assertEquals(expected: 4, actual: $ecotoneLite->sendQueryWithRouting('foo_orders.count')); + self::assertEquals(expected: 4, actual: $ecotoneLite->sendQueryWithRouting('getNotificationCountWithFoo')); + } + + public function test_metadata_propagation_with_async_projection_when_populated_dynamically(): void + { + $ecotoneLite = $this->getBootstrapFlowTesting( + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withNamespaces(['Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection']) + ->withExtensionObjects([ + SimpleMessageChannelBuilder::createQueueChannel(OrderProjection::CHANNEL) + ]) + ); + + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 1, metadata: ['foo' => 'bar']); + $ecotoneLite->run(name: OrderProjection::CHANNEL, executionPollingMetadata: ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 4, maxExecutionTimeInMilliseconds: 1000)); + + self::assertEquals(expected: 2, actual: $ecotoneLite->sendQueryWithRouting('foo_orders.count')); + self::assertEquals(expected: 2, actual: $ecotoneLite->sendQueryWithRouting('getNotificationCountWithFoo')); + + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 2); + $ecotoneLite->run(name: OrderProjection::CHANNEL, executionPollingMetadata: ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 4, maxExecutionTimeInMilliseconds: 1000)); + + self::assertEquals(expected: 2, actual: $ecotoneLite->sendQueryWithRouting('foo_orders.count')); + self::assertEquals(expected: 2, actual: $ecotoneLite->sendQueryWithRouting('getNotificationCountWithFoo')); + + $ecotoneLite->sendCommandWithRoutingKey(routingKey: 'order.create', command: 3, metadata: ['foo' => 'baz']); + $ecotoneLite->run(name: OrderProjection::CHANNEL, executionPollingMetadata: ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 4, maxExecutionTimeInMilliseconds: 1000)); + + self::assertEquals(expected: 4, actual: $ecotoneLite->sendQueryWithRouting('foo_orders.count')); + self::assertEquals(expected: 4, actual: $ecotoneLite->sendQueryWithRouting('getNotificationCountWithFoo')); + } + + private function getBootstrapFlowTesting(ServiceConfiguration $serviceConfiguration): FlowTestSupport + { + /** @var DbalConnectionFactory $connectionFactory */ + $connectionFactory = $this->getConnectionFactory(); + $connection = $connectionFactory->createContext()->getDbalConnection(); + $schemaManager = $connection->createSchemaManager(); + if ($schemaManager->tablesExist(names: OrderProjection::TABLE)) { + $connection->delete(OrderProjection::TABLE, ['1' => '1']); + } + + return EcotoneLite::bootstrapFlowTesting( + containerOrAvailableServices: [new OrderProjection($connection), new OrderEventsConverter(), new NotificationService(), DbalConnectionFactory::class => $connectionFactory], + configuration: $serviceConfiguration, + pathToRootCatalog: __DIR__ . "/../../", + addEventSourcedRepository: false + ); + } +}