Skip to content

Commit

Permalink
Improve logging v3 (#172)
Browse files Browse the repository at this point in the history
* Refactor reply gateway conversion

* refactor gateway

* refactor

* shorter flow

* shorter flow

* logging dlq logs

* logs

* logs

* tests clean up

* lose requirements for execution
  • Loading branch information
dgafka authored Aug 7, 2023
1 parent d0c976c commit e67b626
Show file tree
Hide file tree
Showing 17 changed files with 265 additions and 177 deletions.
15 changes: 14 additions & 1 deletion packages/Dbal/src/DbalTransaction/DbalTransactionInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public function transactional(MethodInvocation $methodInvocation, ?AsynchronousR
}
}

$logger->info('Starting Database Transaction');
foreach ($connections as $connection) {
$connection->beginTransaction();
}
Expand All @@ -64,13 +65,25 @@ public function transactional(MethodInvocation $methodInvocation, ?AsynchronousR
foreach ($connections as $connection) {
try {
$connection->commit();
$logger->info('Committing Database Transaction');
} catch (PDOException $exception) {
/** Handles the case where Mysql did implicit commit, when new creating tables */
if (! str_contains($exception->getMessage(), 'There is no active transaction')) {
$logger->info(
'Rolling back Database Transaction',
[
'exception' => $exception
]
);
throw $exception;
}

$logger->info('Implicit Commit was detected, skipping manual one.');
$logger->info(
'Implicit Commit was detected, skipping manual one.',
[
'exception' => $exception
]
);
/** Doctrine hold the state, so it needs to be cleaned */
try {
$connection->rollBack();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Messaging\Handler\Logger\LoggingHandlerBuilder;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\ReferenceBuilder;
use Ecotone\Messaging\Handler\Recoverability\ErrorHandler;
use Ecotone\Messaging\Handler\Recoverability\ErrorHandlerConfiguration;
use Ecotone\Messaging\Handler\Router\RouterBuilder;
Expand Down Expand Up @@ -49,11 +51,17 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
}

$errorHandler = ServiceActivatorBuilder::createWithDirectReference(
new ErrorHandler($extensionObject->getDelayedRetryTemplate(), (bool)$extensionObject->getDeadLetterQueueChannel()),
new ErrorHandler(
$extensionObject->getDelayedRetryTemplate(),
(bool)$extensionObject->getDeadLetterQueueChannel()
),
'handle'
)
->withEndpointId('error_handler.' . $extensionObject->getErrorChannelName())
->withInputChannelName($extensionObject->getErrorChannelName());
->withInputChannelName($extensionObject->getErrorChannelName())
->withMethodParameterConverters([
ReferenceBuilder::create('logger', LoggingHandlerBuilder::LOGGER_REFERENCE)
]);
if ($extensionObject->getDeadLetterQueueChannel()) {
$errorHandler = $errorHandler->withOutputMessageChannel($extensionObject->getDeadLetterQueueChannel());
$messagingConfiguration
Expand Down
4 changes: 2 additions & 2 deletions packages/Ecotone/src/Messaging/Conversion/MediaType.php
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ public static function parseMediaType(string $mediaType): self
}
$parsedMediaType = explode('/', $mediaType);

Assert::keyExists($parsedMediaType, 0, "Passed media type {$mediaType} has no type");
Assert::keyExists($parsedMediaType, 1, "Passed media type {$mediaType} has no subtype");
Assert::keyExists($parsedMediaType, 0, "Passed media type `{$mediaType}` has no type");
Assert::keyExists($parsedMediaType, 1, "Passed media type `{$mediaType}` has no subtype");
$parametersToParse = explode(';', $parsedMediaType[1]);
$subtype = array_shift($parametersToParse);
$parameters = [];
Expand Down
114 changes: 20 additions & 94 deletions packages/Ecotone/src/Messaging/Handler/Gateway/Gateway.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,6 @@ class Gateway implements NonProxyGateway
*/
private array $messageConverters;
private InterfaceToCall $interfaceToCall;
private ?PollableChannel $replyChannel;
private ?MessageChannel $errorChannel;
private int $replyMilliSecondsTimeout;
private MessageChannel $gatewayRequestChannel;
private ReferenceSearchService $referenceSearchService;
private iterable $aroundInterceptors;
/**
* @var InputOutputMessageHandlerBuilder[]
*/
private iterable $sortedBeforeInterceptors = [];
/**
* @var InputOutputMessageHandlerBuilder[]
*/
private iterable $sortedAfterInterceptors = [];
private iterable $endpointAnnotations;
private ChannelResolver $channelResolver;

/**
* GatewayProxy constructor.
Expand All @@ -80,30 +64,12 @@ public function __construct(
InterfaceToCall $interfaceToCall,
MethodCallToMessageConverter $methodCallToMessageConverter,
array $messageConverters,
MessageChannel $requestChannel,
?PollableChannel $replyChannel,
?MessageChannel $errorChannel,
int $replyMilliSecondsTimeout,
ReferenceSearchService $referenceSearchService,
ChannelResolver $channelResolver,
iterable $aroundInterceptors,
iterable $sortedBeforeInterceptors,
iterable $sortedAfterInterceptors,
iterable $endpointAnnotations
private GatewayReplyConverter $gatewayReplyConverter,
private MessageHandler $gatewayInternalHandler
) {
$this->methodCallToMessageConverter = $methodCallToMessageConverter;
$this->messageConverters = $messageConverters;
$this->interfaceToCall = $interfaceToCall;
$this->replyChannel = $replyChannel;
$this->errorChannel = $errorChannel;
$this->replyMilliSecondsTimeout = $replyMilliSecondsTimeout;
$this->gatewayRequestChannel = $requestChannel;
$this->referenceSearchService = $referenceSearchService;
$this->aroundInterceptors = $aroundInterceptors;
$this->endpointAnnotations = $endpointAnnotations;
$this->sortedBeforeInterceptors = $sortedBeforeInterceptors;
$this->sortedAfterInterceptors = $sortedAfterInterceptors;
$this->channelResolver = $channelResolver;
}

/**
Expand Down Expand Up @@ -163,74 +129,34 @@ public function execute(array $methodArgumentValues)
$requestMessage = $requestMessage
->removeHeader(MessageHeaders::REPLY_CONTENT_TYPE)
->build();
$messageHandler = $this->buildHandler($replyContentType);

$messageHandler->handle($requestMessage);
$reply = $internalReplyBridge ? $internalReplyBridge->receive() : null;
$this->gatewayInternalHandler->handle($requestMessage);
$replyMessage = $internalReplyBridge ? $internalReplyBridge->receive() : null;
if (!is_null($replyMessage) && $this->interfaceToCall->canReturnValue()) {
if ($replyContentType !== null || ! ($this->interfaceToCall->getReturnType()->isAnything() || $this->interfaceToCall->getReturnType()->isMessage())) {
$reply = $this->gatewayReplyConverter->convert($replyMessage, $replyContentType);
if (!($reply instanceof Message)) {
$replyMessage = MessageBuilder::fromMessage($replyMessage)
->setPayload($reply)
->build();
}else {
$replyMessage = $reply;
}
}
}

if ($reply) {
if ($replyMessage) {
if ($this->interfaceToCall->getReturnType()->isClassOfType(Message::class)) {
if ($previousReplyChannel) {
return MessageBuilder::fromMessage($reply)
return MessageBuilder::fromMessage($replyMessage)
->setReplyChannel($previousReplyChannel)
->build();
}

return $reply;
return $replyMessage;
}

return $reply->getPayload();
return $replyMessage->getPayload();
}
}


private function buildHandler(?MediaType $replyContentType): MessageHandler
{
$gatewayInternalHandler = new GatewayInternalHandler(
$this->interfaceToCall,
$this->gatewayRequestChannel,
$this->errorChannel,
$this->replyChannel,
$this->replyMilliSecondsTimeout
);

$gatewayInternalHandler = ServiceActivatorBuilder::createWithDirectReference($gatewayInternalHandler, 'handle')
->withWrappingResultInMessage(false)
->withEndpointAnnotations($this->endpointAnnotations);
$aroundInterceptorReferences = $this->aroundInterceptors;
if ($replyContentType !== null || ! ($this->interfaceToCall->getReturnType()->isAnything() || $this->interfaceToCall->getReturnType()->isMessage())) {
/** @TODO That probably should be added quicker */
$aroundInterceptorReferences[] = AroundInterceptorReference::createWithDirectObjectAndResolveConverters(
$this->referenceSearchService->get(InterfaceToCallRegistry::REFERENCE_NAME),
new ConversionInterceptor(
$this->referenceSearchService->get(ConversionService::REFERENCE_NAME),
$this->interfaceToCall,
$replyContentType,
$this->messageConverters,
),
'convert',
Precedence::GATEWAY_REPLY_CONVERSION_PRECEDENCE,
$this->interfaceToCall->getInterfaceName()
);
}
foreach ($aroundInterceptorReferences as $aroundInterceptorReference) {
$gatewayInternalHandler = $gatewayInternalHandler->addAroundInterceptor($aroundInterceptorReference);
}


$chainHandler = ChainMessageHandlerBuilder::create();
foreach ($this->sortedBeforeInterceptors as $beforeInterceptor) {
$chainHandler = $chainHandler->chain($beforeInterceptor);
}
$chainHandler = $chainHandler->chain($gatewayInternalHandler);
foreach ($this->sortedAfterInterceptors as $afterInterceptor) {
$chainHandler = $chainHandler->chain($afterInterceptor);
}

return $chainHandler
->build(
$this->channelResolver,
$this->referenceSearchService
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
namespace Ecotone\Messaging\Handler\Gateway;

use Ecotone\Messaging\Channel\DirectChannel;
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\Chain\ChainMessageHandlerBuilder;
use Ecotone\Messaging\Handler\ChannelResolver;
use Ecotone\Messaging\Handler\Gateway\ParameterToMessageConverter\GatewayHeadersBuilder;
use Ecotone\Messaging\Handler\Gateway\ParameterToMessageConverter\GatewayHeaderValueBuilder;
Expand All @@ -19,8 +21,11 @@
use Ecotone\Messaging\Handler\Processor\MethodInvoker\AroundInterceptorReference;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInterceptor;
use Ecotone\Messaging\Handler\ReferenceSearchService;
use Ecotone\Messaging\Handler\ServiceActivator\ServiceActivatorBuilder;
use Ecotone\Messaging\Handler\TypeDefinitionException;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\MessageChannel;
use Ecotone\Messaging\MessageHandler;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Messaging\MessagingException;
use Ecotone\Messaging\PollableChannel;
Expand Down Expand Up @@ -263,7 +268,7 @@ public function resolveRelatedInterfaces(InterfaceToCallRegistry $interfaceToCal
$resolvedInterfaces = [
$interfaceToCallRegistry->getFor(GatewayInternalHandler::class, 'handle'),
$interfaceToCallRegistry->getFor(ErrorChannelInterceptor::class, 'handle'),
$interfaceToCallRegistry->getFor(ConversionInterceptor::class, 'convert'),
$interfaceToCallRegistry->getFor(GatewayReplyConverter::class, 'convert'),
$interfaceToCallRegistry->getFor($this->interfaceName, $this->methodName),
];

Expand Down Expand Up @@ -388,6 +393,28 @@ public function buildWithoutProxyObject(ReferenceSearchService $referenceSearchS
$messageConverters[] = $referenceSearchService->get($messageConverterReferenceName);
}

return new Gateway(
$interfaceToCall,
new MethodCallToMessageConverter(
$interfaceToCall,
$methodArgumentConverters
),
$messageConverters,
new GatewayReplyConverter(
$referenceSearchService->get(ConversionService::REFERENCE_NAME),
$interfaceToCall,
$messageConverters,
),
$this->buildGatewayInternalHandler($interfaceToCall, $referenceSearchService, $channelResolver)
);
}

private function buildGatewayInternalHandler(
InterfaceToCall $interfaceToCall,
ReferenceSearchService $referenceSearchService,
ChannelResolver $channelResolver
): MessageHandler
{
$registeredAnnotations = $this->endpointAnnotations;
foreach ($interfaceToCall->getMethodAnnotations() as $annotation) {
if ($this->canBeAddedToRegisteredAnnotations($registeredAnnotations, $annotation)) {
Expand All @@ -400,25 +427,36 @@ public function buildWithoutProxyObject(ReferenceSearchService $referenceSearchS
}
}

$beforeInterceptors = $this->beforeInterceptors;
return new Gateway(
$gatewayInternalHandler = new GatewayInternalHandler(
$interfaceToCall,
new MethodCallToMessageConverter(
$interfaceToCall,
$methodArgumentConverters
),
$messageConverters,
$requestChannel,
$replyChannel,
$errorChannel,
$this->replyMilliSecondsTimeout,
$referenceSearchService,
$channelResolver,
$this->getSortedAroundInterceptors($this->aroundInterceptors),
$this->getSortedInterceptors($beforeInterceptors),
$this->getSortedInterceptors($this->afterInterceptors),
$registeredAnnotations
$channelResolver->resolve($this->requestChannelName),
$this->errorChannelName ? $channelResolver->resolve($this->errorChannelName) : null,
$this->replyChannelName ? $channelResolver->resolve($this->replyChannelName) : null,
$this->replyMilliSecondsTimeout
);

$chainHandler = ChainMessageHandlerBuilder::create();
foreach ($this->getSortedInterceptors($this->beforeInterceptors) as $beforeInterceptor) {
$chainHandler = $chainHandler->chain($beforeInterceptor);
}
$chainHandler = $chainHandler->chainInterceptedHandler(
ServiceActivatorBuilder::createWithDirectReference($gatewayInternalHandler, 'handle')
->withWrappingResultInMessage(false)
->withEndpointAnnotations($registeredAnnotations)
);
foreach ($this->getSortedInterceptors($this->afterInterceptors) as $afterInterceptor) {
$chainHandler = $chainHandler->chain($afterInterceptor);
}

foreach ($this->getSortedAroundInterceptors($this->aroundInterceptors) as $aroundInterceptorReference) {
$chainHandler = $chainHandler->addAroundInterceptor($aroundInterceptorReference);
}

return $chainHandler
->build(
$channelResolver,
$referenceSearchService
);
}

/**
Expand Down
Loading

0 comments on commit e67b626

Please sign in to comment.