Skip to content

Commit

Permalink
Merge pull request #1740 from eclipse-ditto/feature/log-tracing-context
Browse files Browse the repository at this point in the history
#1739 provide the traceparent header as MDC value in logs
  • Loading branch information
thjaeckle authored Sep 19, 2023
2 parents d0fa186 + 62df02f commit e5f32fd
Show file tree
Hide file tree
Showing 54 changed files with 460 additions and 293 deletions.
5 changes: 5 additions & 0 deletions base/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<scope>provided</scope> <!-- provided by actual service -->
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor_${scala.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,7 @@ private DevOpsCommandResponseCorrelationActor(final ActorRef devOpsCommandSender
final var dittoHeaders = devOpsCommand.getDittoHeaders();
aggregateResults = isAggregateResults(dittoHeaders);
this.expectedResponses = expectedResponses;
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
logger.setCorrelationId(dittoHeaders);
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this).withCorrelationId(dittoHeaders);
}

private static boolean isAggregateResults(final DittoHeaders dittoHeaders) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.base.service.logging;

import java.io.IOException;

import com.fasterxml.jackson.core.JsonGenerator;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import net.logstash.logback.composite.AbstractFieldJsonProvider;
import net.logstash.logback.composite.JsonWritingUtils;

/**
* Logstash logback provider for providing a field {@code intLevel} for each log entry with the values, depending on
* the log level, being:
* <ul>
* <li>TRACE: 1</li>
* <li>DEBUG: 2</li>
* <li>INFO: 3</li>
* <li>WARN: 4</li>
* <li>ERROR: 5</li>
* </ul>
* To be used in {@code logback.xml} as:
* <pre>{@code
* <encoder class="net.logstash.logback.encoder.LogstashEncoder">
* <provider class="org.eclipse.ditto.base.service.logging.IntLevelJsonProvider"/>
* </encoder>
* }
* </pre>
*
* @since 3.4.0
*/
public final class IntLevelJsonProvider extends AbstractFieldJsonProvider<ILoggingEvent> {

@Override
public void writeTo(final JsonGenerator generator, final ILoggingEvent event) throws IOException {
JsonWritingUtils.writeNumberField(
generator,
"intLevel",
mapFromLevelToIntLevel(event.getLevel())
);
}

private int mapFromLevelToIntLevel(final Level level) {
if (level.equals(Level.OFF)) {
return 0;
} else if (level.equals(Level.TRACE)) {
return 1;
} else if (level.equals(Level.DEBUG)) {
return 2;
} else if (level.equals(Level.INFO)) {
return 3;
} else if (level.equals(Level.WARN)) {
return 4;
} else if (level.equals(Level.ERROR)) {
return 5;
} else if (level.equals(Level.ALL)) {
// should not be able to happen for a single log entry:
return Integer.MAX_VALUE;
} else {
// should not be able to happen at all:
return Integer.MIN_VALUE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -314,11 +313,10 @@ private Stream<SendingOrDropped> sendMappedOutboundSignal(final OutboundSignal.M
final int maxPayloadBytesForSignal) {

final var message = outbound.getExternalMessage();
final String correlationId = message.getHeaders().get(CORRELATION_ID.getKey());
final Signal<?> outboundSource = outbound.getSource();
final List<Target> outboundTargets = outbound.getTargets();

final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(correlationId);
final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(message.getHeaders());
final Optional<SendingContext> replyTargetSendingContext = getSendingContext(outbound);

final List<SendingContext> sendingContexts = replyTargetSendingContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,8 @@ public Optional<Signal<?>> onMapped(final String mapperId,

@Override
public Optional<Signal<?>> onDropped(final String mapperId, @Nullable final ExternalMessage incomingMessage) {
logger.withCorrelationId(Optional.ofNullable(incomingMessage)
.map(ExternalMessage::getHeaders)
.map(h -> h.get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
.orElse(null)
).debug("Message mapping returned null, message is dropped.");
logger.withCorrelationId(incomingMessage != null ? incomingMessage.getHeaders() : null)
.debug("Message mapping returned null, message is dropped.");
if (incomingMessage != null) {
final String source = getAddress(incomingMessage);
final ConnectionMonitor.InfoProvider infoProvider = InfoProviderFactory.forExternalMessage(incomingMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ static InboundMappingProcessor of(final Connection connection,
@Override
List<MappingOutcome<MappedInboundExternalMessage>> process(final ExternalMessage message) {
final var mappers = getMappers(message.getPayloadMapping().orElse(null));
logger.withCorrelationId(message.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
logger.withCorrelationId(message.getHeaders())
.debug("Mappers resolved for message: {}", mappers);
final var mappingTimer = MappingTimer.inbound(connectionId, connectionType, message.getHeaders());
return mappingTimer.overall(() -> mappers.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
Expand Down Expand Up @@ -169,14 +168,12 @@ private InboundMappingOutcomes mapInboundMessage(final ExternalMessageWithSender
final InboundMappingProcessor inboundMappingProcessor) {

final var externalMessage = withSender.externalMessage();
@Nullable final var correlationId =
externalMessage.findHeaderIgnoreCase(DittoHeaderDefinition.CORRELATION_ID.getKey()).orElse(null);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(externalMessage.getHeaders())
.debug("Handling ExternalMessage: {}", externalMessage);
try {
return mapExternalMessageToSignal(withSender, inboundMappingProcessor);
} catch (final Exception e) {
logger.withCorrelationId(correlationId)
logger.withCorrelationId(externalMessage.getHeaders())
.error("Handling exception when mapping external message: {}", e.getMessage());
return InboundMappingOutcomes.of(withSender.externalMessage(), e, withSender.sender());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
Expand All @@ -29,7 +30,6 @@
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
Expand Down Expand Up @@ -217,10 +217,9 @@ private void updateSendMonitor(@Nullable final Exception exception) {
dittoRuntimeException.getClass().getSimpleName(), dittoRuntimeException.getMessage());
} else {
publishedMonitor.exception(message, exception);
final DittoHeaders internalHeaders = message.getInternalHeaders();
@Nullable final String correlationId = internalHeaders.getCorrelationId()
.orElseGet(() -> message.findHeaderIgnoreCase(CORRELATION_ID.getKey()).orElse(null));
logger.withCorrelationId(correlationId)
final Map<String, String> combinedHeaders = new HashMap<>(message.getHeaders());
combinedHeaders.putAll(message.getInternalHeaders());
logger.withCorrelationId(combinedHeaders)
.info("Unexpected failure when publishing signal - {}: {}",
exception.getClass().getSimpleName(), exception.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ private void handleJmsMessage(final JmsMessage message) {
.start();
headers = startedSpan.propagateContext(headers);
final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers);
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder, correlationId)
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder)
.withAuthorizationContext(source.getAuthorizationContext())
.withEnforcement(headerEnforcementFilterFactory.getFilter(headers))
.withHeaderMapping(source.getHeaderMapping())
Expand All @@ -374,10 +374,10 @@ private void handleJmsMessage(final JmsMessage message) {
.build();
inboundMonitor.success(externalMessage);
final Map<String, String> externalMessageHeaders = externalMessage.getHeaders();
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headers)
.info("Received message from AMQP 1.0 with externalMessageHeaders: {}", externalMessageHeaders);
if (logger.isDebugEnabled()) {
logger.withCorrelationId(correlationId).debug("Received message from AMQP 1.0 with payload: {}",
logger.withCorrelationId(headers).debug("Received message from AMQP 1.0 with payload: {}",
externalMessage.getTextPayload().orElse("binary"));
}
forwardToMapping(externalMessage,
Expand All @@ -404,7 +404,7 @@ private void handleJmsMessage(final JmsMessage message) {
inboundMonitor.exception(e);
}
startedSpan.tagAsFailed(e);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headers)
.error(e, "Unexpected {}: {}", e.getClass().getName(), e.getMessage());
} finally {
startedSpan.finish();
Expand All @@ -422,8 +422,6 @@ private void handleJmsMessage(final JmsMessage message) {
private void acknowledge(final JmsMessage message, final boolean isSuccess, final boolean redeliver,
final Map<String, String> externalMessageHeaders) {

final Optional<String> correlationId = Optional.ofNullable(
externalMessageHeaders.get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
try {
final String messageId = message.getJMSMessageID();
recordAckForRateLimit(messageId, isSuccess, redeliver);
Expand All @@ -437,8 +435,7 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
ackType = redeliver ? MODIFIED_FAILED : REJECTED;
ackTypeName = redeliver ? "modified[delivery-failed]" : "rejected";
}
final String jmsCorrelationID = message.getJMSCorrelationID();
logger.withCorrelationId(correlationId.orElse(jmsCorrelationID))
logger.withCorrelationId(externalMessageHeaders)
.info(MessageFormat.format(
"Acking <{0}> with original external message headers=<{1}>, isSuccess=<{2}>, ackType=<{3} {4}>",
messageId,
Expand All @@ -457,15 +454,15 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
"Sending negative acknowledgement: <{0}>", ackTypeName);
}
} catch (final IllegalStateException e) {
logger.withCorrelationId(correlationId.orElse(null))
logger.withCorrelationId(externalMessageHeaders)
.warning(e, "Failed to ack an AMQP message because of server side issues");
} catch (final Exception e) {
logger.withCorrelationId(correlationId.orElse(null)).error(e, "Failed to ack an AMQP message");
logger.withCorrelationId(externalMessageHeaders).error(e, "Failed to ack an AMQP message");
}
}

private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage message,
final ExternalMessageBuilder builder, @Nullable final String correlationId) throws JMSException {
final ExternalMessageBuilder builder) throws JMSException {
if (message instanceof TextMessage textMessage) {
final String payload = textMessage.getText();
if (payload == null) {
Expand All @@ -487,7 +484,7 @@ private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage messag
if (logger.isDebugEnabled()) {
final Destination destination = message.getJMSDestination();
final Map<String, String> headersMapFromJmsMessage = extractHeadersMapFromJmsMessage(message);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headersMapFromJmsMessage)
.debug("Received message at '{}' of unsupported type ({}) with headers: {}",
destination, message.getClass().getName(), headersMapFromJmsMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
try {
final String key = consumerRecord.key();
final ByteBuffer value = consumerRecord.value();
final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(correlationId);
final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(messageHeaders);
correlationIdScopedLogger.debug(
"Transforming incoming kafka message <{}> with headers <{}> and key <{}>.",
value, messageHeaders, key
Expand Down Expand Up @@ -148,7 +148,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
return TransformationResult.failed(e.setDittoHeaders(DittoHeaders.of(messageHeaders)));
} catch (final Exception e) {
inboundMonitor.exception(messageHeaders, e);
LOGGER.withCorrelationId(correlationId)
LOGGER.withCorrelationId(messageHeaders)
.error(String.format("Unexpected {%s}: {%s}", e.getClass().getName(), e.getMessage()), e);
startedSpan.tagAsFailed(e);
return null; // Drop message
Expand Down
4 changes: 2 additions & 2 deletions connectivity/service/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent-trace-id}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
</encoder>
</appender>

<appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
<target>System.err</target>
<encoder>
<pattern>%date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent-trace-id}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ protected Sink<Object, NotUsed> setupInboundMappingSink(final ActorRef clientAct
.thenReturn(logger);
when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class)))
.thenReturn(logger);
when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger);
final ProtocolAdapter protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null);
final var connection = CONNECTION.toBuilder().payloadMappingDefinition(payloadMappingDefinition).build();
final InboundMappingProcessor inboundMappingProcessor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ ActorRef createInboundMappingProcessorActor(final ActorRef proxyActor,
final var logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
Mockito.when(logger.withCorrelationId(Mockito.any(DittoHeaders.class)))
.thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.nullable(CharSequence.class)))
.thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ private static InboundMappingProcessor createThrowingProcessor() {
final ThreadSafeDittoLoggingAdapter logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<CharSequence>any());
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<DittoHeaders>any());
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<Map>any());
final Connection connection =
TestConstants.createConnection(ConnectionId.of("connectionId"), ConnectionType.MQTT);
return InboundMappingProcessor.of(connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static void setUp() {
when(logger.withCorrelationId(Mockito.nullable(String.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(WithDittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(DittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(Map.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger);

protocolAdapterProvider = new DittoProtocolAdapterProvider(Mockito.mock(ProtocolConfig.class));
Expand Down
Loading

0 comments on commit e5f32fd

Please sign in to comment.