diff --git a/base/service/src/main/java/org/eclipse/ditto/base/service/devops/DevOpsCommandsActor.java b/base/service/src/main/java/org/eclipse/ditto/base/service/devops/DevOpsCommandsActor.java index 18d72c71cac..6490d14fb06 100644 --- a/base/service/src/main/java/org/eclipse/ditto/base/service/devops/DevOpsCommandsActor.java +++ b/base/service/src/main/java/org/eclipse/ditto/base/service/devops/DevOpsCommandsActor.java @@ -522,8 +522,7 @@ private DevOpsCommandResponseCorrelationActor(final ActorRef devOpsCommandSender final var dittoHeaders = devOpsCommand.getDittoHeaders(); aggregateResults = isAggregateResults(dittoHeaders); this.expectedResponses = expectedResponses; - logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this); - logger.setCorrelationId(dittoHeaders); + logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this).withCorrelationId(dittoHeaders); } private static boolean isAggregateResults(final DittoHeaders dittoHeaders) { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BasePublisherActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BasePublisherActor.java index 9583bd4e155..07c3bba9fb0 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BasePublisherActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BasePublisherActor.java @@ -13,7 +13,6 @@ package org.eclipse.ditto.connectivity.service.messaging; import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; -import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -314,11 +313,10 @@ private Stream sendMappedOutboundSignal(final OutboundSignal.M final int maxPayloadBytesForSignal) { final var message = outbound.getExternalMessage(); - final String correlationId = message.getHeaders().get(CORRELATION_ID.getKey()); final Signal outboundSource = outbound.getSource(); final List outboundTargets = outbound.getTargets(); - final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(correlationId); + final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(message.getHeaders()); final Optional replyTargetSendingContext = getSendingContext(outbound); final List sendingContexts = replyTargetSendingContext diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundDispatchingSink.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundDispatchingSink.java index 53d2ae0f277..9296aff1ced 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundDispatchingSink.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundDispatchingSink.java @@ -329,11 +329,8 @@ public Optional> onMapped(final String mapperId, @Override public Optional> onDropped(final String mapperId, @Nullable final ExternalMessage incomingMessage) { - logger.withCorrelationId(Optional.ofNullable(incomingMessage) - .map(ExternalMessage::getHeaders) - .map(h -> h.get(DittoHeaderDefinition.CORRELATION_ID.getKey())) - .orElse(null) - ).debug("Message mapping returned null, message is dropped."); + logger.withCorrelationId(incomingMessage != null ? incomingMessage.getHeaders() : null) + .debug("Message mapping returned null, message is dropped."); if (incomingMessage != null) { final String source = getAddress(incomingMessage); final ConnectionMonitor.InfoProvider infoProvider = InfoProviderFactory.forExternalMessage(incomingMessage); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessor.java index 17e1a62c9f0..c2e6cbbee73 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessor.java @@ -130,7 +130,7 @@ static InboundMappingProcessor of(final Connection connection, @Override List> process(final ExternalMessage message) { final var mappers = getMappers(message.getPayloadMapping().orElse(null)); - logger.withCorrelationId(message.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey())) + logger.withCorrelationId(message.getHeaders()) .debug("Mappers resolved for message: {}", mappers); final var mappingTimer = MappingTimer.inbound(connectionId, connectionType, message.getHeaders()); return mappingTimer.overall(() -> mappers.stream() diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingSink.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingSink.java index bcc3d704aa0..7e5160780f1 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingSink.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingSink.java @@ -21,7 +21,6 @@ import javax.annotation.Nullable; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; -import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.service.config.ThrottlingConfig; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; @@ -169,14 +168,12 @@ private InboundMappingOutcomes mapInboundMessage(final ExternalMessageWithSender final InboundMappingProcessor inboundMappingProcessor) { final var externalMessage = withSender.externalMessage(); - @Nullable final var correlationId = - externalMessage.findHeaderIgnoreCase(DittoHeaderDefinition.CORRELATION_ID.getKey()).orElse(null); - logger.withCorrelationId(correlationId) + logger.withCorrelationId(externalMessage.getHeaders()) .debug("Handling ExternalMessage: {}", externalMessage); try { return mapExternalMessageToSignal(withSender, inboundMappingProcessor); } catch (final Exception e) { - logger.withCorrelationId(correlationId) + logger.withCorrelationId(externalMessage.getHeaders()) .error("Handling exception when mapping external message: {}", e.getMessage()); return InboundMappingOutcomes.of(withSender.externalMessage(), e, withSender.sender()); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/Sending.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/Sending.java index 5dc3a4ec679..ac03460ce3c 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/Sending.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/Sending.java @@ -13,8 +13,9 @@ package org.eclipse.ditto.connectivity.service.messaging; import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; -import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; @@ -29,7 +30,6 @@ import org.eclipse.ditto.base.model.entity.id.EntityId; import org.eclipse.ditto.base.model.entity.id.WithEntityId; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; -import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; import org.eclipse.ditto.base.model.signals.commands.CommandResponse; @@ -217,10 +217,9 @@ private void updateSendMonitor(@Nullable final Exception exception) { dittoRuntimeException.getClass().getSimpleName(), dittoRuntimeException.getMessage()); } else { publishedMonitor.exception(message, exception); - final DittoHeaders internalHeaders = message.getInternalHeaders(); - @Nullable final String correlationId = internalHeaders.getCorrelationId() - .orElseGet(() -> message.findHeaderIgnoreCase(CORRELATION_ID.getKey()).orElse(null)); - logger.withCorrelationId(correlationId) + final Map combinedHeaders = new HashMap<>(message.getHeaders()); + combinedHeaders.putAll(message.getInternalHeaders()); + logger.withCorrelationId(combinedHeaders) .info("Unexpected failure when publishing signal - {}: {}", exception.getClass().getSimpleName(), exception.getMessage()); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java index 1cbe5ac3178..fcbb336c084 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java @@ -365,7 +365,7 @@ private void handleJmsMessage(final JmsMessage message) { .start(); headers = startedSpan.propagateContext(headers); final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers); - final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder, correlationId) + final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder) .withAuthorizationContext(source.getAuthorizationContext()) .withEnforcement(headerEnforcementFilterFactory.getFilter(headers)) .withHeaderMapping(source.getHeaderMapping()) @@ -374,10 +374,10 @@ private void handleJmsMessage(final JmsMessage message) { .build(); inboundMonitor.success(externalMessage); final Map externalMessageHeaders = externalMessage.getHeaders(); - logger.withCorrelationId(correlationId) + logger.withCorrelationId(headers) .info("Received message from AMQP 1.0 with externalMessageHeaders: {}", externalMessageHeaders); if (logger.isDebugEnabled()) { - logger.withCorrelationId(correlationId).debug("Received message from AMQP 1.0 with payload: {}", + logger.withCorrelationId(headers).debug("Received message from AMQP 1.0 with payload: {}", externalMessage.getTextPayload().orElse("binary")); } forwardToMapping(externalMessage, @@ -404,7 +404,7 @@ private void handleJmsMessage(final JmsMessage message) { inboundMonitor.exception(e); } startedSpan.tagAsFailed(e); - logger.withCorrelationId(correlationId) + logger.withCorrelationId(headers) .error(e, "Unexpected {}: {}", e.getClass().getName(), e.getMessage()); } finally { startedSpan.finish(); @@ -422,8 +422,6 @@ private void handleJmsMessage(final JmsMessage message) { private void acknowledge(final JmsMessage message, final boolean isSuccess, final boolean redeliver, final Map externalMessageHeaders) { - final Optional correlationId = Optional.ofNullable( - externalMessageHeaders.get(DittoHeaderDefinition.CORRELATION_ID.getKey())); try { final String messageId = message.getJMSMessageID(); recordAckForRateLimit(messageId, isSuccess, redeliver); @@ -437,8 +435,7 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina ackType = redeliver ? MODIFIED_FAILED : REJECTED; ackTypeName = redeliver ? "modified[delivery-failed]" : "rejected"; } - final String jmsCorrelationID = message.getJMSCorrelationID(); - logger.withCorrelationId(correlationId.orElse(jmsCorrelationID)) + logger.withCorrelationId(externalMessageHeaders) .info(MessageFormat.format( "Acking <{0}> with original external message headers=<{1}>, isSuccess=<{2}>, ackType=<{3} {4}>", messageId, @@ -457,15 +454,15 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina "Sending negative acknowledgement: <{0}>", ackTypeName); } } catch (final IllegalStateException e) { - logger.withCorrelationId(correlationId.orElse(null)) + logger.withCorrelationId(externalMessageHeaders) .warning(e, "Failed to ack an AMQP message because of server side issues"); } catch (final Exception e) { - logger.withCorrelationId(correlationId.orElse(null)).error(e, "Failed to ack an AMQP message"); + logger.withCorrelationId(externalMessageHeaders).error(e, "Failed to ack an AMQP message"); } } private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage message, - final ExternalMessageBuilder builder, @Nullable final String correlationId) throws JMSException { + final ExternalMessageBuilder builder) throws JMSException { if (message instanceof TextMessage textMessage) { final String payload = textMessage.getText(); if (payload == null) { @@ -487,7 +484,7 @@ private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage messag if (logger.isDebugEnabled()) { final Destination destination = message.getJMSDestination(); final Map headersMapFromJmsMessage = extractHeadersMapFromJmsMessage(message); - logger.withCorrelationId(correlationId) + logger.withCorrelationId(headersMapFromJmsMessage) .debug("Received message at '{}' of unsupported type ({}) with headers: {}", destination, message.getClass().getName(), headersMapFromJmsMessage); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java index f27e5c76653..017beacdf41 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java @@ -117,7 +117,7 @@ public TransformationResult transform(final ConsumerRecord c try { final String key = consumerRecord.key(); final ByteBuffer value = consumerRecord.value(); - final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(correlationId); + final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(messageHeaders); correlationIdScopedLogger.debug( "Transforming incoming kafka message <{}> with headers <{}> and key <{}>.", value, messageHeaders, key @@ -148,7 +148,7 @@ public TransformationResult transform(final ConsumerRecord c return TransformationResult.failed(e.setDittoHeaders(DittoHeaders.of(messageHeaders))); } catch (final Exception e) { inboundMonitor.exception(messageHeaders, e); - LOGGER.withCorrelationId(correlationId) + LOGGER.withCorrelationId(messageHeaders) .error(String.format("Unexpected {%s}: {%s}", e.getClass().getName(), e.getMessage()), e); startedSpan.tagAsFailed(e); return null; // Drop message diff --git a/connectivity/service/src/main/resources/logback.xml b/connectivity/service/src/main/resources/logback.xml index 0c53bfc38a5..7253b517a13 100644 --- a/connectivity/service/src/main/resources/logback.xml +++ b/connectivity/service/src/main/resources/logback.xml @@ -16,14 +16,14 @@ - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n System.err - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n ERROR diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractConsumerActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractConsumerActorTest.java index 064f3bbbe53..5f2e1e9bd9c 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractConsumerActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractConsumerActorTest.java @@ -254,6 +254,7 @@ protected Sink setupInboundMappingSink(final ActorRef clientAct .thenReturn(logger); when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class))) .thenReturn(logger); + when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger); final ProtocolAdapter protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null); final var connection = CONNECTION.toBuilder().payloadMappingDefinition(payloadMappingDefinition).build(); final InboundMappingProcessor inboundMappingProcessor = diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractMessageMappingProcessorActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractMessageMappingProcessorActorTest.java index a008bb74e0c..89db681a6ec 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractMessageMappingProcessorActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractMessageMappingProcessorActorTest.java @@ -349,6 +349,7 @@ ActorRef createInboundMappingProcessorActor(final ActorRef proxyActor, final var logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class); Mockito.when(logger.withCorrelationId(Mockito.any(DittoHeaders.class))) .thenReturn(logger); + Mockito.when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger); Mockito.when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))) .thenReturn(logger); Mockito.when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class))) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorTest.java index 5042dea125a..85f8b733767 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorTest.java @@ -112,6 +112,7 @@ public static void setUp() { when(logger.withCorrelationId(Mockito.nullable(String.class))).thenReturn(logger); when(logger.withCorrelationId(Mockito.nullable(WithDittoHeaders.class))).thenReturn(logger); when(logger.withCorrelationId(Mockito.nullable(DittoHeaders.class))).thenReturn(logger); + when(logger.withCorrelationId(Mockito.nullable(Map.class))).thenReturn(logger); when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger); protocolAdapterProvider = new DittoProtocolAdapterProvider(Mockito.mock(ProtocolConfig.class)); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/SendingTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/SendingTest.java index b8b7e7f6e7c..d14faf075db 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/SendingTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/SendingTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.eq; import static org.mockito.Mockito.withSettings; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -94,6 +95,7 @@ public void setUp() { Mockito.when(externalMessage.getInternalHeaders()).thenReturn(dittoHeaders); Mockito.when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger); Mockito.when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class))).thenReturn(logger); + Mockito.when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger); exceptionConverter = DefaultExceptionToAcknowledgementConverter.getInstance(); } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/TestConstants.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/TestConstants.java index da6dcfb5ea6..a6b6d1ce919 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/TestConstants.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/TestConstants.java @@ -245,6 +245,7 @@ public static ThreadSafeDittoLoggingAdapter mockThreadSafeDittoLoggingAdapter() when(logger.withCorrelationId(Mockito.nullable(String.class))).thenReturn(logger); when(logger.withCorrelationId(Mockito.nullable(WithDittoHeaders.class))).thenReturn(logger); when(logger.withCorrelationId(Mockito.nullable(DittoHeaders.class))).thenReturn(logger); + when(logger.withCorrelationId(Mockito.nullable(Map.class))).thenReturn(logger); when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger); return logger; } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActorTest.java index 22331b78b7e..c34266b8ba0 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActorTest.java @@ -360,6 +360,7 @@ private Sink setupMappingSink(final ActorRef testRef, final ThreadSafeDittoLoggingAdapter logger = mock(ThreadSafeDittoLoggingAdapter.class); when(logger.withCorrelationId(any(DittoHeaders.class))) .thenReturn(logger); + when(logger.withCorrelationId(any(Map.class))).thenReturn(logger); when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))) .thenReturn(logger); when(logger.withCorrelationId(any(WithDittoHeaders.class))) diff --git a/deployment/helm/ditto/Chart.yaml b/deployment/helm/ditto/Chart.yaml index c9f59634505..b221e8da6db 100644 --- a/deployment/helm/ditto/Chart.yaml +++ b/deployment/helm/ditto/Chart.yaml @@ -16,7 +16,7 @@ description: | A digital twin is a virtual, cloud based, representation of his real world counterpart (real world “Things”, e.g. devices like sensors, smart heating, connected cars, smart grids, EV charging stations etc). type: application -version: 3.3.9 # chart version is effectively set by release-job +version: 3.3.10 # chart version is effectively set by release-job appVersion: 3.3.6 keywords: - iot-chart diff --git a/deployment/helm/ditto/logback-config/connectivity.xml b/deployment/helm/ditto/logback-config/connectivity.xml index 0d22766ad5c..a67dde6b9e8 100644 --- a/deployment/helm/ditto/logback-config/connectivity.xml +++ b/deployment/helm/ditto/logback-config/connectivity.xml @@ -17,7 +17,6 @@ sourceActorSystem akkaUid akkaTimestamp - x-correlation-id=correlation-id connection-id=ditto-connection-id connection-type=ditto-connection-type diff --git a/deployment/helm/ditto/logback-config/gateway.xml b/deployment/helm/ditto/logback-config/gateway.xml index 348e3a63875..563d732007c 100755 --- a/deployment/helm/ditto/logback-config/gateway.xml +++ b/deployment/helm/ditto/logback-config/gateway.xml @@ -17,7 +17,6 @@ sourceActorSystem akkaUid akkaTimestamp - x-correlation-id=correlation-id diff --git a/deployment/helm/ditto/logback-config/policies.xml b/deployment/helm/ditto/logback-config/policies.xml index 348e3a63875..563d732007c 100755 --- a/deployment/helm/ditto/logback-config/policies.xml +++ b/deployment/helm/ditto/logback-config/policies.xml @@ -17,7 +17,6 @@ sourceActorSystem akkaUid akkaTimestamp - x-correlation-id=correlation-id diff --git a/deployment/helm/ditto/logback-config/things.xml b/deployment/helm/ditto/logback-config/things.xml index 348e3a63875..563d732007c 100755 --- a/deployment/helm/ditto/logback-config/things.xml +++ b/deployment/helm/ditto/logback-config/things.xml @@ -17,7 +17,6 @@ sourceActorSystem akkaUid akkaTimestamp - x-correlation-id=correlation-id diff --git a/deployment/helm/ditto/logback-config/thingssearch.xml b/deployment/helm/ditto/logback-config/thingssearch.xml index 348e3a63875..563d732007c 100755 --- a/deployment/helm/ditto/logback-config/thingssearch.xml +++ b/deployment/helm/ditto/logback-config/thingssearch.xml @@ -17,7 +17,6 @@ sourceActorSystem akkaUid akkaTimestamp - x-correlation-id=correlation-id diff --git a/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementAggregatorActor.java b/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementAggregatorActor.java index f957f0a440b..2d4a4dcd632 100644 --- a/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementAggregatorActor.java +++ b/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementAggregatorActor.java @@ -104,7 +104,7 @@ private AcknowledgementAggregatorActor(final EntityId entityId, final var acknowledgementRequests = signalDittoHeaders.getAcknowledgementRequests(); ackregator = AcknowledgementAggregator.getInstance(entityId, correlationId, timeout, headerTranslator); ackregator.addAcknowledgementRequests(acknowledgementRequests); - log.withCorrelationId(correlationId) + log.withCorrelationId(signalDittoHeaders) .info("Starting to wait for all requested acknowledgements <{}> for a maximum duration of <{}>.", acknowledgementRequests, timeout); diff --git a/edge/service/src/test/resources/logback-test.xml b/edge/service/src/test/resources/logback-test.xml index 8fb79489124..b7360bc0f17 100644 --- a/edge/service/src/test/resources/logback-test.xml +++ b/edge/service/src/test/resources/logback-test.xml @@ -19,7 +19,7 @@ - %d{HH:mm:ss.SSS} [%-5level] [%X{x-correlation-id}] %logger{15} - %msg%n%rEx + %d{HH:mm:ss.SSS} [%-5level] [%X{correlation-id}] %logger{15} - %msg%n%rEx diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/actors/AbstractHttpRequestActor.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/actors/AbstractHttpRequestActor.java index 5cc7f7942d2..a16a01e07a0 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/actors/AbstractHttpRequestActor.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/actors/AbstractHttpRequestActor.java @@ -267,7 +267,6 @@ public void serviceRequestsDone(final Control serviceRequestsDone) { private void handleCommand(final Command command) { try { - logger.setCorrelationId(command); receivedCommand = command; setDefaultTimeoutExceptionSupplier(command); final var timeoutOverride = getReceiveTimeout(command, commandConfig); @@ -528,8 +527,8 @@ private void handleReceiveTimeout() { final var actorContext = getContext(); final var receiveTimeout = actorContext.getReceiveTimeout(); - logger.setCorrelationId(WithDittoHeaders.getCorrelationId(receivedCommand).orElse(null)); - logger.info("Got <{}> after <{}> before an appropriate response arrived.", + logger.withCorrelationId(receivedCommand) + .info("Got <{}> after <{}> before an appropriate response arrived.", ReceiveTimeout.class.getSimpleName(), receiveTimeout); if (null != timeoutExceptionSupplier) { @@ -539,8 +538,9 @@ private void handleReceiveTimeout() { } else { actorContext.cancelReceiveTimeout(); // This case is a programming error that should not happen at all. - logger.error("Actor does not have a timeout exception supplier." + - " Thus, no DittoRuntimeException could be handled."); + logger.withCorrelationId(receivedCommand) + .error("Actor does not have a timeout exception supplier. " + + "Thus, no DittoRuntimeException could be handled."); stop(); } } diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/CorrelationIdEnsuringDirective.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/CorrelationIdEnsuringDirective.java index 205d08d1c59..6cb8873685c 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/CorrelationIdEnsuringDirective.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/CorrelationIdEnsuringDirective.java @@ -14,9 +14,12 @@ import static akka.http.javadsl.server.Directives.extractRequestContext; +import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.concurrent.Immutable; @@ -53,7 +56,8 @@ private CorrelationIdEnsuringDirective() { public static Route ensureCorrelationId(final Function inner) { return extractRequestContext(requestContext -> { final String correlationId = getCorrelationIdFromHeaders(requestContext.getRequest()) - .orElseGet(CorrelationIdEnsuringDirective::createNewCorrelationId); + .orElseGet(() -> + CorrelationIdEnsuringDirective.createNewCorrelationId(requestContext.getRequest())); return inner.apply(correlationId); }); } @@ -64,19 +68,25 @@ private static Optional getCorrelationIdFromHeaders(final HttpRequest re .map(HttpHeader::value); if (LOGGER.isDebugEnabled()) { - result.ifPresent(correlationId -> LOGGER.withCorrelationId(correlationId) + result.ifPresent(correlationId -> LOGGER.withCorrelationId(headersAsMap(request)) .debug("Correlation ID <{}> already exists in request.", correlationId)); } return result; } - private static String createNewCorrelationId() { + private static String createNewCorrelationId(final HttpRequest request) { final String result = String.valueOf(UUID.randomUUID()); if (LOGGER.isDebugEnabled()) { - LOGGER.withCorrelationId(result).debug("Created new correlation ID <{}>.", result); + LOGGER.withCorrelationId(headersAsMap(request)) + .debug("Created new correlation ID <{}>.", result); } return result; } + private static Map headersAsMap(final HttpRequest request) { + return StreamSupport.stream(request.getHeaders().spliterator(), false) + .collect(Collectors.toMap(HttpHeader::name, HttpHeader::value)); + } + } diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestResultLoggingDirective.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestResultLoggingDirective.java index d6fdcd3811c..2151acab706 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestResultLoggingDirective.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestResultLoggingDirective.java @@ -20,12 +20,18 @@ import static org.eclipse.ditto.gateway.service.endpoints.directives.RequestLoggingFilter.filterRawUri; import static org.eclipse.ditto.gateway.service.endpoints.directives.RequestLoggingFilter.filterUri; +import java.util.Map; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.gateway.service.endpoints.utils.HttpUtils; import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger; +import akka.http.javadsl.model.HttpHeader; +import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.server.Complete; import akka.http.javadsl.server.Route; @@ -62,7 +68,9 @@ public static Route logRequestResult(final CharSequence correlationId, final Sup final String requestMethod = request.method().name(); final String filteredRelativeRequestUri = filterUri(request.getUri().toRelative()).toString(); return mapRouteResult(routeResult -> { - final ThreadSafeDittoLogger logger = LOGGER.withCorrelationId(correlationId); + final Map headers = headersAsMap(request); + headers.put(DittoHeaderDefinition.CORRELATION_ID.getKey(), correlationId.toString()); + final ThreadSafeDittoLogger logger = LOGGER.withCorrelationId(headers); if (routeResult instanceof Complete complete) { final int statusCode = complete.getResponse().status().intValue(); logger.info("StatusCode of request {} '{}' was: {}", requestMethod, filteredRelativeRequestUri, @@ -73,7 +81,7 @@ public static Route logRequestResult(final CharSequence correlationId, final Sup } request.getHeader(DITTO_TRACE_HEADERS) .filter(unused -> TRACE_LOGGER.isDebugEnabled()) - .ifPresent(unused -> TRACE_LOGGER.withCorrelationId(correlationId) + .ifPresent(unused -> TRACE_LOGGER.withCorrelationId(headers) .debug("Request headers: {}", filterHeaders(request.getHeaders()))); } else { /* routeResult could be Rejected, if no route is able to handle the request -> but this should @@ -88,4 +96,8 @@ public static Route logRequestResult(final CharSequence correlationId, final Sup }); } + private static Map headersAsMap(final HttpRequest request) { + return StreamSupport.stream(request.getHeaders().spliterator(), false) + .collect(Collectors.toMap(HttpHeader::name, HttpHeader::value)); + } } diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java index c5a3f400ede..9740ba600ed 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java @@ -149,7 +149,10 @@ private static Route getRouteWithEnabledTracing( @Nullable final CharSequence correlationId ) { return mapRequest( - req -> adjustSpanContextHeadersOfRequest(req, startedSpan.propagateContext(Map.of())), + req -> adjustSpanContextHeadersOfRequest(req, startedSpan.propagateContext( + StreamSupport.stream(httpRequest.getHeaders().spliterator(), false) + .collect(Collectors.toMap(HttpHeader::name, HttpHeader::value)) + )), () -> mapRouteResult( routeResult -> tryToHandleRouteResult(routeResult, httpRequest, startedSpan, correlationId), innerRouteSupplier diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/preauth/PreAuthenticatedAuthenticationProvider.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/preauth/PreAuthenticatedAuthenticationProvider.java index 7c34fd32200..75e78313611 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/preauth/PreAuthenticatedAuthenticationProvider.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/preauth/PreAuthenticatedAuthenticationProvider.java @@ -14,10 +14,13 @@ import java.text.MessageFormat; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.concurrent.Immutable; @@ -120,7 +123,11 @@ protected CompletableFuture tryToAuthenticate(final Reques AuthorizationModelFactory.newAuthContext(DittoAuthorizationContextType.PRE_AUTHENTICATED_HTTP, authorizationSubjects); - LOGGER.withCorrelationId(dittoHeaders) + final var combinedHeaders = new HashMap<>(dittoHeaders); + combinedHeaders.putAll(StreamSupport.stream(requestContext.getRequest().getHeaders().spliterator(), false) + .collect(Collectors.toMap( + akka.http.javadsl.model.HttpHeader::name, akka.http.javadsl.model.HttpHeader::value))); + LOGGER.withCorrelationId(combinedHeaders) .info("Pre-authentication has been applied resulting in AuthorizationContext <{}>.", authContext); return CompletableFuture.completedFuture(DefaultAuthenticationResult.successful(dittoHeaders, authContext)); diff --git a/gateway/service/src/main/resources/logback.xml b/gateway/service/src/main/resources/logback.xml index 97cdcf2df4a..742c1da5097 100755 --- a/gateway/service/src/main/resources/logback.xml +++ b/gateway/service/src/main/resources/logback.xml @@ -16,14 +16,14 @@ - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n System.err - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n ERROR diff --git a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/CommonMdcEntryKey.java b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/CommonMdcEntryKey.java index 1773c113597..cf58f2f1ae4 100644 --- a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/CommonMdcEntryKey.java +++ b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/CommonMdcEntryKey.java @@ -12,6 +12,17 @@ */ package org.eclipse.ditto.internal.utils.akka.logging; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Stream; + +import javax.annotation.Nullable; + +import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; + /** * An enumeration of commonly known MDC entry keys. * @@ -19,8 +30,10 @@ */ public enum CommonMdcEntryKey implements CharSequence { - CORRELATION_ID("x-correlation-id"), - DITTO_LOG_TAG("ditto-log-tag"); + CORRELATION_ID(DittoHeaderDefinition.CORRELATION_ID.getKey()), + DITTO_LOG_TAG("ditto-log-tag"), + TRACE_PARENT(DittoHeaderDefinition.W3C_TRACEPARENT.getKey()), + TRACE_STATE(DittoHeaderDefinition.W3C_TRACESTATE.getKey()); private final String key; @@ -48,4 +61,48 @@ public String toString() { return key; } + /** + * Returns the {@code CommonMdcEntryKey} with the given name. + * + * @param name the name of the common MDC entry to get. + * @return the common MDC entry with the given name or an empty optional. + */ + public static Optional forName(@Nullable final CharSequence name) { + return Stream.of(values()) + .filter(l -> Objects.equals(l.key, String.valueOf(name))) + .findAny(); + } + + /** + * Extracts a list of MdcEntries based on the given {@code headers} map, picking out explicitly headers which should + * be added to the MDC. + *

+ * This method is called a lot of times in Ditto's codebase and therefore better be fast. This was achieved by not + * iterating over all headers, but to accessing the passed {@code HashMap} for picking out only the supported + * headers. + *

+ * + * @param headers the headers to look into for extracting the MDC worthy headers to log. + * @return a list of MDC entries to add to the MDC. + */ + public static List extractMdcEntriesFromHeaders(@Nullable final Map headers) { + + if (null == headers || headers.isEmpty()) { + return Collections.emptyList(); + } + return Stream.of(CORRELATION_ID, TRACE_PARENT, TRACE_STATE) + .flatMap(mdcEntryKey -> + extractValue(headers, mdcEntryKey) + .map(value -> MdcEntry.of(mdcEntryKey.key, value)) + .stream() + ).toList(); + } + + private static Optional extractValue(final Map headers, + final CommonMdcEntryKey mdcEntryKey) { + + // accessing a key in a HashMap is super fast + return Optional.ofNullable(headers.get(mdcEntryKey.key)); + } + } diff --git a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoDiagnosticLoggingAdapter.java b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoDiagnosticLoggingAdapter.java index 7e9f707e154..dda62da187e 100644 --- a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoDiagnosticLoggingAdapter.java +++ b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoDiagnosticLoggingAdapter.java @@ -24,7 +24,6 @@ import org.eclipse.ditto.base.model.headers.WithDittoHeaders; import akka.event.DiagnosticLoggingAdapter; -import scala.collection.JavaConverters; import scala.collection.immutable.Seq; /** @@ -73,29 +72,18 @@ public DefaultDittoDiagnosticLoggingAdapter withCorrelationId(@Nullable final Ch } @Override - public DefaultDittoDiagnosticLoggingAdapter withCorrelationId(@Nullable final WithDittoHeaders withDittoHeaders) { - return withCorrelationId(null != withDittoHeaders ? withDittoHeaders.getDittoHeaders() : null); - } - - @Override - public DefaultDittoDiagnosticLoggingAdapter withCorrelationId(@Nullable final DittoHeaders dittoHeaders) { - return withCorrelationId(null != dittoHeaders ? dittoHeaders.getCorrelationId().orElse(null) : null); - } - - @Override - public DefaultDittoDiagnosticLoggingAdapter setCorrelationId(@Nullable final CharSequence correlationId) { - return setMdcEntry(CommonMdcEntryKey.CORRELATION_ID, correlationId); + public DefaultDittoDiagnosticLoggingAdapter withCorrelationId(@Nullable final Map headers) { + return withMdcEntries(CommonMdcEntryKey.extractMdcEntriesFromHeaders(headers)); } @Override - public DefaultDittoDiagnosticLoggingAdapter setCorrelationId(final WithDittoHeaders withDittoHeaders) { - return setCorrelationId(checkNotNull(withDittoHeaders, "withDittoHeaders").getDittoHeaders()); + public DefaultDittoDiagnosticLoggingAdapter withCorrelationId(@Nullable final WithDittoHeaders withDittoHeaders) { + return withCorrelationId(null != withDittoHeaders ? withDittoHeaders.getDittoHeaders() : null); } @Override - public DefaultDittoDiagnosticLoggingAdapter setCorrelationId(final DittoHeaders dittoHeaders) { - checkNotNull(dittoHeaders, "dittoHeaders"); - return setCorrelationId(dittoHeaders.getCorrelationId().orElse(null)); + public DefaultDittoDiagnosticLoggingAdapter withCorrelationId(@Nullable final DittoHeaders dittoHeaders) { + return withCorrelationId(null != dittoHeaders ? dittoHeaders : Map.of()); } @Override @@ -150,30 +138,6 @@ private void removeFromMdcOfAllLoggerStates(final CharSequence key) { autoDiscardingLoggingAdapter.removeMdcEntry(key); } - @Override - public DefaultDittoDiagnosticLoggingAdapter setMdcEntry(final MdcEntry mdcEntry, - final Seq furtherMdcEntries) { - - currentLogger = loggingAdapter; - putToMdcOfAllLoggerStates(mdcEntry.getKey(), mdcEntry.getValueOrNull()); - final Collection furtherMdcEntriesCollection = JavaConverters.asJavaCollection(furtherMdcEntries); - furtherMdcEntriesCollection.forEach(furtherMdcEntry -> putToMdcOfAllLoggerStates(furtherMdcEntry.getKey(), - furtherMdcEntry.getValueOrNull())); - return this; - } - - @Override - public DefaultDittoDiagnosticLoggingAdapter setMdcEntry(final MdcEntry mdcEntry, - final MdcEntry... furtherMdcEntries) { - - currentLogger = loggingAdapter; - putToMdcOfAllLoggerStates(mdcEntry.getKey(), mdcEntry.getValueOrNull()); - for (final MdcEntry furtherMdcEntry : furtherMdcEntries) { - putToMdcOfAllLoggerStates(furtherMdcEntry.getKey(), furtherMdcEntry.getValueOrNull()); - } - return this; - } - @Override public DefaultDittoDiagnosticLoggingAdapter putMdcEntry(final CharSequence key, @Nullable final CharSequence value) { @@ -226,6 +190,15 @@ public DittoDiagnosticLoggingAdapter withMdcEntry(final MdcEntry mdcEntry, final return this; } + @Override + public DefaultDittoDiagnosticLoggingAdapter withMdcEntries(final Collection mdcEntries) { + checkNotNull(mdcEntries, "mdcEntries"); + + currentLogger = autoDiscardingLoggingAdapter; + mdcEntries.forEach(mdcEntry -> currentLogger.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull())); + return this; + } + @Override public DefaultDittoDiagnosticLoggingAdapter withMdcEntry(final MdcEntry mdcEntry, final Seq furtherMdcEntries) { diff --git a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoLogger.java b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoLogger.java index 8a6fff494b8..e6846da2dd0 100644 --- a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoLogger.java +++ b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoLogger.java @@ -14,6 +14,9 @@ import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; +import java.util.Collection; +import java.util.Map; + import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -60,6 +63,11 @@ public DefaultDittoLogger withCorrelationId(@Nullable final CharSequence correla return this; } + @Override + public DefaultDittoLogger withCorrelationId(@Nullable final Map headers) { + return withMdcEntries(CommonMdcEntryKey.extractMdcEntriesFromHeaders(headers)); + } + @Override public DefaultDittoLogger withCorrelationId(@Nullable final WithDittoHeaders withDittoHeaders) { return withCorrelationId(null != withDittoHeaders ? withDittoHeaders.getDittoHeaders() : null); @@ -67,7 +75,7 @@ public DefaultDittoLogger withCorrelationId(@Nullable final WithDittoHeaders wit @Override public DefaultDittoLogger withCorrelationId(@Nullable final DittoHeaders dittoHeaders) { - return withCorrelationId(null != dittoHeaders ? dittoHeaders.getCorrelationId().orElse(null) : null); + return withCorrelationId(null != dittoHeaders ? dittoHeaders : Map.of()); } @Override @@ -77,17 +85,6 @@ public DefaultDittoLogger setCorrelationId(@Nullable final CharSequence correlat return this; } - @Override - public DefaultDittoLogger setCorrelationId(final WithDittoHeaders withDittoHeaders) { - return setCorrelationId(checkNotNull(withDittoHeaders, "withDittoHeaders").getDittoHeaders()); - } - - @Override - public DefaultDittoLogger setCorrelationId(final DittoHeaders dittoHeaders) { - checkNotNull(dittoHeaders, "dittoHeaders"); - return setCorrelationId(dittoHeaders.getCorrelationId().orElse(null)); - } - @Override public void discardCorrelationId() { currentLogger.discardCorrelationId(); @@ -141,6 +138,15 @@ public DefaultDittoLogger withMdcEntry(final MdcEntry mdcEntry, final MdcEntry.. return this; } + @Override + public DefaultDittoLogger withMdcEntries(final Collection mdcEntries) { + checkNotNull(mdcEntries, "mdcEntries"); + + currentLogger = autoClosingSlf4jLogger; + mdcEntries.forEach(mdcEntry -> currentLogger.putMdcEntry(mdcEntry.getKey(), mdcEntry.getValueOrNull())); + return this; + } + @Override public DefaultDittoLogger removeMdcEntry(final CharSequence key) { currentLogger.removeMdcEntry(key); diff --git a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DittoLogger.java b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DittoLogger.java index ef1f97088b9..d8c0875fcb6 100644 --- a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DittoLogger.java +++ b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/DittoLogger.java @@ -12,6 +12,8 @@ */ package org.eclipse.ditto.internal.utils.akka.logging; +import java.util.Map; + import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -95,48 +97,28 @@ public interface DittoLogger extends Logger, WithMdcEntry { DittoLogger withCorrelationId(@Nullable CharSequence correlationId); /** - * Derives the correlation ID from the given WithDittoHeaders for the subsequent log operation. - * - * @param withDittoHeaders provides DittoHeaders which might contain the correlation ID to be put to the MDC. - * @return this DittoLogger instance to allow method chaining. - */ - DittoLogger withCorrelationId(@Nullable WithDittoHeaders withDittoHeaders); - - /** - * Obtains the correlation ID from the given DittoHeaders for the subsequent log operation. - * - * @param dittoHeaders might contain the correlation ID to be put to the MDC. - * @return this DittoLogger instance to allow method chaining. - */ - DittoLogger withCorrelationId(@Nullable DittoHeaders dittoHeaders); - - /** - * Sets the given correlation ID for all subsequent log operations until it gets manually discarded. + * Obtains the correlation ID from the given Headers for the subsequent log operation. * - * @param correlationId the correlation ID to be put to the MDC. + * @param headers might contain the correlation ID to be put to the MDC. * @return this DittoLogger instance to allow method chaining. */ - AutoCloseableSlf4jLogger setCorrelationId(@Nullable CharSequence correlationId); + DittoLogger withCorrelationId(@Nullable Map headers); /** - * Derives the correlation ID from the given WithDittoHeaders for all subsequent log operations until it gets - * manually discarded. + * Derives the correlation ID from the given WithDittoHeaders for the subsequent log operation. * * @param withDittoHeaders provides DittoHeaders which might contain the correlation ID to be put to the MDC. * @return this DittoLogger instance to allow method chaining. - * @throws NullPointerException if {@code withDittoHeaders} is {@code null}. */ - AutoCloseableSlf4jLogger setCorrelationId(WithDittoHeaders withDittoHeaders); + DittoLogger withCorrelationId(@Nullable WithDittoHeaders withDittoHeaders); /** - * Obtains the correlation ID from the given DittoHeaders for all subsequent log operations until it gets manually - * discarded. + * Obtains the correlation ID from the given DittoHeaders for the subsequent log operation. * * @param dittoHeaders might contain the correlation ID to be put to the MDC. * @return this DittoLogger instance to allow method chaining. - * @throws NullPointerException if {@code dittoHeaders} is {@code null}. */ - AutoCloseableSlf4jLogger setCorrelationId(DittoHeaders dittoHeaders); + DittoLogger withCorrelationId(@Nullable DittoHeaders dittoHeaders); /** * Removes the currently set correlation ID from the MDC. diff --git a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ImmutableDittoLogger.java b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ImmutableDittoLogger.java index 61f99b3a5ac..61c55be4523 100644 --- a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ImmutableDittoLogger.java +++ b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ImmutableDittoLogger.java @@ -15,6 +15,7 @@ import static org.eclipse.ditto.base.model.common.ConditionChecker.argumentNotEmpty; import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; +import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -363,6 +364,11 @@ public ImmutableDittoLogger withCorrelationId(@Nullable final CharSequence corre return withMdcEntry(CommonMdcEntryKey.CORRELATION_ID, correlationId); } + @Override + public ImmutableDittoLogger withCorrelationId(@Nullable final Map headers) { + return withMdcEntries(CommonMdcEntryKey.extractMdcEntriesFromHeaders(headers)); + } + @Override public ImmutableDittoLogger withCorrelationId(@Nullable final WithDittoHeaders withDittoHeaders) { return withCorrelationId(null != withDittoHeaders ? withDittoHeaders.getDittoHeaders() : null); @@ -370,7 +376,7 @@ public ImmutableDittoLogger withCorrelationId(@Nullable final WithDittoHeaders w @Override public ImmutableDittoLogger withCorrelationId(@Nullable final DittoHeaders dittoHeaders) { - return withCorrelationId(null != dittoHeaders ? dittoHeaders.getCorrelationId().orElse(null) : null); + return withCorrelationId(null != dittoHeaders ? dittoHeaders : Map.of()); } @Override @@ -467,6 +473,16 @@ public ImmutableDittoLogger withMdcEntry(final MdcEntry mdcEntry, final MdcEntry return new ImmutableDittoLogger(plainSlf4jLogger, newLocalMdc); } + @Override + public ImmutableDittoLogger withMdcEntries(final Collection mdcEntries) { + checkNotNull(mdcEntries, "mdcEntries"); + + final Map newLocalMdc = copyLocalMdc(); + mdcEntries.forEach(mdcEntry -> newLocalMdc.put(mdcEntry.getKey(), mdcEntry.getValueOrNull())); + + return new ImmutableDittoLogger(plainSlf4jLogger, newLocalMdc); + } + @Override public ImmutableDittoLogger removeMdcEntry(final CharSequence key) { validateMdcEntryKey(key, "key"); diff --git a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ImmutableDittoLoggingAdapter.java b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ImmutableDittoLoggingAdapter.java index e7862b571e6..1a76ade64e3 100644 --- a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ImmutableDittoLoggingAdapter.java +++ b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ImmutableDittoLoggingAdapter.java @@ -15,6 +15,7 @@ import static org.eclipse.ditto.base.model.common.ConditionChecker.argumentNotEmpty; import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; @@ -75,14 +76,19 @@ public ImmutableDittoLoggingAdapter withCorrelationId(@Nullable final CharSequen return withMdcEntry(CommonMdcEntryKey.CORRELATION_ID, correlationId); } + @Override + public ImmutableDittoLoggingAdapter withCorrelationId(@Nullable final Map headers) { + return withMdcEntries(CommonMdcEntryKey.extractMdcEntriesFromHeaders(headers)); + } + @Override public ImmutableDittoLoggingAdapter withCorrelationId(@Nullable final WithDittoHeaders withDittoHeaders) { - return withCorrelationId(null != withDittoHeaders ? withDittoHeaders.getDittoHeaders() : null); + return withCorrelationId(withDittoHeaders != null ? withDittoHeaders.getDittoHeaders() : null); } @Override public ImmutableDittoLoggingAdapter withCorrelationId(@Nullable final DittoHeaders dittoHeaders) { - return withCorrelationId(null != dittoHeaders ? dittoHeaders.getCorrelationId().orElse(null) : null); + return withCorrelationId(null != dittoHeaders ? dittoHeaders : Map.of()); } @Override @@ -187,6 +193,16 @@ public ImmutableDittoLoggingAdapter withMdcEntry(final MdcEntry mdcEntry, final return newInstance(diagnosticLoggingAdapterFactory, mdcCopy); } + @Override + public ImmutableDittoLoggingAdapter withMdcEntries(final Collection mdcEntries) { + checkNotNull(mdcEntries, "mdcEntries"); + + final Map newLocalMdc = getCopyOfMdc(); + mdcEntries.forEach(mdcEntry -> newLocalMdc.put(mdcEntry.getKey(), mdcEntry.getValueOrNull())); + + return newInstance(diagnosticLoggingAdapterFactory, newLocalMdc); + } + @Override public ImmutableDittoLoggingAdapter withMdcEntry(final MdcEntry mdcEntry, final MdcEntry... furtherMdcEntries) { checkNotNull(mdcEntry, "mdcEntry"); diff --git a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLogger.java b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLogger.java index 3915f245349..35716e5f93b 100644 --- a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLogger.java +++ b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLogger.java @@ -12,6 +12,8 @@ */ package org.eclipse.ditto.internal.utils.akka.logging; +import java.util.Map; + import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -76,6 +78,16 @@ public interface ThreadSafeDittoLogger extends Logger, WithMdcEntry headers); + /** * Derives the correlation ID from the given WithDittoHeaders for the log operations on the returned logger. * If no or an empty correlation ID can be derived, this method has the same effect like diff --git a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/WithMdcEntry.java b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/WithMdcEntry.java index 37a325e3884..0bc0821b10f 100644 --- a/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/WithMdcEntry.java +++ b/internal/utils/akka/src/main/java/org/eclipse/ditto/internal/utils/akka/logging/WithMdcEntry.java @@ -12,6 +12,8 @@ */ package org.eclipse.ditto.internal.utils.akka.logging; +import java.util.Collection; + import javax.annotation.Nullable; import org.slf4j.Logger; @@ -84,6 +86,14 @@ L withMdcEntries(CharSequence k1, @Nullable CharSequence v1, CharSequence k2, @N */ L withMdcEntry(MdcEntry mdcEntry, MdcEntry... furtherMdcEntries); + /** + * Puts the given entries to the MDC of this logger. + * + * @return this or a new logger instance for method chaining. + * @throws NullPointerException if any argument is {@code null}. + */ + L withMdcEntries(Collection mdcEntries); + /** * Removes the diagnostic context value identified by the specified key. * This method does nothing if there is no previous value associated with the specified key. diff --git a/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/DittoDiagnosticLoggingAdapter.scala b/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/DittoDiagnosticLoggingAdapter.scala index 397b909b7ed..fab8ab7b549 100644 --- a/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/DittoDiagnosticLoggingAdapter.scala +++ b/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/DittoDiagnosticLoggingAdapter.scala @@ -14,6 +14,7 @@ package org.eclipse.ditto.internal.utils.akka.logging import org.eclipse.ditto.base.model.headers.{DittoHeaders, WithDittoHeaders} +import java.util import javax.annotation.Nullable import javax.annotation.concurrent.NotThreadSafe import scala.annotation.varargs @@ -62,6 +63,13 @@ abstract class DittoDiagnosticLoggingAdapter extends AbstractDiagnosticLoggingAd */ def withCorrelationId(@Nullable correlationId: CharSequence): DittoDiagnosticLoggingAdapter + /** Obtains the correlation ID from the given headers for the subsequent log operation. + * + * @param headers might contain the correlation ID to be put to the MDC. + * @return this DittoLogger instance to allow method chaining. + */ + def withCorrelationId(@Nullable headers: util.Map[String, String]): DittoDiagnosticLoggingAdapter + /** Derives the correlation ID from the given WithDittoHeaders for the subsequent log operation. * * @param withDittoHeaders provides DittoHeaders which might contain the correlation ID to be put to the MDC. @@ -76,31 +84,6 @@ abstract class DittoDiagnosticLoggingAdapter extends AbstractDiagnosticLoggingAd */ def withCorrelationId(@Nullable dittoHeaders: DittoHeaders): DittoDiagnosticLoggingAdapter - /** Sets the given correlation ID for all subsequent log operations until it gets manually discarded. - * - * @param correlationId the correlation ID to be put to the MDC. - * @return this logger instance to allow method chaining. - */ - def setCorrelationId(@Nullable correlationId: CharSequence): DittoDiagnosticLoggingAdapter - - /** Derives the correlation ID from the given WithDittoHeaders for all subsequent log operations until it gets - * manually discarded. - * - * @param withDittoHeaders provides DittoHeaders which might contain the correlation ID to be put to the MDC. - * @return this logger instance to allow method chaining. - * @throws NullPointerException if `withDittoHeaders` is `null`. - */ - def setCorrelationId(withDittoHeaders: WithDittoHeaders): DittoDiagnosticLoggingAdapter - - /** Obtains the correlation ID from the given DittoHeaders for all subsequent log operations until it gets manually - * discarded. - * - * @param dittoHeaders might contain the correlation ID to be put to the MDC. - * @return this logger instance to allow method chaining. - * @throws NullPointerException if `dittoHeaders` is `null`. - */ - def setCorrelationId(dittoHeaders: DittoHeaders): DittoDiagnosticLoggingAdapter - /** Removes the correlation ID from the MDC for all subsequent log operations. */ def discardCorrelationId(): Unit @@ -164,14 +147,6 @@ abstract class DittoDiagnosticLoggingAdapter extends AbstractDiagnosticLoggingAd */ def discardMdcEntry(key: CharSequence): Unit - /** Sets the specified diagnostic context values as identified by the specified keys to this logger's MDC for all - * subsequent log operations until it gets manually discarded. - * - * @return this logger instance to allow method chaining. - * @throws NullPointerException if any argument is `null`. - */ - @annotation.varargs def setMdcEntry(mdcEntry: MdcEntry, furtherMdcEntries: MdcEntry*): DittoDiagnosticLoggingAdapter - /** Message template with > 4 replacement arguments. */ @varargs def error(throwable: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any, moreArgs: Any*): Unit = { if (isErrorEnabled) { diff --git a/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/MdcEntrySettable.scala b/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/MdcEntrySettable.scala index 93d6adfa2bb..960739b263d 100644 --- a/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/MdcEntrySettable.scala +++ b/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/MdcEntrySettable.scala @@ -12,6 +12,7 @@ */ package org.eclipse.ditto.internal.utils.akka.logging +import java.util import javax.annotation.Nullable /** This trait defines the means to put and remove entries to or from the MDC of a logger. @@ -78,6 +79,14 @@ trait MdcEntrySettable[L] { */ @annotation.varargs def withMdcEntry(mdcEntry: MdcEntry, furtherMdcEntries: MdcEntry*): L + /** Puts the given entries to the MDC of this logger. + * + * @param mdcEntries the MDC entries to set. + * @return this or a new logger instance for method chaining. + * @throws NullPointerException if any argument is `null`. + */ + def withMdcEntries(mdcEntries: util.Collection[MdcEntry]): L + /** Removes the diagnostic context value identified by the specified key. * This method does nothing if there is no previous value associated with the specified key. * diff --git a/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLoggingAdapter.scala b/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLoggingAdapter.scala index 9dc8631431a..5520f311c61 100644 --- a/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLoggingAdapter.scala +++ b/internal/utils/akka/src/main/scala/org/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLoggingAdapter.scala @@ -15,6 +15,7 @@ package org.eclipse.ditto.internal.utils.akka.logging import akka.event.LoggingAdapter import org.eclipse.ditto.base.model.headers.{DittoHeaders, WithDittoHeaders} +import java.util import javax.annotation.Nullable import javax.annotation.concurrent.ThreadSafe @@ -77,6 +78,14 @@ abstract class ThreadSafeDittoLoggingAdapter extends LoggingAdapter */ def withCorrelationId(@Nullable correlationId: CharSequence): ThreadSafeDittoLoggingAdapter + /** Obtains the correlation ID from the given headers for the log operations on the returned logger. + * + * @param headers might contain the correlation ID to be put to the MDC. + * @return a ThreadSafeDittoLoggingAdapter which appends the derived correlation ID to all of its log operations. + * @see #withCorrelationId(DittoHeaders) + */ + def withCorrelationId(@Nullable headers: util.Map[String, String]): ThreadSafeDittoLoggingAdapter + /** Derives the correlation ID from the given WithDittoHeaders for the log operations on the returned logger. * If no or an empty correlation ID can be derived, this method has the same effect like * [[ThreadSafeDittoLoggingAdapter discardCorrelationId()]]. diff --git a/internal/utils/akka/src/test/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultAutoCloseableSlf4jLoggerTest.java b/internal/utils/akka/src/test/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultAutoCloseableSlf4jLoggerTest.java index ded7f5666aa..175bf96a0f8 100644 --- a/internal/utils/akka/src/test/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultAutoCloseableSlf4jLoggerTest.java +++ b/internal/utils/akka/src/test/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultAutoCloseableSlf4jLoggerTest.java @@ -132,7 +132,7 @@ public void logDebugAndWarnWithTwoMdcValuesThenClose() { entry(CORRELATION_ID_KEY, correlationId), entry(CONNECTION_ID_KEY, connectionId)); softly.assertThat(mdcObserver.getAllRemovedKeys()) .as("Removed MDC entries") - .containsExactly(CORRELATION_ID_KEY, CONNECTION_ID_KEY); + .contains(CORRELATION_ID_KEY, CONNECTION_ID_KEY); } @Test diff --git a/internal/utils/akka/src/test/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoDiagnosticLoggingAdapterTest.java b/internal/utils/akka/src/test/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoDiagnosticLoggingAdapterTest.java index 2f6c3c439bd..9b71f461810 100644 --- a/internal/utils/akka/src/test/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoDiagnosticLoggingAdapterTest.java +++ b/internal/utils/akka/src/test/java/org/eclipse/ditto/internal/utils/akka/logging/DefaultDittoDiagnosticLoggingAdapterTest.java @@ -170,20 +170,6 @@ public void withTwoMdcEntriesLogWarning() { Mockito.verify(plainLoggingAdapter).setMDC(Map.of()); } - @Test - public void putNothingToMdcAndDoNotLogAsInfoIsDisabled() { - final String msg = "Foo!"; - Mockito.when(plainLoggingAdapter.isInfoEnabled()).thenReturn(false); - - final DefaultDittoDiagnosticLoggingAdapter underTest = - DefaultDittoDiagnosticLoggingAdapter.of(plainLoggingAdapter, LOGGER_NAME); - underTest.setCorrelationId(getCorrelationId()); - underTest.info(msg); - - Mockito.verify(plainLoggingAdapter, Mockito.times(0)).info(msg); - Mockito.verify(plainLoggingAdapter, Mockito.times(0)).setMDC(Mockito.anyMap()); - } - @Test public void logDebugAndWarnWithTwoMdcValuesThenDiscardMdcEntries() { final String correlationId = getCorrelationId(); @@ -246,62 +232,6 @@ public void removeMdcEntryViaNullValue() { Mockito.verify(plainLoggingAdapter).setMDC(Map.of(CONNECTION_ID_KEY, connectionId)); } - @Test - public void setCorrelationIdLogErrorDoNotDiscard() { - final String correlationId = getCorrelationId(); - final String msg = "Foo!"; - Mockito.when(plainLoggingAdapter.isErrorEnabled()).thenReturn(true); - - final DefaultDittoDiagnosticLoggingAdapter underTest = - DefaultDittoDiagnosticLoggingAdapter.of(plainLoggingAdapter, LOGGER_NAME); - underTest.setCorrelationId(correlationId); - underTest.error(msg); - - Mockito.verify(plainLoggingAdapter).isErrorEnabled(); - Mockito.verify(plainLoggingAdapter).getMDC(); - Mockito.verify(plainLoggingAdapter).setMDC(Map.of(CORRELATION_ID_KEY, correlationId)); - Mockito.verify(plainLoggingAdapter).notifyError(msg); - Mockito.verifyNoMoreInteractions(plainLoggingAdapter); - } - - @Test - public void setCorrelationIdLogErrorDoNotClose() { - final String correlationId = getCorrelationId(); - final String msg = "Foo!"; - Mockito.when(plainLoggingAdapter.isErrorEnabled()).thenReturn(true); - - final DefaultDittoDiagnosticLoggingAdapter underTest = - DefaultDittoDiagnosticLoggingAdapter.of(plainLoggingAdapter, LOGGER_NAME); - underTest.setCorrelationId(correlationId); - underTest.error(msg); - - Mockito.verify(plainLoggingAdapter).isErrorEnabled(); - Mockito.verify(plainLoggingAdapter).getMDC(); - Mockito.verify(plainLoggingAdapter).setMDC(Map.of(CORRELATION_ID_KEY, correlationId)); - Mockito.verify(plainLoggingAdapter).notifyError(msg); - Mockito.verifyNoMoreInteractions(plainLoggingAdapter); - } - - @Test - public void setCorrelationIdLogInfoThenDiscardCorrelationId() { - final String correlationId = getCorrelationId(); - final String msg1 = "Foo!"; - final String msg2 = "No correlation ID in MDC."; - Mockito.when(plainLoggingAdapter.isInfoEnabled()).thenReturn(true); - - final DefaultDittoDiagnosticLoggingAdapter underTest = - DefaultDittoDiagnosticLoggingAdapter.of(plainLoggingAdapter, LOGGER_NAME); - underTest.setCorrelationId(correlationId); - underTest.info(msg1); - underTest.discardCorrelationId(); - underTest.info(msg2); - - Mockito.verify(plainLoggingAdapter).setMDC(Map.of(CORRELATION_ID_KEY, correlationId)); - Mockito.verify(plainLoggingAdapter).notifyInfo(msg1); - Mockito.verify(plainLoggingAdapter).setMDC(Map.of()); - Mockito.verify(plainLoggingAdapter).notifyInfo(msg2); - } - @Test public void logMoreThan4LoggingArgsError() { final String template = "one: {}, two: {}, three: {}, four: {}, five: {}, six: {}"; diff --git a/internal/utils/persistence/src/test/resources/logback-test.xml b/internal/utils/persistence/src/test/resources/logback-test.xml index bb7c75a0cc8..ef9ac38703c 100644 --- a/internal/utils/persistence/src/test/resources/logback-test.xml +++ b/internal/utils/persistence/src/test/resources/logback-test.xml @@ -5,14 +5,14 @@ - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}] %logger{20} %X{akkaSource} - %msg%n System.err - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}] %logger{20} %X{akkaSource} - %msg%n ERROR diff --git a/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/span/KamonHttpContextPropagation.java b/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/span/KamonHttpContextPropagation.java index 9fac417620b..aa68cf1c965 100644 --- a/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/span/KamonHttpContextPropagation.java +++ b/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/span/KamonHttpContextPropagation.java @@ -99,7 +99,7 @@ public Context getContextFromHeaders(final Map headers) { public Map propagateContextToHeaders(final Context context, final Map headers) { checkNotNull(context, "context"); final var result = getMutableCopyOfMap(checkNotNull(headers, "headers")); - propagation.write(context, result::put); + propagation.write(context, result::putIfAbsent); return result; } diff --git a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/announcements/PolicyAnnouncementAcknowledgementAggregatorActor.java b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/announcements/PolicyAnnouncementAcknowledgementAggregatorActor.java index 183df517dd1..c0970445655 100644 --- a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/announcements/PolicyAnnouncementAcknowledgementAggregatorActor.java +++ b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/announcements/PolicyAnnouncementAcknowledgementAggregatorActor.java @@ -64,7 +64,7 @@ private PolicyAnnouncementAcknowledgementAggregatorActor(final PolicyAnnouncemen aggregator = AcknowledgementAggregator.getInstance(policyAnnouncement.getEntityId(), correlationId, timeout, HeaderTranslator.empty()); aggregator.addAcknowledgementRequests(acknowledgementRequests); - log.withCorrelationId(correlationId) + log.withCorrelationId(dittoHeaders) .info("Starting to wait for all requested acknowledgements <{}> for a maximum duration of <{}>.", acknowledgementRequests, timeout); } diff --git a/policies/service/src/main/resources/logback.xml b/policies/service/src/main/resources/logback.xml index 4f1fcea100a..b137aefb91d 100755 --- a/policies/service/src/main/resources/logback.xml +++ b/policies/service/src/main/resources/logback.xml @@ -16,14 +16,14 @@ - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n System.err - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n ERROR diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/PolicyIdReferencePlaceholderResolver.java b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/PolicyIdReferencePlaceholderResolver.java index e4a0e63c6de..4c74b4bd53a 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/PolicyIdReferencePlaceholderResolver.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/PolicyIdReferencePlaceholderResolver.java @@ -23,7 +23,6 @@ import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.internal.utils.akka.logging.AutoCloseableSlf4jLogger; import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger; import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry; @@ -86,15 +85,14 @@ public CompletionStage resolve(final ReferencePlaceholder referencePlace final var resolveEntityReferenceStrategy = supportedEntityTypesToActionMap.get(referencePlaceholder.getReferencedEntityType()); - try (final AutoCloseableSlf4jLogger logger = LOGGER.setCorrelationId(dittoHeaders)) { - if (null == resolveEntityReferenceStrategy) { - final String referencedEntityType = referencePlaceholder.getReferencedEntityType().name(); - logger.info("Could not find a placeholder replacement strategy for entity type <{}> in supported" + - " entity types: {}", referencedEntityType, supportedEntityTypeNames); - throw notSupportedException(referencedEntityType, dittoHeaders); - } - logger.debug("Will resolve entity reference for placeholder: <{}>", referencePlaceholder); + final DittoLogger logger = LOGGER.withCorrelationId(dittoHeaders); + if (null == resolveEntityReferenceStrategy) { + final String referencedEntityType = referencePlaceholder.getReferencedEntityType().name(); + logger.info("Could not find a placeholder replacement strategy for entity type <{}> in supported" + + " entity types: {}", referencedEntityType, supportedEntityTypeNames); + throw notSupportedException(referencedEntityType, dittoHeaders); } + logger.debug("Will resolve entity reference for placeholder: <{}>", referencePlaceholder); return resolveEntityReferenceStrategy.handleEntityPolicyIdReference(referencePlaceholder, dittoHeaders); } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java index f6d3bc955e8..92f51c81a2c 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java @@ -232,7 +232,7 @@ private CompletionStage> loadPolicyEnforcerForCreateThi return invalidatable.invalidate(PolicyTag.of(policy.getEntityId().get(), policy.getRevision().get().toLong()), correlationId, askWithRetryConfig.getAskTimeout()) .thenApply(bool -> { - log.withCorrelationId(correlationId) + log.withCorrelationId(createThing) .debug("PolicyEnforcerCache invalidated. Previous entity was present: {}", bool); return policy; diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/SupervisorInlinePolicyEnrichment.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/SupervisorInlinePolicyEnrichment.java index b2879f6ca7a..cecb903753e 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/SupervisorInlinePolicyEnrichment.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/SupervisorInlinePolicyEnrichment.java @@ -18,8 +18,6 @@ import java.util.UUID; import java.util.concurrent.CompletionStage; -import javax.annotation.Nullable; - import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; @@ -189,21 +187,20 @@ private CompletionStage> retrieveInlinedPolicyF if (response instanceof RetrievePolicyResponse retrievePolicyResponse) { return Optional.of(retrievePolicyResponse); } else { - log.withCorrelationId(getCorrelationIdOrNull(response, retrievePolicy)) + log.withCorrelationId(getEffectiveHeaders(response, retrievePolicy)) .info("No authorized response when retrieving inlined policy <{}> for thing <{}>: {}", retrievePolicy.getEntityId(), thingId, response); return Optional.empty(); } } ).exceptionally(error -> { - log.withCorrelationId(getCorrelationIdOrNull(error, retrievePolicy)) + log.withCorrelationId(getEffectiveHeaders(error, retrievePolicy)) .error(error, "Retrieving inlined policy after RetrieveThing"); return Optional.empty(); }); } - @Nullable - private static CharSequence getCorrelationIdOrNull(final Object signal, final WithDittoHeaders fallBackSignal) { + private static DittoHeaders getEffectiveHeaders(final Object signal, final WithDittoHeaders fallBackSignal) { final WithDittoHeaders withDittoHeaders; if (isWithDittoHeaders(signal)) { @@ -211,8 +208,7 @@ private static CharSequence getCorrelationIdOrNull(final Object signal, final Wi } else { withDittoHeaders = fallBackSignal; } - final var dittoHeaders = withDittoHeaders.getDittoHeaders(); - return dittoHeaders.getCorrelationId().orElse(null); + return withDittoHeaders.getDittoHeaders(); } private static boolean isWithDittoHeaders(final Object o) { diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java index a9f3ec25dcd..4df4747cd8b 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java @@ -355,7 +355,7 @@ private void handleRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) { final String correlationId = rollback.initialCommand().getDittoHeaders().getCorrelationId() .orElse("unexpected:" + UUID.randomUUID()); if (policyCreatedEvent != null) { - log.withCorrelationId(correlationId) + log.withCorrelationId(rollback.initialCommand()) .warning("Rolling back created policy as consequence of received RollbackCreatedPolicy " + "message: {}", rollback); final DittoHeaders dittoHeaders = DittoHeaders.newBuilder() @@ -366,19 +366,20 @@ private void handleRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) { AskWithRetry.askWithRetry(policiesShardRegion, deletePolicy, enforcementConfig.getAskWithRetryConfig(), getContext().system(), response -> { - log.withCorrelationId(correlationId) + log.withCorrelationId(rollback.initialCommand()) .info("Policy <{}> deleted after rolling back it's creation. " + "Policies shard region response: <{}>", deletePolicy.getEntityId(), response); rollback.completeInitialResponse(); return response; }).exceptionally(throwable -> { - log.withCorrelationId(correlationId).error(throwable, "Failed to rollback Policy Create"); + log.withCorrelationId(rollback.initialCommand()) + .error(throwable, "Failed to rollback Policy Create"); rollback.completeInitialResponse(); return null; }); } else { - log.withCorrelationId(correlationId) + log.withCorrelationId(rollback.initialCommand()) .debug("Not initiating policy rollback as none was created."); rollback.completeInitialResponse(); } diff --git a/things/service/src/main/resources/logback.xml b/things/service/src/main/resources/logback.xml index cc7773e5b98..45890fd090e 100755 --- a/things/service/src/main/resources/logback.xml +++ b/things/service/src/main/resources/logback.xml @@ -16,14 +16,14 @@ - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n System.err - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n ERROR diff --git a/things/service/src/test/resources/logback-test.xml b/things/service/src/test/resources/logback-test.xml index 129c7339d07..8f07693a117 100644 --- a/things/service/src/test/resources/logback-test.xml +++ b/things/service/src/test/resources/logback-test.xml @@ -20,7 +20,7 @@ - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20‚} - %msg%n%rEx + %date{ISO8601} %-5level [%X{correlation-id}] %logger{20‚} - %msg%n%rEx diff --git a/thingsearch/service/src/main/resources/logback.xml b/thingsearch/service/src/main/resources/logback.xml index 14376de315d..b801d8bc6b5 100755 --- a/thingsearch/service/src/main/resources/logback.xml +++ b/thingsearch/service/src/main/resources/logback.xml @@ -16,14 +16,14 @@ - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n System.err - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n ERROR diff --git a/thingsearch/service/src/test/resources/logback-test.xml b/thingsearch/service/src/test/resources/logback-test.xml index 820e0308392..da7c2d5a1df 100644 --- a/thingsearch/service/src/test/resources/logback-test.xml +++ b/thingsearch/service/src/test/resources/logback-test.xml @@ -17,14 +17,14 @@ - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}] %logger{20} %X{akkaSource} - %msg%n System.err - %date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n + %date{ISO8601} %-5level [%X{correlation-id}] %logger{20} %X{akkaSource} - %msg%n ERROR