Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
#1739 provide the traceparent header as MDC value in logs
Browse files Browse the repository at this point in the history
* generify the fields to provide to the MDC in CommonMdcEntryKey enum
* rename "x-correlation-id" in logs to just "correlation-id"
* exchange some places where only correlationId was extracted from a map of headers with parsing all the headers for MDC worthy fields

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
thjaeckle committed Sep 11, 2023
1 parent b7315c6 commit 3cd2513
Showing 55 changed files with 299 additions and 288 deletions.
Original file line number Diff line number Diff line change
@@ -522,8 +522,7 @@ private DevOpsCommandResponseCorrelationActor(final ActorRef devOpsCommandSender
final var dittoHeaders = devOpsCommand.getDittoHeaders();
aggregateResults = isAggregateResults(dittoHeaders);
this.expectedResponses = expectedResponses;
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
logger.setCorrelationId(dittoHeaders);
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this).withCorrelationId(dittoHeaders);
}

private static boolean isAggregateResults(final DittoHeaders dittoHeaders) {
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
@@ -314,11 +313,10 @@ private Stream<SendingOrDropped> sendMappedOutboundSignal(final OutboundSignal.M
final int maxPayloadBytesForSignal) {

final var message = outbound.getExternalMessage();
final String correlationId = message.getHeaders().get(CORRELATION_ID.getKey());
final Signal<?> outboundSource = outbound.getSource();
final List<Target> outboundTargets = outbound.getTargets();

final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(correlationId);
final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(message.getHeaders());
final Optional<SendingContext> replyTargetSendingContext = getSendingContext(outbound);

final List<SendingContext> sendingContexts = replyTargetSendingContext
Original file line number Diff line number Diff line change
@@ -329,11 +329,8 @@ public Optional<Signal<?>> onMapped(final String mapperId,

@Override
public Optional<Signal<?>> onDropped(final String mapperId, @Nullable final ExternalMessage incomingMessage) {
logger.withCorrelationId(Optional.ofNullable(incomingMessage)
.map(ExternalMessage::getHeaders)
.map(h -> h.get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
.orElse(null)
).debug("Message mapping returned null, message is dropped.");
logger.withCorrelationId(incomingMessage != null ? incomingMessage.getHeaders() : null)
.debug("Message mapping returned null, message is dropped.");
if (incomingMessage != null) {
final String source = getAddress(incomingMessage);
final ConnectionMonitor.InfoProvider infoProvider = InfoProviderFactory.forExternalMessage(incomingMessage);
Original file line number Diff line number Diff line change
@@ -130,7 +130,7 @@ static InboundMappingProcessor of(final Connection connection,
@Override
List<MappingOutcome<MappedInboundExternalMessage>> process(final ExternalMessage message) {
final var mappers = getMappers(message.getPayloadMapping().orElse(null));
logger.withCorrelationId(message.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
logger.withCorrelationId(message.getHeaders())
.debug("Mappers resolved for message: {}", mappers);
final var mappingTimer = MappingTimer.inbound(connectionId, connectionType, message.getHeaders());
return mappingTimer.overall(() -> mappers.stream()
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
@@ -169,14 +168,12 @@ private InboundMappingOutcomes mapInboundMessage(final ExternalMessageWithSender
final InboundMappingProcessor inboundMappingProcessor) {

final var externalMessage = withSender.externalMessage();
@Nullable final var correlationId =
externalMessage.findHeaderIgnoreCase(DittoHeaderDefinition.CORRELATION_ID.getKey()).orElse(null);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(externalMessage.getHeaders())
.debug("Handling ExternalMessage: {}", externalMessage);
try {
return mapExternalMessageToSignal(withSender, inboundMappingProcessor);
} catch (final Exception e) {
logger.withCorrelationId(correlationId)
logger.withCorrelationId(externalMessage.getHeaders())
.error("Handling exception when mapping external message: {}", e.getMessage());
return InboundMappingOutcomes.of(withSender.externalMessage(), e, withSender.sender());
}
Original file line number Diff line number Diff line change
@@ -13,8 +13,9 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
@@ -29,7 +30,6 @@
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
@@ -217,10 +217,9 @@ private void updateSendMonitor(@Nullable final Exception exception) {
dittoRuntimeException.getClass().getSimpleName(), dittoRuntimeException.getMessage());
} else {
publishedMonitor.exception(message, exception);
final DittoHeaders internalHeaders = message.getInternalHeaders();
@Nullable final String correlationId = internalHeaders.getCorrelationId()
.orElseGet(() -> message.findHeaderIgnoreCase(CORRELATION_ID.getKey()).orElse(null));
logger.withCorrelationId(correlationId)
final Map<String, String> combinedHeaders = new HashMap<>(message.getHeaders());
combinedHeaders.putAll(message.getInternalHeaders());
logger.withCorrelationId(combinedHeaders)
.info("Unexpected failure when publishing signal - {}: {}",
exception.getClass().getSimpleName(), exception.getMessage());
}
Original file line number Diff line number Diff line change
@@ -365,7 +365,7 @@ private void handleJmsMessage(final JmsMessage message) {
.start();
headers = startedSpan.propagateContext(headers);
final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers);
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder, correlationId)
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder)
.withAuthorizationContext(source.getAuthorizationContext())
.withEnforcement(headerEnforcementFilterFactory.getFilter(headers))
.withHeaderMapping(source.getHeaderMapping())
@@ -374,10 +374,10 @@ private void handleJmsMessage(final JmsMessage message) {
.build();
inboundMonitor.success(externalMessage);
final Map<String, String> externalMessageHeaders = externalMessage.getHeaders();
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headers)
.info("Received message from AMQP 1.0 with externalMessageHeaders: {}", externalMessageHeaders);
if (logger.isDebugEnabled()) {
logger.withCorrelationId(correlationId).debug("Received message from AMQP 1.0 with payload: {}",
logger.withCorrelationId(headers).debug("Received message from AMQP 1.0 with payload: {}",
externalMessage.getTextPayload().orElse("binary"));
}
forwardToMapping(externalMessage,
@@ -404,7 +404,7 @@ private void handleJmsMessage(final JmsMessage message) {
inboundMonitor.exception(e);
}
startedSpan.tagAsFailed(e);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headers)
.error(e, "Unexpected {}: {}", e.getClass().getName(), e.getMessage());
} finally {
startedSpan.finish();
@@ -422,8 +422,6 @@ private void handleJmsMessage(final JmsMessage message) {
private void acknowledge(final JmsMessage message, final boolean isSuccess, final boolean redeliver,
final Map<String, String> externalMessageHeaders) {

final Optional<String> correlationId = Optional.ofNullable(
externalMessageHeaders.get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
try {
final String messageId = message.getJMSMessageID();
recordAckForRateLimit(messageId, isSuccess, redeliver);
@@ -437,8 +435,7 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
ackType = redeliver ? MODIFIED_FAILED : REJECTED;
ackTypeName = redeliver ? "modified[delivery-failed]" : "rejected";
}
final String jmsCorrelationID = message.getJMSCorrelationID();
logger.withCorrelationId(correlationId.orElse(jmsCorrelationID))
logger.withCorrelationId(externalMessageHeaders)
.info(MessageFormat.format(
"Acking <{0}> with original external message headers=<{1}>, isSuccess=<{2}>, ackType=<{3} {4}>",
messageId,
@@ -457,15 +454,15 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
"Sending negative acknowledgement: <{0}>", ackTypeName);
}
} catch (final IllegalStateException e) {
logger.withCorrelationId(correlationId.orElse(null))
logger.withCorrelationId(externalMessageHeaders)
.warning(e, "Failed to ack an AMQP message because of server side issues");
} catch (final Exception e) {
logger.withCorrelationId(correlationId.orElse(null)).error(e, "Failed to ack an AMQP message");
logger.withCorrelationId(externalMessageHeaders).error(e, "Failed to ack an AMQP message");
}
}

private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage message,
final ExternalMessageBuilder builder, @Nullable final String correlationId) throws JMSException {
final ExternalMessageBuilder builder) throws JMSException {
if (message instanceof TextMessage textMessage) {
final String payload = textMessage.getText();
if (payload == null) {
@@ -487,7 +484,7 @@ private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage messag
if (logger.isDebugEnabled()) {
final Destination destination = message.getJMSDestination();
final Map<String, String> headersMapFromJmsMessage = extractHeadersMapFromJmsMessage(message);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headersMapFromJmsMessage)
.debug("Received message at '{}' of unsupported type ({}) with headers: {}",
destination, message.getClass().getName(), headersMapFromJmsMessage);
}
Original file line number Diff line number Diff line change
@@ -117,7 +117,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
try {
final String key = consumerRecord.key();
final ByteBuffer value = consumerRecord.value();
final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(correlationId);
final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(messageHeaders);
correlationIdScopedLogger.debug(
"Transforming incoming kafka message <{}> with headers <{}> and key <{}>.",
value, messageHeaders, key
@@ -148,7 +148,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
return TransformationResult.failed(e.setDittoHeaders(DittoHeaders.of(messageHeaders)));
} catch (final Exception e) {
inboundMonitor.exception(messageHeaders, e);
LOGGER.withCorrelationId(correlationId)
LOGGER.withCorrelationId(messageHeaders)
.error(String.format("Unexpected {%s}: {%s}", e.getClass().getName(), e.getMessage()), e);
startedSpan.tagAsFailed(e);
return null; // Drop message
4 changes: 2 additions & 2 deletions connectivity/service/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -16,14 +16,14 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>

<appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
<target>System.err</target>
<encoder>
<pattern>%date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
Original file line number Diff line number Diff line change
@@ -254,6 +254,7 @@ protected Sink<Object, NotUsed> setupInboundMappingSink(final ActorRef clientAct
.thenReturn(logger);
when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class)))
.thenReturn(logger);
when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger);
final ProtocolAdapter protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null);
final var connection = CONNECTION.toBuilder().payloadMappingDefinition(payloadMappingDefinition).build();
final InboundMappingProcessor inboundMappingProcessor =
Original file line number Diff line number Diff line change
@@ -349,6 +349,7 @@ ActorRef createInboundMappingProcessorActor(final ActorRef proxyActor,
final var logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
Mockito.when(logger.withCorrelationId(Mockito.any(DittoHeaders.class)))
.thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.nullable(CharSequence.class)))
.thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class)))
Original file line number Diff line number Diff line change
@@ -125,6 +125,7 @@ private static InboundMappingProcessor createThrowingProcessor() {
final ThreadSafeDittoLoggingAdapter logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<CharSequence>any());
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<DittoHeaders>any());
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<Map>any());
final Connection connection =
TestConstants.createConnection(ConnectionId.of("connectionId"), ConnectionType.MQTT);
return InboundMappingProcessor.of(connection,
Original file line number Diff line number Diff line change
@@ -112,6 +112,7 @@ public static void setUp() {
when(logger.withCorrelationId(Mockito.nullable(String.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(WithDittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(DittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(Map.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger);

protocolAdapterProvider = new DittoProtocolAdapterProvider(Mockito.mock(ProtocolConfig.class));
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.withSettings;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -92,8 +93,8 @@ public void setUp() {
.build();

Mockito.when(externalMessage.getInternalHeaders()).thenReturn(dittoHeaders);
Mockito.when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class))).thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger);

exceptionConverter = DefaultExceptionToAcknowledgementConverter.getInstance();
}
@@ -313,7 +314,7 @@ public void monitorAndAcknowledgeWhenFutureResponseTerminatedExceptionallyAndNoA

Mockito.verifyNoInteractions(acknowledgedMonitor);
Mockito.verify(publishedMonitor).exception(eq(externalMessage), eq(rootCause));
Mockito.verify(logger).withCorrelationId(testName.getMethodName());
Mockito.verify(logger).withCorrelationId(Map.of("correlation-id", testName.getMethodName()));
assertThat(result).hasValueSatisfying(resultFuture -> assertThat(resultFuture).isCompletedWithValue(null));
}

Original file line number Diff line number Diff line change
@@ -245,6 +245,7 @@ public static ThreadSafeDittoLoggingAdapter mockThreadSafeDittoLoggingAdapter()
when(logger.withCorrelationId(Mockito.nullable(String.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(WithDittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(DittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(Map.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger);
return logger;
}
Original file line number Diff line number Diff line change
@@ -360,6 +360,7 @@ private Sink<Object, NotUsed> setupMappingSink(final ActorRef testRef,
final ThreadSafeDittoLoggingAdapter logger = mock(ThreadSafeDittoLoggingAdapter.class);
when(logger.withCorrelationId(any(DittoHeaders.class)))
.thenReturn(logger);
when(logger.withCorrelationId(any(Map.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(CharSequence.class)))
.thenReturn(logger);
when(logger.withCorrelationId(any(WithDittoHeaders.class)))
2 changes: 1 addition & 1 deletion deployment/helm/ditto/Chart.yaml
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ description: |
A digital twin is a virtual, cloud based, representation of his real world counterpart
(real world “Things”, e.g. devices like sensors, smart heating, connected cars, smart grids, EV charging stations etc).
type: application
version: 3.3.9 # chart version is effectively set by release-job
version: 3.3.10 # chart version is effectively set by release-job
appVersion: 3.3.6
keywords:
- iot-chart
1 change: 0 additions & 1 deletion deployment/helm/ditto/logback-config/connectivity.xml
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@
<excludeMdcKeyName>sourceActorSystem</excludeMdcKeyName>
<excludeMdcKeyName>akkaUid</excludeMdcKeyName>
<excludeMdcKeyName>akkaTimestamp</excludeMdcKeyName>
<mdcKeyFieldName>x-correlation-id=correlation-id</mdcKeyFieldName>
<mdcKeyFieldName>connection-id=ditto-connection-id</mdcKeyFieldName>
<mdcKeyFieldName>connection-type=ditto-connection-type</mdcKeyFieldName>
</encoder>
1 change: 0 additions & 1 deletion deployment/helm/ditto/logback-config/gateway.xml
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@
<excludeMdcKeyName>sourceActorSystem</excludeMdcKeyName>
<excludeMdcKeyName>akkaUid</excludeMdcKeyName>
<excludeMdcKeyName>akkaTimestamp</excludeMdcKeyName>
<mdcKeyFieldName>x-correlation-id=correlation-id</mdcKeyFieldName>
</encoder>
</appender>

Loading

0 comments on commit 3cd2513

Please sign in to comment.