Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1739 provide the traceparent header as MDC value in logs #1740

Merged
merged 5 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions base/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<scope>provided</scope> <!-- provided by actual service -->
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor_${scala.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,7 @@ private DevOpsCommandResponseCorrelationActor(final ActorRef devOpsCommandSender
final var dittoHeaders = devOpsCommand.getDittoHeaders();
aggregateResults = isAggregateResults(dittoHeaders);
this.expectedResponses = expectedResponses;
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
logger.setCorrelationId(dittoHeaders);
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this).withCorrelationId(dittoHeaders);
}

private static boolean isAggregateResults(final DittoHeaders dittoHeaders) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.base.service.logging;

import java.io.IOException;

import com.fasterxml.jackson.core.JsonGenerator;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import net.logstash.logback.composite.AbstractFieldJsonProvider;
import net.logstash.logback.composite.JsonWritingUtils;

/**
* Logstash logback provider for providing a field {@code intLevel} for each log entry with the values, depending on
* the log level, being:
* <ul>
* <li>TRACE: 1</li>
* <li>DEBUG: 2</li>
* <li>INFO: 3</li>
* <li>WARN: 4</li>
* <li>ERROR: 5</li>
* </ul>
* To be used in {@code logback.xml} as:
* <pre>{@code
* <encoder class="net.logstash.logback.encoder.LogstashEncoder">
* <provider class="org.eclipse.ditto.base.service.logging.IntLevelJsonProvider"/>
* </encoder>
* }
* </pre>
*
* @since 3.4.0
*/
public final class IntLevelJsonProvider extends AbstractFieldJsonProvider<ILoggingEvent> {

@Override
public void writeTo(final JsonGenerator generator, final ILoggingEvent event) throws IOException {
JsonWritingUtils.writeNumberField(
generator,
"intLevel",
mapFromLevelToIntLevel(event.getLevel())
);
}

private int mapFromLevelToIntLevel(final Level level) {
if (level.equals(Level.OFF)) {
return 0;
} else if (level.equals(Level.TRACE)) {
return 1;
} else if (level.equals(Level.DEBUG)) {
return 2;
} else if (level.equals(Level.INFO)) {
return 3;
} else if (level.equals(Level.WARN)) {
return 4;
} else if (level.equals(Level.ERROR)) {
return 5;
} else if (level.equals(Level.ALL)) {
// should not be able to happen for a single log entry:
return Integer.MAX_VALUE;
} else {
// should not be able to happen at all:
return Integer.MIN_VALUE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -314,11 +313,10 @@ private Stream<SendingOrDropped> sendMappedOutboundSignal(final OutboundSignal.M
final int maxPayloadBytesForSignal) {

final var message = outbound.getExternalMessage();
final String correlationId = message.getHeaders().get(CORRELATION_ID.getKey());
final Signal<?> outboundSource = outbound.getSource();
final List<Target> outboundTargets = outbound.getTargets();

final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(correlationId);
final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(message.getHeaders());
final Optional<SendingContext> replyTargetSendingContext = getSendingContext(outbound);

final List<SendingContext> sendingContexts = replyTargetSendingContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,8 @@ public Optional<Signal<?>> onMapped(final String mapperId,

@Override
public Optional<Signal<?>> onDropped(final String mapperId, @Nullable final ExternalMessage incomingMessage) {
logger.withCorrelationId(Optional.ofNullable(incomingMessage)
.map(ExternalMessage::getHeaders)
.map(h -> h.get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
.orElse(null)
).debug("Message mapping returned null, message is dropped.");
logger.withCorrelationId(incomingMessage != null ? incomingMessage.getHeaders() : null)
.debug("Message mapping returned null, message is dropped.");
if (incomingMessage != null) {
final String source = getAddress(incomingMessage);
final ConnectionMonitor.InfoProvider infoProvider = InfoProviderFactory.forExternalMessage(incomingMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ static InboundMappingProcessor of(final Connection connection,
@Override
List<MappingOutcome<MappedInboundExternalMessage>> process(final ExternalMessage message) {
final var mappers = getMappers(message.getPayloadMapping().orElse(null));
logger.withCorrelationId(message.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey()))
logger.withCorrelationId(message.getHeaders())
.debug("Mappers resolved for message: {}", mappers);
final var mappingTimer = MappingTimer.inbound(connectionId, connectionType, message.getHeaders());
return mappingTimer.overall(() -> mappers.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
Expand Down Expand Up @@ -169,14 +168,12 @@ private InboundMappingOutcomes mapInboundMessage(final ExternalMessageWithSender
final InboundMappingProcessor inboundMappingProcessor) {

final var externalMessage = withSender.externalMessage();
@Nullable final var correlationId =
externalMessage.findHeaderIgnoreCase(DittoHeaderDefinition.CORRELATION_ID.getKey()).orElse(null);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(externalMessage.getHeaders())
.debug("Handling ExternalMessage: {}", externalMessage);
try {
return mapExternalMessageToSignal(withSender, inboundMappingProcessor);
} catch (final Exception e) {
logger.withCorrelationId(correlationId)
logger.withCorrelationId(externalMessage.getHeaders())
.error("Handling exception when mapping external message: {}", e.getMessage());
return InboundMappingOutcomes.of(withSender.externalMessage(), e, withSender.sender());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.base.model.headers.DittoHeaderDefinition.CORRELATION_ID;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
Expand All @@ -29,7 +30,6 @@
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
Expand Down Expand Up @@ -217,10 +217,9 @@ private void updateSendMonitor(@Nullable final Exception exception) {
dittoRuntimeException.getClass().getSimpleName(), dittoRuntimeException.getMessage());
} else {
publishedMonitor.exception(message, exception);
final DittoHeaders internalHeaders = message.getInternalHeaders();
@Nullable final String correlationId = internalHeaders.getCorrelationId()
.orElseGet(() -> message.findHeaderIgnoreCase(CORRELATION_ID.getKey()).orElse(null));
logger.withCorrelationId(correlationId)
final Map<String, String> combinedHeaders = new HashMap<>(message.getHeaders());
combinedHeaders.putAll(message.getInternalHeaders());
logger.withCorrelationId(combinedHeaders)
.info("Unexpected failure when publishing signal - {}: {}",
exception.getClass().getSimpleName(), exception.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ private void handleJmsMessage(final JmsMessage message) {
.start();
headers = startedSpan.propagateContext(headers);
final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers);
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder, correlationId)
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder)
.withAuthorizationContext(source.getAuthorizationContext())
.withEnforcement(headerEnforcementFilterFactory.getFilter(headers))
.withHeaderMapping(source.getHeaderMapping())
Expand All @@ -374,10 +374,10 @@ private void handleJmsMessage(final JmsMessage message) {
.build();
inboundMonitor.success(externalMessage);
final Map<String, String> externalMessageHeaders = externalMessage.getHeaders();
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headers)
.info("Received message from AMQP 1.0 with externalMessageHeaders: {}", externalMessageHeaders);
if (logger.isDebugEnabled()) {
logger.withCorrelationId(correlationId).debug("Received message from AMQP 1.0 with payload: {}",
logger.withCorrelationId(headers).debug("Received message from AMQP 1.0 with payload: {}",
externalMessage.getTextPayload().orElse("binary"));
}
forwardToMapping(externalMessage,
Expand All @@ -404,7 +404,7 @@ private void handleJmsMessage(final JmsMessage message) {
inboundMonitor.exception(e);
}
startedSpan.tagAsFailed(e);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headers)
.error(e, "Unexpected {}: {}", e.getClass().getName(), e.getMessage());
} finally {
startedSpan.finish();
Expand All @@ -422,8 +422,6 @@ private void handleJmsMessage(final JmsMessage message) {
private void acknowledge(final JmsMessage message, final boolean isSuccess, final boolean redeliver,
final Map<String, String> externalMessageHeaders) {

final Optional<String> correlationId = Optional.ofNullable(
externalMessageHeaders.get(DittoHeaderDefinition.CORRELATION_ID.getKey()));
try {
final String messageId = message.getJMSMessageID();
recordAckForRateLimit(messageId, isSuccess, redeliver);
Expand All @@ -437,8 +435,7 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
ackType = redeliver ? MODIFIED_FAILED : REJECTED;
ackTypeName = redeliver ? "modified[delivery-failed]" : "rejected";
}
final String jmsCorrelationID = message.getJMSCorrelationID();
logger.withCorrelationId(correlationId.orElse(jmsCorrelationID))
logger.withCorrelationId(externalMessageHeaders)
.info(MessageFormat.format(
"Acking <{0}> with original external message headers=<{1}>, isSuccess=<{2}>, ackType=<{3} {4}>",
messageId,
Expand All @@ -457,15 +454,15 @@ private void acknowledge(final JmsMessage message, final boolean isSuccess, fina
"Sending negative acknowledgement: <{0}>", ackTypeName);
}
} catch (final IllegalStateException e) {
logger.withCorrelationId(correlationId.orElse(null))
logger.withCorrelationId(externalMessageHeaders)
.warning(e, "Failed to ack an AMQP message because of server side issues");
} catch (final Exception e) {
logger.withCorrelationId(correlationId.orElse(null)).error(e, "Failed to ack an AMQP message");
logger.withCorrelationId(externalMessageHeaders).error(e, "Failed to ack an AMQP message");
}
}

private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage message,
final ExternalMessageBuilder builder, @Nullable final String correlationId) throws JMSException {
final ExternalMessageBuilder builder) throws JMSException {
if (message instanceof TextMessage textMessage) {
final String payload = textMessage.getText();
if (payload == null) {
Expand All @@ -487,7 +484,7 @@ private ExternalMessageBuilder extractPayloadFromMessage(final JmsMessage messag
if (logger.isDebugEnabled()) {
final Destination destination = message.getJMSDestination();
final Map<String, String> headersMapFromJmsMessage = extractHeadersMapFromJmsMessage(message);
logger.withCorrelationId(correlationId)
logger.withCorrelationId(headersMapFromJmsMessage)
.debug("Received message at '{}' of unsupported type ({}) with headers: {}",
destination, message.getClass().getName(), headersMapFromJmsMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
try {
final String key = consumerRecord.key();
final ByteBuffer value = consumerRecord.value();
final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(correlationId);
final ThreadSafeDittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(messageHeaders);
correlationIdScopedLogger.debug(
"Transforming incoming kafka message <{}> with headers <{}> and key <{}>.",
value, messageHeaders, key
Expand Down Expand Up @@ -148,7 +148,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
return TransformationResult.failed(e.setDittoHeaders(DittoHeaders.of(messageHeaders)));
} catch (final Exception e) {
inboundMonitor.exception(messageHeaders, e);
LOGGER.withCorrelationId(correlationId)
LOGGER.withCorrelationId(messageHeaders)
.error(String.format("Unexpected {%s}: {%s}", e.getClass().getName(), e.getMessage()), e);
startedSpan.tagAsFailed(e);
return null; // Drop message
Expand Down
4 changes: 2 additions & 2 deletions connectivity/service/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent-trace-id}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
</encoder>
</appender>

<appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
<target>System.err</target>
<encoder>
<pattern>%date{ISO8601} %-5level [%X{x-correlation-id}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
<pattern>%date{ISO8601} %-5level [%X{correlation-id}][%X{traceparent-trace-id}] %logger{20} %X{pekkoSource} - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ protected Sink<Object, NotUsed> setupInboundMappingSink(final ActorRef clientAct
.thenReturn(logger);
when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class)))
.thenReturn(logger);
when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger);
final ProtocolAdapter protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null);
final var connection = CONNECTION.toBuilder().payloadMappingDefinition(payloadMappingDefinition).build();
final InboundMappingProcessor inboundMappingProcessor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ ActorRef createInboundMappingProcessorActor(final ActorRef proxyActor,
final var logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
Mockito.when(logger.withCorrelationId(Mockito.any(DittoHeaders.class)))
.thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(Map.class))).thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.nullable(CharSequence.class)))
.thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ private static InboundMappingProcessor createThrowingProcessor() {
final ThreadSafeDittoLoggingAdapter logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<CharSequence>any());
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<DittoHeaders>any());
Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.<Map>any());
final Connection connection =
TestConstants.createConnection(ConnectionId.of("connectionId"), ConnectionType.MQTT);
return InboundMappingProcessor.of(connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static void setUp() {
when(logger.withCorrelationId(Mockito.nullable(String.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(WithDittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(DittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(Map.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger);

protocolAdapterProvider = new DittoProtocolAdapterProvider(Mockito.mock(ProtocolConfig.class));
Expand Down
Loading
Loading