Skip to content

Commit

Permalink
remove headers once they are propagated (#134)
Browse files Browse the repository at this point in the history
* add test scenario using Ecotone Lite

* remove the headers once they are propagated
in case of asynchronous projection when first event has a header which does not exist in the next one, header will remain

* Add header propagation test for multiple events

* add asynchronous passing test

* add asynchronous passing test

* Add new tests and solution to protect from propagated headers

* removing already propagated headers works for async handlers. need to find a way to keep them when message is handled synchronously

* Stream based source

* enable testing async scenarios

---------

Co-authored-by: Dariusz Gafka <dariuszgafka@gmail.com>
Co-authored-by: Dariusz Gafka <dgafka.mail@gmail.com>
  • Loading branch information
3 people authored Jun 1, 2023
1 parent e100ba0 commit 91096ef
Show file tree
Hide file tree
Showing 17 changed files with 533 additions and 20 deletions.
24 changes: 24 additions & 0 deletions packages/Ecotone/src/Messaging/MessageHeaders.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Ecotone\Messaging;

use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
use Ecotone\Messaging\Handler\TypeDescriptor;

use Ecotone\Modelling\AggregateMessage;
Expand Down Expand Up @@ -105,6 +106,8 @@ final class MessageHeaders
*/
public const REVISION = 'revision';

public const STREAM_BASED_SOURCED = 'streamBasedSourced';

private array $headers;

/**
Expand Down Expand Up @@ -160,9 +163,20 @@ public static function getFrameworksHeaderNames(): array
self::POLLED_CHANNEL_NAME,
self::REPLY_CONTENT_TYPE,
self::CONSUMER_ENDPOINT_ID,
self::STREAM_BASED_SOURCED,
MessagingEntrypoint::ENTRYPOINT,
];
}

public static function unsetFrameworkKeys(array $metadata): array
{
foreach (self::getFrameworksHeaderNames() as $frameworksHeaderName) {
unset($metadata[$frameworksHeaderName]);
}

return $metadata;
}

public static function unsetTransportMessageKeys(array $metadata): array
{
unset($metadata[self::MESSAGE_ID]);
Expand Down Expand Up @@ -233,6 +247,16 @@ public static function unsetAggregateKeys(array $metadata): array
return $metadata;
}

public static function unsetNonUserKeys(array $metadata): array
{
$metadata = self::unsetEnqueueMetadata($metadata);
$metadata = self::unsetDistributionKeys($metadata);
$metadata = self::unsetAsyncKeys($metadata);
$metadata = self::unsetBusKeys($metadata);

return self::unsetAggregateKeys($metadata);
}

/**
* @param string $headerRegex e.g. ecotone-domain-*
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageHeaders;
use Throwable;

class MessageHeadersPropagatorInterceptor
{
Expand All @@ -14,30 +13,26 @@ class MessageHeadersPropagatorInterceptor
public function storeHeaders(MethodInvocation $methodInvocation, Message $message)
{
$headers = $message->getHeaders()->headers();
foreach (MessageHeaders::getFrameworksHeaderNames() as $frameworksHeaderName) {
unset($headers[$frameworksHeaderName]);
}
if (isset($headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION])) {
unset($headers[$headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION]]);
}
unset($headers[MessageHeaders::CONSUMER_ACK_HEADER_LOCATION]);
$headers = MessageHeaders::unsetFrameworkKeys($headers);
$headers = MessageHeaders::unsetNonUserKeys($headers);

$this->currentlyPropagatedHeaders[] = $headers;

try {
$reply = $methodInvocation->proceed();
} finally {
array_shift($this->currentlyPropagatedHeaders);
} catch (Throwable $exception) {
array_shift($this->currentlyPropagatedHeaders);

throw $exception;
}

return $reply;
}

public function propagateHeaders(array $headers): array
{
if (array_key_exists(MessageHeaders::STREAM_BASED_SOURCED, $headers) && $headers[MessageHeaders::STREAM_BASED_SOURCED]) {
return $headers;
}

return array_merge($this->getLastHeaders(), $headers);
}

Expand Down
6 changes: 1 addition & 5 deletions packages/Ecotone/src/Modelling/SaveAggregateService.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,7 @@ public function __construct(

public function save(Message $message, array $metadata): Message
{
$metadata = MessageHeaders::unsetEnqueueMetadata($metadata);
$metadata = MessageHeaders::unsetDistributionKeys($metadata);
$metadata = MessageHeaders::unsetAsyncKeys($metadata);
$metadata = MessageHeaders::unsetBusKeys($metadata);
$metadata = MessageHeaders::unsetAggregateKeys($metadata);
$metadata = MessageHeaders::unsetNonUserKeys($metadata);

$aggregate = $message->getHeaders()->get(AggregateMessage::AGGREGATE_OBJECT);
$events = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public function test_collecting_sent_commands()

$testSupportGateway = $ecotoneTestSupport->getMessagingTestSupport();

$this->assertEquals([[]], $testSupportGateway->getRecordedCommands());
$this->assertEquals([[], []], $testSupportGateway->getRecordedCommands());
$this->assertEmpty($testSupportGateway->getRecordedCommands());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Test\Ecotone\Modelling\Fixture\MetadataPropagating;

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventHandler;
Expand Down Expand Up @@ -29,7 +30,13 @@ public function failAction(): void
}

#[EventHandler]
public function notify(OrderWasPlaced $event, array $headers, CommandBus $commandBus): void
public function notifyOne(OrderWasPlaced $event, array $headers, CommandBus $commandBus): void
{
$commandBus->sendWithRouting('sendNotification', [], MediaType::APPLICATION_X_PHP_ARRAY, $this->notifyWithCustomHeaders);
}

#[EventHandler]
public function notifyTwo(OrderWasPlaced $event, array $headers, CommandBus $commandBus): void
{
$commandBus->sendWithRouting('sendNotification', [], MediaType::APPLICATION_X_PHP_ARRAY, $this->notifyWithCustomHeaders);
}
Expand All @@ -43,11 +50,17 @@ public function notifyWithCustomerHeaders(array $payload, array $headers): void
#[CommandHandler('sendNotification')]
public function sendNotification($command, array $headers): void
{
$this->notificationHeaders = $headers;
$this->notificationHeaders[] = $headers;
}

#[QueryHandler('getNotificationHeaders')]
public function getNotificationHeaders(): array
{
return \end($this->notificationHeaders);
}

#[QueryHandler('getAllNotificationHeaders')]
public function getAllNotificationHeaders(): array
{
return $this->notificationHeaders;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

namespace Test\Ecotone\Modelling\Fixture\MetadataPropagatingWithDoubleEventHandlers;

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\EventBus;
use InvalidArgumentException;

class OrderService
{
private array $notificationHeaders = [];

private array $notifyWithCustomHeaders = [];

#[CommandHandler('placeOrder')]
public function doSomething($command, array $headers, EventBus $eventBus): void
{
$eventBus->publish(new OrderWasPlaced());
}

#[CommandHandler('failAction')]
public function failAction(): void
{
throw new InvalidArgumentException('failed action');
}

#[Asynchronous("orders")]
#[EventHandler(endpointId: 'notifyOne')]
public function notifyOne(OrderWasPlaced $event, array $headers, CommandBus $commandBus): void
{
$commandBus->sendWithRouting('sendNotification', [], MediaType::APPLICATION_X_PHP_ARRAY, $this->notifyWithCustomHeaders);
}

#[Asynchronous('orders')]
#[EventHandler(endpointId: 'notifyTwo')]
public function notifyTwo(OrderWasPlaced $event, array $headers, CommandBus $commandBus): void
{
$commandBus->sendWithRouting('sendNotification', [], MediaType::APPLICATION_X_PHP_ARRAY, $this->notifyWithCustomHeaders);
}

#[CommandHandler('setCustomNotificationHeaders')]
public function notifyWithCustomerHeaders(array $payload, array $headers): void
{
$this->notifyWithCustomHeaders = $headers;
}

#[CommandHandler('sendNotification')]
public function sendNotification($command, array $headers): void
{
$this->notificationHeaders[] = $headers;
}

#[QueryHandler('getNotificationHeaders')]
public function getNotificationHeaders(): array
{
return \end($this->notificationHeaders);
}

#[QueryHandler('getAllNotificationHeaders')]
public function getAllNotificationHeaders(): array
{
return $this->notificationHeaders;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace Test\Ecotone\Modelling\Fixture\MetadataPropagatingWithDoubleEventHandlers;

class OrderWasPlaced
{
}
61 changes: 61 additions & 0 deletions packages/Ecotone/tests/Modelling/Unit/MetadataPropagatingTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Modelling\Unit;

use Ecotone\Lite\EcotoneLite;
use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
use PHPUnit\Framework\TestCase;
use Test\Ecotone\Modelling\Fixture\MetadataPropagatingWithDoubleEventHandlers\OrderService;

final class MetadataPropagatingTest extends TestCase
{
public function test_propagating_headers_to_all_published_synchronous_event_handlers(): void
{
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
classesToResolve: [OrderService::class],
containerOrAvailableServices: [new OrderService()]
);

$ecotoneTestSupport->sendCommandWithRoutingKey(
'placeOrder',
metadata: [
'userId' => '123'
]
);

$notifications = $ecotoneTestSupport->sendQueryWithRouting('getAllNotificationHeaders');
$this->assertCount(2, $notifications);
$this->assertEquals('123', $notifications[0]['userId']);
$this->assertEquals('123', $notifications[1]['userId']);
}

public function test_propagating_headers_to_all_published_asynchronous_event_handlers(): void
{
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
classesToResolve: [OrderService::class],
containerOrAvailableServices: [new OrderService()],
configuration: ServiceConfiguration::createWithAsynchronicityOnly()
->withExtensionObjects([
SimpleMessageChannelBuilder::createQueueChannel('orders')
])
);

$ecotoneTestSupport->sendCommandWithRoutingKey(
'placeOrder',
metadata: [
'userId' => '123'
]
);

$ecotoneTestSupport->run('orders', ExecutionPollingMetadata::createWithTestingSetup(2));
$notifications = $ecotoneTestSupport->sendQueryWithRouting('getAllNotificationHeaders');

$this->assertCount(2, $notifications);
$this->assertEquals('123', $notifications[0]['userId']);
$this->assertEquals('123', $notifications[1]['userId']);
}
}
2 changes: 2 additions & 0 deletions packages/PdoEventSourcing/src/ChannelProjectionExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Modelling\Event;

final class ChannelProjectionExecutor implements ProjectionExecutor
Expand All @@ -31,6 +32,7 @@ public function executeWith(string $eventName, Event $event, ?array $state = nul
ProjectionEventHandler::PROJECTION_IS_REBUILDING => $this->projectionStatus == ProjectionStatus::REBUILDING(),
ProjectionEventHandler::PROJECTION_NAME => $this->projectionSetupConfiguration->getProjectionName(),
ProjectionEventHandler::PROJECTION_IS_POLLING => true,
MessageHeaders::STREAM_BASED_SOURCED => true
]
),
$projectionEventHandler->getSynchronousRequestChannelName()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection;

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\QueryHandler;

final class NotificationService
{
private int $fooCounter = 0;

#[Asynchronous(OrderProjection::CHANNEL)]
#[EventHandler(endpointId: 'notification_service.order_created')]
public function when(OrderCreated $event, array $metadata): void
{
if (array_key_exists('foo', $metadata)) {
$this->fooCounter++;
}
}

#[Asynchronous(OrderProjection::CHANNEL)]
#[EventHandler(endpointId: 'notification_service.another_order_created')]
public function another(OrderCreated $event, array $metadata): void
{
if (array_key_exists('foo', $metadata)) {
$this->fooCounter++;
}
}

#[QueryHandler("getNotificationCountWithFoo")]
public function getNotificationCountWithFoo(): int
{
return $this->fooCounter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\EventSourcing\Fixture\MetadataPropagationWithAsyncProjection;

use Ecotone\Modelling\Attribute\AggregateIdentifier;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\Attribute\EventSourcingHandler;
use Ecotone\Modelling\WithAggregateVersioning;

#[EventSourcingAggregate]
final class Order
{
use WithAggregateVersioning;

#[AggregateIdentifier]
private int $id;

#[CommandHandler(routingKey: 'order.create')]
public static function create(int $id): array
{
return [new OrderCreated($id), new ProductAddedToOrder($id)];
}

#[EventSourcingHandler]
public function applyOrderCreated(OrderCreated $event): void
{
$this->id = $event->id;
}
}
Loading

0 comments on commit 91096ef

Please sign in to comment.