Skip to content

Commit

Permalink
#1739 provide the traceparent header as MDC value in logs
Browse files Browse the repository at this point in the history
* generify the fields to provide to the MDC in CommonMdcEntryKey enum
* rename "x-correlation-id" in logs to just "correlation-id"
* exchange some places where only correlationId was extracted from a map of headers with parsing all the headers for MDC worthy fields
  • Loading branch information
thjaeckle committed Sep 8, 2023
1 parent b7315c6 commit 3770ebc
Show file tree
Hide file tree
Showing 47 changed files with 287 additions and 285 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,7 @@ private DevOpsCommandResponseCorrelationActor(final ActorRef devOpsCommandSender
final var dittoHeaders = devOpsCommand.getDittoHeaders();
aggregateResults = isAggregateResults(dittoHeaders);
this.expectedResponses = expectedResponses;
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
logger.setCorrelationId(dittoHeaders);
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this).withCorrelationId(dittoHeaders);
}

private static boolean isAggregateResults(final DittoHeaders dittoHeaders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -314,11 +313,10 @@ private Stream<SendingOrDropped> sendMappedOutboundSignal(final OutboundSignal.M
final int maxPayloadBytesForSignal) {

final var message = outbound.getExternalMessage();
final String correlationId = message.getHeaders().get(CORRELATION_ID.getKey());
final Signal<?> outboundSource = outbound.getSource();
final List<Target> outboundTargets = outbound.getTargets();

final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(correlationId);
final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(message.getHeaders());
final Optional<SendingContext> replyTargetSendingContext = getSendingContext(outbound);

final List<SendingContext> sendingContexts = replyTargetSendingContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,8 @@ public Optional<Signal<?>> onMapped(final String mapperId,

@Override
public Optional<Signal<?>> onDropped(final String mapperId, @Nullable final ExternalMessage incomingMessage) {
logger.withCorrelationId(Optional.ofNullable(incomingMessage)
.map(ExternalMessage::getHeaders)
.map(h -> h.get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
.orElse(null)
).debug("Message mapping returned null, message is dropped.");
logger.withCorrelationId(incomingMessage != null ? incomingMessage.getHeaders() : null)
.debug("Message mapping returned null, message is dropped.");
if (incomingMessage != null) {
final String source = getAddress(incomingMessage);
final ConnectionMonitor.InfoProvider infoProvider = InfoProviderFactory.forExternalMessage(incomingMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ static InboundMappingProcessor of(final Connection connection,
@Override
List<MappingOutcome<MappedInboundExternalMessage>> process(final ExternalMessage message) {
final var mappers = getMappers(message.getPayloadMapping().orElse(null));
logger.withCorrelationId(message.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
logger.withCorrelationId(message.getHeaders())
.debug("Mappers resolved for message: {}", mappers);
final var mappingTimer = MappingTimer.inbound(connectionId, connectionType, message.getHeaders());
return mappingTimer.overall(() -> mappers.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
Expand Down Expand Up @@ -169,14 +168,12 @@ private InboundMappingOutcomes mapInboundMessage(final ExternalMessageWithSender
final InboundMappingProcessor inboundMappingProcessor) {

final var externalMessage = withSender.externalMessage();
@Nullable final var correlationId =
externalMessage.findHeaderIgnoreCase(DittoHeaderDefinition.CORRELATION_ID.getKey()).orElse(null);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(externalMessage.getHeaders())
.debug("Handling ExternalMessage: {}", externalMessage);
try {
return mapExternalMessageToSignal(withSender, inboundMappingProcessor);
} catch (final Exception e) {
logger.withCorrelationId(correlationId)
logger.withCorrelationId(externalMessage.getHeaders())
.error("Handling exception when mapping external message: {}", e.getMessage());
return InboundMappingOutcomes.of(withSender.externalMessage(), e, withSender.sender());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
Expand All @@ -29,7 +30,6 @@
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
Expand Down Expand Up @@ -217,10 +217,9 @@ private void updateSendMonitor(@Nullable final Exception exception) {
dittoRuntimeException.getClass().getSimpleName(), dittoRuntimeException.getMessage());
} else {
publishedMonitor.exception(message, exception);
final DittoHeaders internalHeaders = message.getInternalHeaders();
@Nullable final String correlationId = internalHeaders.getCorrelationId()
.orElseGet(() -> message.findHeaderIgnoreCase(CORRELATION_ID.getKey()).orElse(null));
logger.withCorrelationId(correlationId)
final Map<String, String> combinedHeaders = new HashMap<>(message.getHeaders());
combinedHeaders.putAll(message.getInternalHeaders());
logger.withCorrelationId(combinedHeaders)
.info("Unexpected failure when publishing signal - {}: {}",
exception.getClass().getSimpleName(), exception.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ private void handleJmsMessage(final JmsMessage message) {
.start();
headers = startedSpan.propagateContext(headers);
final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers);
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder, correlationId)
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder)
.withAuthorizationContext(source.getAuthorizationContext())
.withEnforcement(headerEnforcementFilterFactory.getFilter(headers))
.withHeaderMapping(source.getHeaderMapping())
Expand All @@ -374,10 +374,10 @@ private void handleJmsMessage(final JmsMessage message) {
.build();
inboundMonitor.success(externalMessage);
final Map<String, String> externalMessageHeaders = externalMessage.getHeaders();
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headers)
.info("Received message from AMQP 1.0 with externalMessageHeaders: {}", externalMessageHeaders);
if (logger.isDebugEnabled()) {
logger.withCorrelationId(correlationId).debug("Received message from AMQP 1.0 with payload: {}",
logger.withCorrelationId(headers).debug("Received message from AMQP 1.0 with payload: {}",
externalMessage.getTextPayload().orElse("binary"));
}
forwardToMapping(externalMessage,
Expand All @@ -404,7 +404,7 @@ private void handleJmsMessage(final JmsMessage message) {
inboundMonitor.exception(e);
}
startedSpan.tagAsFailed(e);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headers)
.error(e, "Unexpected {}: {}", e.getClass().getName(), e.getMessage());
} finally {
startedSpan.finish();
Expand All @@ -422,8 +422,6 @@ private void handleJmsMessage(final JmsMessage message) {
private void acknowledge(final JmsMessage message, final boolean isSuccess, final boolean redeliver,
final Map<String, String> externalMessageHeaders) {

final Optional<String> correlationId = Optional.ofNullable(
externalMessageHeaders.get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
try {
final String messageId = message.getJMSMessageID();
recordAckForRateLimit(messageId, isSuccess, redeliver);
Expand All @@ -437,8 +435,7 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
ackType = redeliver ? MODIFIED_FAILED : REJECTED;
ackTypeName = redeliver ? "modified[delivery-failed]" : "rejected";
}
final String jmsCorrelationID = message.getJMSCorrelationID();
logger.withCorrelationId(correlationId.orElse(jmsCorrelationID))
logger.withCorrelationId(externalMessageHeaders)
.info(MessageFormat.format(
"Acking <{0}> with original external message headers=<{1}>, isSuccess=<{2}>, ackType=<{3} {4}>",
messageId,
Expand All @@ -457,15 +454,15 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
"Sending negative acknowledgement: <{0}>", ackTypeName);
}
} catch (final IllegalStateException e) {
logger.withCorrelationId(correlationId.orElse(null))
logger.withCorrelationId(externalMessageHeaders)
.warning(e, "Failed to ack an AMQP message because of server side issues");
} catch (final Exception e) {
logger.withCorrelationId(correlationId.orElse(null)).error(e, "Failed to ack an AMQP message");
logger.withCorrelationId(externalMessageHeaders).error(e, "Failed to ack an AMQP message");
}
}

private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage message,
final ExternalMessageBuilder builder, @Nullable final String correlationId) throws JMSException {
final ExternalMessageBuilder builder) throws JMSException {
if (message instanceof TextMessage textMessage) {
final String payload = textMessage.getText();
if (payload == null) {
Expand All @@ -487,7 +484,7 @@ private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage messag
if (logger.isDebugEnabled()) {
final Destination destination = message.getJMSDestination();
final Map<String, String> headersMapFromJmsMessage = extractHeadersMapFromJmsMessage(message);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headersMapFromJmsMessage)
.debug("Received message at '{}' of unsupported type ({}) with headers: {}",
destination, message.getClass().getName(), headersMapFromJmsMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
try {
final String key = consumerRecord.key();
final ByteBuffer value = consumerRecord.value();
final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(correlationId);
final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(messageHeaders);
correlationIdScopedLogger.debug(
"Transforming incoming kafka message <{}> with headers <{}> and key <{}>.",
value, messageHeaders, key
Expand Down Expand Up @@ -148,7 +148,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
return TransformationResult.failed(e.setDittoHeaders(DittoHeaders.of(messageHeaders)));
} catch (final Exception e) {
inboundMonitor.exception(messageHeaders, e);
LOGGER.withCorrelationId(correlationId)
LOGGER.withCorrelationId(messageHeaders)
.error(String.format("Unexpected {%s}: {%s}", e.getClass().getName(), e.getMessage()), e);
startedSpan.tagAsFailed(e);
return null; // Drop message
Expand Down
4 changes: 2 additions & 2 deletions connectivity/service/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>

<appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
<target>System.err</target>
<encoder>
<pattern>%date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{akkaSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent}] %logger{20} %X{akkaSource} - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
Expand Down
2 changes: 1 addition & 1 deletion deployment/helm/ditto/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ description: |
A digital twin is a virtual, cloud based, representation of his real world counterpart
(real world “Things”, e.g. devices like sensors, smart heating, connected cars, smart grids, EV charging stations etc).
type: application
version: 3.3.9 # chart version is effectively set by release-job
version: 3.3.10 # chart version is effectively set by release-job
appVersion: 3.3.6
keywords:
- iot-chart
Expand Down
1 change: 0 additions & 1 deletion deployment/helm/ditto/logback-config/connectivity.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
<excludeMdcKeyName>sourceActorSystem</excludeMdcKeyName>
<excludeMdcKeyName>akkaUid</excludeMdcKeyName>
<excludeMdcKeyName>akkaTimestamp</excludeMdcKeyName>
<mdcKeyFieldName>x-correlation-id=correlation-id</mdcKeyFieldName>
<mdcKeyFieldName>connection-id=ditto-connection-id</mdcKeyFieldName>
<mdcKeyFieldName>connection-type=ditto-connection-type</mdcKeyFieldName>
</encoder>
Expand Down
1 change: 0 additions & 1 deletion deployment/helm/ditto/logback-config/gateway.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
<excludeMdcKeyName>sourceActorSystem</excludeMdcKeyName>
<excludeMdcKeyName>akkaUid</excludeMdcKeyName>
<excludeMdcKeyName>akkaTimestamp</excludeMdcKeyName>
<mdcKeyFieldName>x-correlation-id=correlation-id</mdcKeyFieldName>
</encoder>
</appender>

Expand Down
1 change: 0 additions & 1 deletion deployment/helm/ditto/logback-config/policies.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
<excludeMdcKeyName>sourceActorSystem</excludeMdcKeyName>
<excludeMdcKeyName>akkaUid</excludeMdcKeyName>
<excludeMdcKeyName>akkaTimestamp</excludeMdcKeyName>
<mdcKeyFieldName>x-correlation-id=correlation-id</mdcKeyFieldName>
</encoder>
</appender>

Expand Down
1 change: 0 additions & 1 deletion deployment/helm/ditto/logback-config/things.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
<excludeMdcKeyName>sourceActorSystem</excludeMdcKeyName>
<excludeMdcKeyName>akkaUid</excludeMdcKeyName>
<excludeMdcKeyName>akkaTimestamp</excludeMdcKeyName>
<mdcKeyFieldName>x-correlation-id=correlation-id</mdcKeyFieldName>
</encoder>
</appender>

Expand Down
1 change: 0 additions & 1 deletion deployment/helm/ditto/logback-config/thingssearch.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
<excludeMdcKeyName>sourceActorSystem</excludeMdcKeyName>
<excludeMdcKeyName>akkaUid</excludeMdcKeyName>
<excludeMdcKeyName>akkaTimestamp</excludeMdcKeyName>
<mdcKeyFieldName>x-correlation-id=correlation-id</mdcKeyFieldName>
</encoder>
</appender>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private AcknowledgementAggregatorActor(final EntityId entityId,
final var acknowledgementRequests = signalDittoHeaders.getAcknowledgementRequests();
ackregator = AcknowledgementAggregator.getInstance(entityId, correlationId, timeout, headerTranslator);
ackregator.addAcknowledgementRequests(acknowledgementRequests);
log.withCorrelationId(correlationId)
log.withCorrelationId(signalDittoHeaders)
.info("Starting to wait for all requested acknowledgements <{}> for a maximum duration of <{}>.",
acknowledgementRequests, timeout);

Expand Down
2 changes: 1 addition & 1 deletion edge/service/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%-5level] [%X{x-correlation-id}] %logger{15} - %msg%n%rEx</pattern>
<pattern>%d{HH:mm:ss.SSS} [%-5level] [%X{correlation-id}] %logger{15} - %msg%n%rEx</pattern>
</encoder>
</appender>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ public void serviceRequestsDone(final Control serviceRequestsDone) {

private void handleCommand(final Command<?> command) {
try {
logger.setCorrelationId(command);
receivedCommand = command;
setDefaultTimeoutExceptionSupplier(command);
final var timeoutOverride = getReceiveTimeout(command, commandConfig);
Expand Down Expand Up @@ -528,8 +527,8 @@ private void handleReceiveTimeout() {
final var actorContext = getContext();
final var receiveTimeout = actorContext.getReceiveTimeout();

logger.setCorrelationId(WithDittoHeaders.getCorrelationId(receivedCommand).orElse(null));
logger.info("Got <{}> after <{}> before an appropriate response arrived.",
logger.withCorrelationId(receivedCommand)
.info("Got <{}> after <{}> before an appropriate response arrived.",
ReceiveTimeout.class.getSimpleName(), receiveTimeout);

if (null != timeoutExceptionSupplier) {
Expand All @@ -539,8 +538,9 @@ private void handleReceiveTimeout() {
} else {
actorContext.cancelReceiveTimeout();
// This case is a programming error that should not happen at all.
logger.error("Actor does not have a timeout exception supplier." +
" Thus, no DittoRuntimeException could be handled.");
logger.withCorrelationId(receivedCommand)
.error("Actor does not have a timeout exception supplier. " +
"Thus, no DittoRuntimeException could be handled.");
stop();
}
}
Expand Down
Loading

0 comments on commit 3770ebc

Please sign in to comment.