Skip to content

Commit

Permalink
Merge pull request #2105 from beyonnex-io/bugfix/tracing-propagation
Browse files Browse the repository at this point in the history
fix tracing propagation in general
  • Loading branch information
thjaeckle authored Jan 30, 2025
2 parents 9322980 + 028904f commit fa2b4c9
Show file tree
Hide file tree
Showing 18 changed files with 32 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -482,12 +482,7 @@ private SendingOrDropped publishToGenericTarget(final ExpressionResolver resolve
.tag(SpanTagKey.CONNECTION_TARGET.getTagForValue(publishTarget.toString()))
.start();
final var mappedMessageWithTraceContext =
mappedMessage.withHeaders(startedSpan.propagateContext(
DittoHeaders.newBuilder(mappedMessage.getHeaders())
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
.asCaseSensitiveMap()
));
mappedMessage.withHeaders(startedSpan.propagateContext(mappedMessage.getHeaders()));

final CompletionStage<SendResult> responsesFuture = publishMessage(outboundSource,
autoAckTarget,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,7 @@ private void handleJmsMessage(final JmsMessage message) throws JMSException {
.correlationId(correlationId)
.connectionId(connectionId)
.start();
headers = startedSpan.propagateContext(DittoHeaders.of(headers)
.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
.asCaseSensitiveMap()
);
headers = startedSpan.propagateContext(headers);
final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers);
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder)
.withAuthorizationContext(source.getAuthorizationContext())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,7 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
).correlationId(correlationId)
.connectionId(connectionId)
.start();
messageHeaders = startedSpan.propagateContext(DittoHeaders.of(messageHeaders)
.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
.asCaseSensitiveMap()
);
messageHeaders = startedSpan.propagateContext(messageHeaders);

try {
final String key = consumerRecord.key();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,7 @@ private void handleDelivery(final Delivery delivery) {
.connectionId(connectionId)
.correlationId(correlationId)
.start();
headers = startedSpan.propagateContext(DittoHeaders.of(headers)
.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
.asCaseSensitiveMap()
);
headers = startedSpan.propagateContext(headers);

final ExternalMessageBuilder externalMessageBuilder =
ExternalMessageFactory.newExternalMessageBuilder(headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.base.model.signals.commands.Command;
Expand Down Expand Up @@ -147,11 +146,7 @@ private void askTargetActor(final Command<?> command, final List<ThingId> thingI
.tag("size", Integer.toString(thingIds.size()))
.start();
final Command<?> tracedCommand = command.setDittoHeaders(
DittoHeaders.of(startedSpan.propagateContext(
dittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
))
DittoHeaders.of(startedSpan.propagateContext(dittoHeaders))
);

final DistributedPubSubMediator.Publish pubSubMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,19 +172,15 @@ private static HttpRequest adjustSpanContextHeadersOfRequest(
) {
final Set<String> headerNames = new HashSet<>();
final Map<String, String> httpHeaders = StreamSupport.stream(originalRequest.getHeaders().spliterator(), false)
.map(httpHeader -> {
.peek(httpHeader -> {
if (!headerNames.add(httpHeader.name())) {
throw GatewayDuplicateHeaderException.newBuilder(httpHeader.name())
.dittoHeaders(DittoHeaders.newBuilder()
.correlationId(correlationId)
.build()
).build();
}
return httpHeader;
})
.filter(httpHeader ->
!DittoHeaderDefinition.W3C_TRACEPARENT.getKey().equals(httpHeader.name())
)
.collect(Collectors.toMap(HttpHeader::name, HttpHeader::value, (dv1, dv2) -> {
throw GatewayDuplicateHeaderException.newBuilder()
.dittoHeaders(DittoHeaders.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ public CompletionStage<JsonObject> retrievePartialThing(final ThingId thingId,

final RetrieveThing command =
RetrieveThing.getBuilder(thingId, DittoHeaders.of(startedSpan.propagateContext(
dittoHeadersBuilder
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
dittoHeadersBuilder.build()
)))
.withSelectedFields(jsonFieldSelector)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,7 @@ private static DittoHeaders buildDittoHeadersNotAddedToCacheKey(final List<Thing
)
.start();

return DittoHeaders.of(startedSpan.propagateContext(
dittoHeadersBuilder
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
));
return DittoHeaders.of(startedSpan.propagateContext(dittoHeadersBuilder.build()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.pekko.serialization.SerializerWithStringManifest;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
Expand Down Expand Up @@ -209,11 +208,7 @@ private static JsonObject getDittoHeadersWithSpanContextAsJson(
final DittoHeaders dittoHeaders,
final StartedSpan startedSpan
) {
final var dittoHeadersWithSpanContext = DittoHeaders.of(startedSpan.propagateContext(
dittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
));
final var dittoHeadersWithSpanContext = DittoHeaders.of(startedSpan.propagateContext(dittoHeaders));
return dittoHeadersWithSpanContext.toJson();
}

Expand Down Expand Up @@ -333,11 +328,7 @@ private Jsonifiable<?> createJsonifiableFrom(
beforeDeserializeInstant
);
final var result =
deserializeJson(payload, manifest, DittoHeaders.of(startedSpan.propagateContext(
dittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
)));
deserializeJson(payload, manifest, DittoHeaders.of(startedSpan.propagateContext(dittoHeaders)));
try {
return result;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,12 +730,7 @@ private <T extends Command<?>> void handleByStrategy(final T command, @Nullable
.start();

final var tracedCommand =
command.setDittoHeaders(DittoHeaders.of(startedSpan.propagateContext(
command.getDittoHeaders()
.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
)));
command.setDittoHeaders(DittoHeaders.of(startedSpan.propagateContext(command.getDittoHeaders())));

accessCounter++;
Result<E> result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,11 +908,7 @@ protected CompletionStage<Object> enforceSignalAndForwardToTargetActor(final S s
.correlationId(signal.getDittoHeaders().getCorrelationId().orElse(null))
.start();
final var tracedSignal = signal.setDittoHeaders(
DittoHeaders.of(startedSpan.propagateContext(
signal.getDittoHeaders().toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
))
DittoHeaders.of(startedSpan.propagateContext(signal.getDittoHeaders()))
);
final StartedTimer rootTimer = createTimer(tracedSignal);
final StartedTimer enforcementTimer = rootTimer.startNewSegment(ENFORCEMENT_TIMER_SEGMENT_ENFORCEMENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public Context getContextFromHeaders(final Map<String, String> headers) {
public Map<String, String> propagateContextToHeaders(final Context context, final Map<String, String> headers) {
checkNotNull(context, "context");
final var result = getMutableCopyOfMap(checkNotNull(headers, "headers"));
propagation.write(context, result::putIfAbsent);
propagation.write(context, result::put); // always put, not only if absent in order to overwrite existing values
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import java.time.Instant;
import java.util.Map;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.internal.utils.metrics.instruments.tag.Tag;
import org.eclipse.ditto.internal.utils.metrics.instruments.tag.TagSet;

Expand Down Expand Up @@ -113,11 +115,17 @@ public SpanOperationName getOperationName() {

@Override
public Map<String, String> propagateContext(final Map<String, String> headers) {
return httpContextPropagation.propagateContextToHeaders(wrapSpanInContext(), headers);
return httpContextPropagation.propagateContextToHeaders(wrapSpanInContext(
headers.get(DittoHeaderDefinition.W3C_TRACESTATE.getKey())
), headers);
}

private Context wrapSpanInContext() {
return Context.of(Span.Key(), span);
private Context wrapSpanInContext(@Nullable final String traceStateHeader) {
if (traceStateHeader != null) {
return Context.of(Span.Key(), span, Context.key("tracestate", ""), traceStateHeader);
} else {
return Context.of(Span.Key(), span);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
Expand Down Expand Up @@ -138,11 +137,7 @@ private void doEnforceSignal(final S signal, final ActorRef sender) {
.start();
final Optional<String> formerTraceParent = dittoHeaders.getTraceParent();
final var tracedSignal = signal.setDittoHeaders(
DittoHeaders.of(startedSpan.propagateContext(
dittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
))
DittoHeaders.of(startedSpan.propagateContext(dittoHeaders))
);
final ActorRef self = getSelf();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.policies.model.PolicyId;

/**
Expand Down Expand Up @@ -169,7 +169,7 @@ public FromScratch setFeature(final String featureId) {

@Override
public FromScratch setFeature(final String featureId, @Nullable final FeatureDefinition featureDefinition,
final FeatureProperties featureProperties) {
@Nullable final FeatureProperties featureProperties) {

return setFeature(ThingsModelFactory.newFeature(featureId, featureDefinition, featureProperties));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ interface FromScratch {
* @throws NullPointerException if {@code featureId} is {@code null}.
*/
FromScratch setFeature(String featureId, FeatureDefinition featureDefinition,
FeatureProperties featureProperties);
@Nullable FeatureProperties featureProperties);

/**
* Sets a Feature with the given ID and properties to this builder. A previously set Feature with the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
Expand Down Expand Up @@ -304,10 +303,7 @@ private <T extends Command<?>> CompletionStage<Object> executeCount(final T coun

@SuppressWarnings("unchecked")
final T tracedCountCommand = (T) countCommand.setDittoHeaders(
DittoHeaders.of(spanWithTimer.startedSpan.propagateContext(dittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
)));
DittoHeaders.of(spanWithTimer.startedSpan.propagateContext(dittoHeaders)));

final Source<CountThingsResponse, ?> countThingsResponseSource =
createQuerySource(queryParseFunction, tracedCountCommand)
Expand Down Expand Up @@ -352,10 +348,7 @@ private CompletionStage<Object> performStream(final StreamThings streamThings, f
final var namespaces = streamThings.getNamespaces().orElse(null);

final StreamThings tracedStreamThings = streamThings.setDittoHeaders(
DittoHeaders.of(spanWithTimer.startedSpan.propagateContext(streamThings.getDittoHeaders().toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
)));
DittoHeaders.of(spanWithTimer.startedSpan.propagateContext(streamThings.getDittoHeaders())));

final Source<SourceRef<String>, NotUsed> thingIdSourceRefSource =
ThingsSearchCursor.extractCursor(tracedStreamThings).flatMapConcat(cursor -> {
Expand Down Expand Up @@ -472,10 +465,7 @@ private CompletionStage<Object> performQuery(final QueryThings queryThings, fina
final var namespaces = queryThings.getNamespaces().orElse(null);

final QueryThings tracedQueryThings = queryThings.setDittoHeaders(
DittoHeaders.of(spanWithTimer.startedSpan.propagateContext(queryThings.getDittoHeaders().toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
)));
DittoHeaders.of(spanWithTimer.startedSpan.propagateContext(queryThings.getDittoHeaders())));

final Source<QueryThingsResponse, ?> queryThingsResponseSource =
ThingsSearchCursor.extractCursor(tracedQueryThings, getSystem()).flatMapConcat(cursor -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,10 +548,7 @@ private Optional<Metadata> computeEventMetadata(final ThingEvent<?> thingEvent,
final StartedSpan startedSpan = DittoTracing.newStartedSpanByTimer(thingEvent.getDittoHeaders(), startedTimer);
ConsistencyLag.startS1InUpdater(startedTimer);
final var tracedEvent = thingEvent.setDittoHeaders(DittoHeaders.of(startedSpan.propagateContext(
thingEvent.getDittoHeaders().toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
)));
thingEvent.getDittoHeaders())));
final var metadata = exportMetadataWithSender(shouldAcknowledge, tracedEvent, getAckRecipient(
tracedEvent.getDittoHeaders()), startedTimer, data)
.withUpdateReason(UpdateReason.THING_UPDATE);
Expand Down

0 comments on commit fa2b4c9

Please sign in to comment.