diff --git a/dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/GrpcClientDecorator.java b/dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/GrpcClientDecorator.java index dc4c794efc6..e202fd07cee 100644 --- a/dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/GrpcClientDecorator.java +++ b/dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/GrpcClientDecorator.java @@ -1,6 +1,7 @@ package datadog.trace.instrumentation.armeria.grpc.client; import static datadog.context.propagation.Propagators.defaultPropagator; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig; @@ -32,8 +33,7 @@ public class GrpcClientDecorator extends ClientDecorator { public static final CharSequence GRPC_MESSAGE = UTF8BytesString.create("grpc.message"); private static DataStreamsContext createDsmContext() { - return DataStreamsContext.fromTags( - DataStreamsTags.create("grpc", DataStreamsTags.Direction.Outbound)); + return DataStreamsContext.fromTags(DataStreamsTags.create("grpc", OUTBOUND)); } public static final GrpcClientDecorator DECORATE = new GrpcClientDecorator(); diff --git a/dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/GrpcServerDecorator.java b/dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/GrpcServerDecorator.java index 944324f3b4d..192d2e9a9c2 100644 --- a/dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/GrpcServerDecorator.java +++ b/dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/GrpcServerDecorator.java @@ -1,5 +1,7 @@ package datadog.trace.instrumentation.armeria.grpc.server; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; + import datadog.trace.api.Config; import datadog.trace.api.cache.DDCache; import datadog.trace.api.cache.DDCaches; @@ -30,7 +32,7 @@ public class GrpcServerDecorator extends ServerDecorator { public static final CharSequence GRPC_MESSAGE = UTF8BytesString.create("grpc.message"); private static DataStreamsTags createServerPathwaySortedTags() { - return DataStreamsTags.create("grpc", DataStreamsTags.Direction.Inbound); + return DataStreamsTags.create("grpc", INBOUND); } public static final DataStreamsTags SERVER_PATHWAY_EDGE_TAGS = createServerPathwaySortedTags(); diff --git a/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeInterceptor.java b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeInterceptor.java index 30d8d02d356..8b01b573464 100644 --- a/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeInterceptor.java @@ -1,6 +1,7 @@ package datadog.trace.instrumentation.aws.v2.eventbridge; import static datadog.context.propagation.Propagators.defaultPropagator; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig; import static datadog.trace.instrumentation.aws.v2.eventbridge.TextMapInjectAdapter.SETTER; @@ -85,8 +86,7 @@ private String getTraceContextToInject( // Inject context datadog.context.Context context = span; if (traceConfig().isDataStreamsEnabled()) { - DataStreamsTags tags = - DataStreamsTags.createWithBus(DataStreamsTags.Direction.Outbound, eventBusName); + DataStreamsTags tags = DataStreamsTags.createWithBus(OUTBOUND, eventBusName); DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags); context = context.with(dsmContext); } diff --git a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AWSHttpClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AWSHttpClientInstrumentation.java index 443e7e02064..0d773ee49df 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AWSHttpClientInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AWSHttpClientInstrumentation.java @@ -3,13 +3,15 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closeActive; +import static datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge.spanFromContext; +import static datadog.trace.instrumentation.aws.v0.OnErrorDecorator.CONTEXT_CONTEXT_KEY; import static datadog.trace.instrumentation.aws.v0.OnErrorDecorator.DECORATE; -import static datadog.trace.instrumentation.aws.v0.OnErrorDecorator.SPAN_CONTEXT_KEY; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import com.amazonaws.AmazonClientException; import com.amazonaws.Request; import com.amazonaws.handlers.RequestHandler2; +import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import net.bytebuddy.asm.Advice; @@ -54,12 +56,15 @@ public static void methodExit( } if (throwable != null && request != null) { - final AgentSpan span = request.getHandlerContext(SPAN_CONTEXT_KEY); - if (span != null) { - request.addHandlerContext(SPAN_CONTEXT_KEY, null); - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.finish(); + final Context context = request.getHandlerContext(CONTEXT_CONTEXT_KEY); + if (context != null) { + request.addHandlerContext(CONTEXT_CONTEXT_KEY, null); + final AgentSpan span = spanFromContext(context); + if (span != null) { + DECORATE.onError(span, throwable); + DECORATE.beforeFinish(span); + span.finish(); + } } } } diff --git a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java index b80f55a8420..afc8db3a43e 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java @@ -1,6 +1,8 @@ package datadog.trace.instrumentation.aws.v0; import static datadog.trace.api.datastreams.DataStreamsContext.create; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig; import static datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities.RPC_COMMAND_NAME; @@ -264,8 +266,7 @@ && traceConfig().isDataStreamsEnabled()) { && ("GetObjectMetadataRequest".equalsIgnoreCase(awsOperation) || "GetObjectRequest".equalsIgnoreCase(awsOperation))) { DataStreamsTags tags = - DataStreamsTags.createWithDataset( - "s3", DataStreamsTags.Direction.Inbound, bucket, key, bucket); + DataStreamsTags.createWithDataset("s3", INBOUND, bucket, key, bucket); AgentTracer.get() .getDataStreamsMonitoring() .setCheckpoint(span, create(tags, 0, responseSize)); @@ -279,8 +280,7 @@ && traceConfig().isDataStreamsEnabled()) { payloadSize = (long) requestSize; } DataStreamsTags tags = - DataStreamsTags.createWithDataset( - "s3", DataStreamsTags.Direction.Outbound, bucket, key, bucket); + DataStreamsTags.createWithDataset("s3", OUTBOUND, bucket, key, bucket); AgentTracer.get() .getDataStreamsMonitoring() .setCheckpoint(span, create(tags, 0, payloadSize)); diff --git a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkModule.java b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkModule.java index aa3d81da4a8..8c370ce03b8 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkModule.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkModule.java @@ -37,9 +37,7 @@ public String[] helperClassNames() { public Map contextStore() { Map map = new java.util.HashMap<>(); map.put(namespace + ".services.sqs.model.ReceiveMessageResult", "java.lang.String"); - map.put( - namespace + ".AmazonWebServiceRequest", - "datadog.trace.bootstrap.instrumentation.api.AgentSpan"); + map.put(namespace + ".AmazonWebServiceRequest", "datadog.context.Context"); return map; } diff --git a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/HandlerChainFactoryInstrumentation.java b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/HandlerChainFactoryInstrumentation.java index 07dfa82382d..30abe11b161 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/HandlerChainFactoryInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/HandlerChainFactoryInstrumentation.java @@ -46,8 +46,7 @@ public static void addHandler(@Advice.Return final List handler InstrumentationContext.get( "com.amazonaws.services.sqs.model.ReceiveMessageResult", "java.lang.String"), InstrumentationContext.get( - "com.amazonaws.AmazonWebServiceRequest", - "datadog.trace.bootstrap.instrumentation.api.AgentSpan"))); + "com.amazonaws.AmazonWebServiceRequest", "datadog.context.Context"))); } } } diff --git a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/OnErrorDecorator.java b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/OnErrorDecorator.java index e3b48847aa5..fb25eb82d07 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/OnErrorDecorator.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/OnErrorDecorator.java @@ -1,14 +1,15 @@ package datadog.trace.instrumentation.aws.v0; import com.amazonaws.handlers.HandlerContextKey; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.context.Context; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator; public class OnErrorDecorator extends BaseDecorator { - public static final HandlerContextKey SPAN_CONTEXT_KEY = - new HandlerContextKey<>("DatadogSpan"); // same as TracingRequestHandler.SPAN_CONTEXT_KEY + public static final HandlerContextKey CONTEXT_CONTEXT_KEY = + new HandlerContextKey<>( + "DatadogContext"); // same as TracingRequestHandler.CONTEXT_CONTEXT_KEY public static final OnErrorDecorator DECORATE = new OnErrorDecorator(); diff --git a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/RequestExecutorInstrumentation.java b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/RequestExecutorInstrumentation.java index e3e23024f9b..ab0344047d8 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/RequestExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/RequestExecutorInstrumentation.java @@ -3,11 +3,13 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closeActive; +import static datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge.spanFromContext; +import static datadog.trace.instrumentation.aws.v0.OnErrorDecorator.CONTEXT_CONTEXT_KEY; import static datadog.trace.instrumentation.aws.v0.OnErrorDecorator.DECORATE; -import static datadog.trace.instrumentation.aws.v0.OnErrorDecorator.SPAN_CONTEXT_KEY; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import com.amazonaws.Request; +import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import net.bytebuddy.asm.Advice; @@ -51,12 +53,15 @@ public static void methodExit( } if (throwable != null) { - final AgentSpan span = request.getHandlerContext(SPAN_CONTEXT_KEY); - if (span != null) { - request.addHandlerContext(SPAN_CONTEXT_KEY, null); - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.finish(); + final Context context = request.getHandlerContext(CONTEXT_CONTEXT_KEY); + if (context != null) { + request.addHandlerContext(CONTEXT_CONTEXT_KEY, null); + final AgentSpan span = spanFromContext(context); + if (span != null) { + DECORATE.onError(span, throwable); + DECORATE.beforeFinish(span); + span.finish(); + } } } } diff --git a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java index 07b872a1551..d7ba2b20680 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java @@ -1,11 +1,14 @@ package datadog.trace.instrumentation.aws.v0; import static datadog.trace.api.datastreams.DataStreamsContext.create; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.XRAY_TRACING_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpanWithoutScope; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.blackholeSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig; +import static datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge.spanFromContext; import static datadog.trace.instrumentation.aws.v0.AwsSdkClientDecorator.AWS_LEGACY_TRACING; import static datadog.trace.instrumentation.aws.v0.AwsSdkClientDecorator.DECORATE; @@ -14,9 +17,13 @@ import com.amazonaws.Response; import com.amazonaws.handlers.HandlerContextKey; import com.amazonaws.handlers.RequestHandler2; +import datadog.context.Context; import datadog.context.propagation.Propagators; import datadog.trace.api.Config; -import datadog.trace.api.datastreams.*; +import datadog.trace.api.datastreams.AgentDataStreamsMonitoring; +import datadog.trace.api.datastreams.DataStreamsContext; +import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.PathwayContext; import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; @@ -28,41 +35,42 @@ /** Tracing Request Handler */ public class TracingRequestHandler extends RequestHandler2 { - public static final HandlerContextKey SPAN_CONTEXT_KEY = - new HandlerContextKey<>("DatadogSpan"); // same as OnErrorDecorator.SPAN_CONTEXT_KEY + public static final HandlerContextKey CONTEXT_CONTEXT_KEY = + new HandlerContextKey<>("DatadogContext"); // same as OnErrorDecorator.CONTEXT_CONTEXT_KEY private static final Logger log = LoggerFactory.getLogger(TracingRequestHandler.class); private final ContextStore responseQueueStore; - private final ContextStore requestSpanStore; + private final ContextStore requestContextStore; public TracingRequestHandler( ContextStore responseQueueStore, - ContextStore requestSpanStore) { + ContextStore requestContextStore) { this.responseQueueStore = responseQueueStore; - this.requestSpanStore = requestSpanStore; + this.requestContextStore = requestContextStore; } @Override public void beforeRequest(final Request request) { - AgentSpan span; if (!AWS_LEGACY_TRACING && isPollingRequest(request.getOriginalRequest())) { // SQS messages spans are created by aws-java-sqs-1.0 - replace client scope with no-op, // so we can tell when receive call is complete without affecting the rest of the trace activateSpanWithoutScope(blackholeSpan()); } else { - span = requestSpanStore.remove(request.getOriginalRequest()); + Context context = requestContextStore.remove(request.getOriginalRequest()); + AgentSpan span = spanFromContext(context); if (span != null) { // we'll land here for SQS send requests when DSM is enabled. In that case, we create the // span in SqsInterceptor to inject DSM tags. span.setOperationName(AwsNameCache.spanName(request)); } else { // this is the most common code path - span = startSpan(AwsNameCache.spanName(request)); + span = startSpan("aws-sdk", AwsNameCache.spanName(request)); + context = span; // TODO If DSM is enabled, add DSM context here too } DECORATE.afterStart(span); DECORATE.onRequest(span, request); - request.addHandlerContext(SPAN_CONTEXT_KEY, span); + request.addHandlerContext(CONTEXT_CONTEXT_KEY, context); if (Config.get().isAwsPropagationEnabled()) { try { Propagators.forConcern(XRAY_TRACING_CONCERN).inject(span, request, DECORATE); @@ -81,13 +89,18 @@ public void beforeRequest(final Request request) { @Override public void afterResponse(final Request request, final Response response) { - final AgentSpan span = request.getHandlerContext(SPAN_CONTEXT_KEY); - if (span != null) { - request.addHandlerContext(SPAN_CONTEXT_KEY, null); - DECORATE.onResponse(span, response); - DECORATE.onServiceResponse(span, request.getServiceName(), response); - DECORATE.beforeFinish(span); - span.finish(); + final Context context = request.getHandlerContext(CONTEXT_CONTEXT_KEY); + log.warn("context {}", context); + AgentSpan span = null; + if (context != null) { + request.addHandlerContext(CONTEXT_CONTEXT_KEY, null); + span = spanFromContext(context); + if (span != null) { + DECORATE.onResponse(span, response); + DECORATE.onServiceResponse(span, request.getServiceName(), response); + DECORATE.beforeFinish(span); + span.finish(); + } } AmazonWebServiceRequest originalRequest = request.getOriginalRequest(); GetterAccess requestAccess = GetterAccess.of(originalRequest); @@ -105,22 +118,25 @@ && traceConfig().isDataStreamsEnabled() && "AmazonKinesis".equals(request.getServiceName()) && "GetRecords".equals(requestAccess.getOperationNameFromType())) { String streamArn = requestAccess.getStreamARN(originalRequest); - if (null != streamArn) { - List records = - GetterAccess.of(response.getAwsResponse()).getRecords(response.getAwsResponse()); - if (null != records) { - DataStreamsTags tags = - DataStreamsTags.create("kinesis", DataStreamsTags.Direction.Inbound, streamArn); - for (Object record : records) { - Date arrivalTime = GetterAccess.of(record).getApproximateArrivalTimestamp(record); - AgentDataStreamsMonitoring dataStreamsMonitoring = - AgentTracer.get().getDataStreamsMonitoring(); - PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext(); - DataStreamsContext context = create(tags, arrivalTime.getTime(), 0); - pathwayContext.setCheckpoint(context, dataStreamsMonitoring::add); - if (!span.context().getPathwayContext().isStarted()) { - span.context().mergePathwayContext(pathwayContext); - } + dsmCheckpoint(span, streamArn, response); + } + } + + private void dsmCheckpoint(AgentSpan span, String streamArn, Response response) { + if (null != streamArn) { + List records = + GetterAccess.of(response.getAwsResponse()).getRecords(response.getAwsResponse()); + if (null != records) { + DataStreamsTags tags = create("kinesis", INBOUND, streamArn); + for (Object record : records) { + Date arrivalTime = GetterAccess.of(record).getApproximateArrivalTimestamp(record); + AgentDataStreamsMonitoring dataStreamsMonitoring = + AgentTracer.get().getDataStreamsMonitoring(); + PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext(); + DataStreamsContext dataStreamsContext = create(tags, arrivalTime.getTime(), 0); + pathwayContext.setCheckpoint(dataStreamsContext, dataStreamsMonitoring::add); + if (!span.context().getPathwayContext().isStarted()) { + span.context().mergePathwayContext(pathwayContext); } } } @@ -129,24 +145,27 @@ && traceConfig().isDataStreamsEnabled() @Override public void afterError(final Request request, final Response response, final Exception e) { - AgentSpan span = request.getHandlerContext(SPAN_CONTEXT_KEY); - if (span == null) { - // also try getting the span from the context store, if the error happened early - span = requestSpanStore.remove(request.getOriginalRequest()); + Context context = request.getHandlerContext(CONTEXT_CONTEXT_KEY); + if (context == null) { + // also try getting the context from the context store, if the error happened early + context = requestContextStore.remove(request.getOriginalRequest()); } - if (span != null) { - request.addHandlerContext(SPAN_CONTEXT_KEY, null); - if (response != null) { - DECORATE.onResponse(span, response); - if (span.isError()) { + if (context != null) { + request.addHandlerContext(CONTEXT_CONTEXT_KEY, null); + final AgentSpan span = spanFromContext(context); + if (span != null) { + if (response != null) { + DECORATE.onResponse(span, response); + if (span.isError()) { + DECORATE.onError(span, e); + } + } else { DECORATE.onError(span, e); } - } else { - DECORATE.onError(span, e); + DECORATE.beforeFinish(span); + span.finish(); } - DECORATE.beforeFinish(span); - span.finish(); } } diff --git a/dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java b/dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java index 4a270b63772..abb0a85cd0e 100644 --- a/dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java +++ b/dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java @@ -1,6 +1,10 @@ package datadog.trace.instrumentation.aws.v2; import static datadog.trace.api.datastreams.DataStreamsContext.create; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; +import static datadog.trace.api.datastreams.DataStreamsTags.createWithDataset; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig; import datadog.context.propagation.CarrierSetter; @@ -344,9 +348,7 @@ public AgentSpan onSdkResponse( //noinspection unchecked List records = (List) recordsRaw; if (!records.isEmpty()) { - DataStreamsTags tags = - DataStreamsTags.create( - "kinesis", DataStreamsTags.Direction.Inbound, streamArn); + DataStreamsTags tags = create("kinesis", INBOUND, streamArn); if (null == kinesisApproximateArrivalTimestampField) { Optional> maybeField = records.get(0).sdkFields().stream() @@ -389,9 +391,7 @@ public AgentSpan onSdkResponse( if (key != null && bucket != null && awsOperation != null) { if ("GetObject".equalsIgnoreCase(awsOperation)) { - DataStreamsTags tags = - DataStreamsTags.createWithDataset( - "s3", DataStreamsTags.Direction.Inbound, bucket, key, bucket); + DataStreamsTags tags = createWithDataset("s3", INBOUND, bucket, key, bucket); AgentTracer.get() .getDataStreamsMonitoring() .setCheckpoint(span, create(tags, 0, responseSize)); @@ -404,9 +404,7 @@ public AgentSpan onSdkResponse( payloadSize = (long) requestSize; } - DataStreamsTags tags = - DataStreamsTags.createWithDataset( - "s3", DataStreamsTags.Direction.Outbound, bucket, key, bucket); + DataStreamsTags tags = createWithDataset("s3", OUTBOUND, bucket, key, bucket); AgentTracer.get() .getDataStreamsMonitoring() .setCheckpoint(span, create(tags, 0, payloadSize)); diff --git a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsClientInstrumentation.java index a5036d0c1c7..bd80bfd554a 100644 --- a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsClientInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsClientInstrumentation.java @@ -42,8 +42,7 @@ public String[] helperClassNames() { @Override public Map contextStore() { return Collections.singletonMap( - "com.amazonaws.AmazonWebServiceRequest", - "datadog.trace.bootstrap.instrumentation.api.AgentSpan"); + "com.amazonaws.AmazonWebServiceRequest", "datadog.context.Context"); } public static class HandlerChainAdvice { @@ -57,8 +56,7 @@ public static void addHandler(@Advice.Return final List handler handlers.add( new SnsInterceptor( InstrumentationContext.get( - "com.amazonaws.AmazonWebServiceRequest", - "datadog.trace.bootstrap.instrumentation.api.AgentSpan"))); + "com.amazonaws.AmazonWebServiceRequest", "datadog.context.Context"))); } } } diff --git a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java index 0b991bdef8e..63855b1455c 100644 --- a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java @@ -1,6 +1,8 @@ package datadog.trace.instrumentation.aws.v1.sns; import static datadog.context.propagation.Propagators.defaultPropagator; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig; import static datadog.trace.instrumentation.aws.v1.sns.TextMapInjectAdapter.SETTER; @@ -23,9 +25,9 @@ public class SnsInterceptor extends RequestHandler2 { - private final ContextStore contextStore; + private final ContextStore contextStore; - public SnsInterceptor(ContextStore contextStore) { + public SnsInterceptor(ContextStore contextStore) { this.contextStore = contextStore; } @@ -106,11 +108,12 @@ private AgentSpan newSpan(AmazonWebServiceRequest request) { final AgentSpan span = AgentTracer.startSpan("aws.sns.send"); // pass the span to TracingRequestHandler in the sdk instrumentation where it'll be enriched & // activated + // TODO If DSM is enabled, add DSM context here too contextStore.put(request, span); return span; } private DataStreamsTags getTags(String snsTopicName) { - return DataStreamsTags.create("sns", DataStreamsTags.Direction.Outbound, snsTopicName); + return create("sns", OUTBOUND, snsTopicName); } } diff --git a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java index 3a10aa085e4..a81be0759f8 100644 --- a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java @@ -1,6 +1,8 @@ package datadog.trace.instrumentation.aws.v2.sns; import static datadog.context.propagation.Propagators.defaultPropagator; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig; import static datadog.trace.instrumentation.aws.v2.sns.TextMapInjectAdapter.SETTER; @@ -103,6 +105,6 @@ public SdkRequest modifyRequest( } private DataStreamsTags getTags(String snsTopicName) { - return DataStreamsTags.create("sns", DataStreamsTags.Direction.Outbound, snsTopicName); + return create("sns", OUTBOUND, snsTopicName); } } diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java index 1aa1414a013..2e7dee5f360 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java @@ -46,8 +46,7 @@ public String[] helperClassNames() { @Override public Map contextStore() { return Collections.singletonMap( - "com.amazonaws.AmazonWebServiceRequest", - "datadog.trace.bootstrap.instrumentation.api.AgentSpan"); + "com.amazonaws.AmazonWebServiceRequest", "datadog.context.Context"); } public static class HandlerChainAdvice { @@ -62,8 +61,7 @@ public static void addHandler(@Advice.Return final List handler handlers.add( new SqsInterceptor( InstrumentationContext.get( - "com.amazonaws.AmazonWebServiceRequest", - "datadog.trace.bootstrap.instrumentation.api.AgentSpan"))); + "com.amazonaws.AmazonWebServiceRequest", "datadog.context.Context"))); } } } diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java index ef6c1e516a1..3a7fa98632f 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java @@ -1,5 +1,7 @@ package datadog.trace.instrumentation.aws.v1.sqs; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; @@ -27,9 +29,9 @@ public class SqsInterceptor extends RequestHandler2 { - private final ContextStore contextStore; + private final ContextStore contextStore; - public SqsInterceptor(ContextStore contextStore) { + public SqsInterceptor(ContextStore contextStore) { this.contextStore = contextStore; } @@ -87,11 +89,12 @@ private AgentSpan newSpan(AmazonWebServiceRequest request) { final AgentSpan span = startSpan("sqs", "aws.sqs.send"); // pass the span to TracingRequestHandler in the sdk instrumentation where it'll be enriched & // activated + // TODO If DSM is enabled, add DSM context here too contextStore.put(request, span); return span; } private static DataStreamsTags getTags(String queueUrl) { - return DataStreamsTags.create("sqs", DataStreamsTags.Direction.Outbound, urlFileName(queueUrl)); + return create("sqs", OUTBOUND, urlFileName(queueUrl)); } } diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java index 25a82dbac65..98aeca1ec84 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java @@ -1,6 +1,8 @@ package datadog.trace.instrumentation.aws.v1.sqs; import static datadog.trace.api.datastreams.DataStreamsContext.create; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious; @@ -85,8 +87,7 @@ protected void startNewMessageSpan(Message message) { } AgentSpan span = startSpan(SQS_INBOUND_OPERATION, batchContext); - DataStreamsTags tags = - DataStreamsTags.create("sqs", DataStreamsTags.Direction.Inbound, urlFileName(queueUrl)); + DataStreamsTags tags = create("sqs", INBOUND, urlFileName(queueUrl)); AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(tags, 0, 0)); CONSUMER_DECORATE.afterStart(span); diff --git a/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java index 399cd889a31..cc09d8686c5 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java @@ -1,5 +1,7 @@ package datadog.trace.instrumentation.aws.v2.sqs; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName; @@ -92,8 +94,7 @@ private datadog.context.Context getContext( ExecutionAttributes executionAttributes, String queueUrl) { AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); - DataStreamsTags tags = - DataStreamsTags.create("sqs", DataStreamsTags.Direction.Outbound, urlFileName(queueUrl)); + DataStreamsTags tags = create("sqs", OUTBOUND, urlFileName(queueUrl)); DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags); return span.with(dsmContext); } diff --git a/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java b/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java index 3a00836e44d..3991bfc63b1 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java @@ -1,6 +1,8 @@ package datadog.trace.instrumentation.aws.v2.sqs; import static datadog.trace.api.datastreams.DataStreamsContext.create; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious; @@ -87,8 +89,7 @@ protected void startNewMessageSpan(Message message) { } AgentSpan span = startSpan(SQS_INBOUND_OPERATION, batchContext); - DataStreamsTags tags = - DataStreamsTags.create("sqs", DataStreamsTags.Direction.Inbound, urlFileName(queueUrl)); + DataStreamsTags tags = create("sqs", INBOUND, urlFileName(queueUrl)); AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(tags, 0, 0)); CONSUMER_DECORATE.afterStart(span); diff --git a/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PubSubDecorator.java b/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PubSubDecorator.java index a6287c22cef..05c0a559e41 100644 --- a/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PubSubDecorator.java +++ b/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PubSubDecorator.java @@ -1,5 +1,7 @@ package datadog.trace.instrumentation.googlepubsub; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.createWithSubscription; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; @@ -125,8 +127,7 @@ public AgentSpan onConsume(final PubsubMessage message, final String subscriptio final AgentSpan span = startSpan(PUBSUB_CONSUME, spanContext); final CharSequence parsedSubscription = extractSubscription(subscription); DataStreamsTags tags = - DataStreamsTags.createWithSubscription( - "google-pubsub", DataStreamsTags.Direction.Inbound, parsedSubscription.toString()); + createWithSubscription("google-pubsub", INBOUND, parsedSubscription.toString()); final Timestamp publishTime = message.getPublishTime(); // FIXME: use full nanosecond resolution when this method will accept nanos AgentTracer.get() diff --git a/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PublisherInstrumentation.java b/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PublisherInstrumentation.java index 7c51970ffc8..f825b9c032f 100644 --- a/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PublisherInstrumentation.java @@ -2,6 +2,8 @@ import static datadog.context.propagation.Propagators.defaultPropagator; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE; @@ -71,9 +73,7 @@ public static AgentScope before( PRODUCER_DECORATE.afterStart(span); PRODUCER_DECORATE.onProduce(span, topicName); - DataStreamsTags tags = - DataStreamsTags.create( - "google-pubsub", DataStreamsTags.Direction.Outbound, topicName.toString()); + DataStreamsTags tags = create("google-pubsub", OUTBOUND, topicName.toString()); PubsubMessage.Builder builder = msg.toBuilder(); DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags); defaultPropagator().inject(span.with(dsmContext), builder, SETTER); diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientDecorator.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientDecorator.java index c35da6a0e34..7fd3a276136 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientDecorator.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientDecorator.java @@ -1,6 +1,9 @@ package datadog.trace.instrumentation.grpc.client; import static datadog.context.propagation.Propagators.defaultPropagator; +import static datadog.trace.api.datastreams.DataStreamsContext.fromTags; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig; @@ -11,7 +14,6 @@ import datadog.trace.api.cache.DDCache; import datadog.trace.api.cache.DDCaches; import datadog.trace.api.datastreams.DataStreamsContext; -import datadog.trace.api.datastreams.DataStreamsTags; import datadog.trace.api.naming.SpanNaming; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes; @@ -32,8 +34,7 @@ public class GrpcClientDecorator extends ClientDecorator { public static final CharSequence GRPC_MESSAGE = UTF8BytesString.create("grpc.message"); private static DataStreamsContext createDsmContext() { - return DataStreamsContext.fromTags( - DataStreamsTags.create("grpc", DataStreamsTags.Direction.Outbound)); + return fromTags(create("grpc", OUTBOUND)); } public static final GrpcClientDecorator DECORATE = new GrpcClientDecorator(); diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/GrpcServerDecorator.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/GrpcServerDecorator.java index cafd0d64ad7..a835f725b4c 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/GrpcServerDecorator.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/GrpcServerDecorator.java @@ -1,5 +1,8 @@ package datadog.trace.instrumentation.grpc.server; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; + import datadog.trace.api.Config; import datadog.trace.api.cache.DDCache; import datadog.trace.api.cache.DDCaches; @@ -30,7 +33,7 @@ public class GrpcServerDecorator extends ServerDecorator { public static final CharSequence GRPC_MESSAGE = UTF8BytesString.create("grpc.message"); private static DataStreamsTags createServerPathwaySortedTags() { - return DataStreamsTags.create("grpc", DataStreamsTags.Direction.Inbound); + return create("grpc", INBOUND); } public static final DataStreamsTags SERVER_PATHWAY_EDGE_TAGS = createServerPathwaySortedTags(); diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 5c851418eb3..ba66a01b6da 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -4,6 +4,8 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.api.datastreams.DataStreamsContext.fromTagsWithoutCheckpoint; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.createWithClusterId; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; @@ -140,9 +142,7 @@ public static AgentScope onEnter( && !Config.get().isKafkaClientPropagationDisabledForTopic(record.topic())) { setter = TextMapInjectAdapter.SETTER; } - DataStreamsTags tags = - DataStreamsTags.createWithClusterId( - "kafka", DataStreamsTags.Direction.Outbound, record.topic(), clusterId); + DataStreamsTags tags = createWithClusterId("kafka", OUTBOUND, record.topic(), clusterId); try { defaultPropagator().inject(span, record.headers(), setter); if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java index 832a5097bb5..f634706ca5e 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java @@ -1,6 +1,8 @@ package datadog.trace.instrumentation.kafka_clients; import static datadog.trace.api.datastreams.DataStreamsContext.create; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext; @@ -95,9 +97,7 @@ protected void startNewRecordSpan(ConsumerRecord val) { // spans are written out together by TraceStructureWriter when running in strict mode } - DataStreamsTags tags = - DataStreamsTags.create( - "kafka", DataStreamsTags.Direction.Inbound, val.topic(), group, clusterId); + DataStreamsTags tags = create("kafka", INBOUND, val.topic(), group, clusterId); final long payloadSize = traceConfig().isDataStreamsEnabled() ? computePayloadSizeBytes(val) : 0; if (STREAMING_CONTEXT.isDisabledForTopic(val.topic())) { diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java index c37d99796da..8d86dca9ea8 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java @@ -2,6 +2,8 @@ import static datadog.context.propagation.Propagators.defaultPropagator; import static datadog.trace.api.datastreams.DataStreamsContext.fromTagsWithoutCheckpoint; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; @@ -62,9 +64,7 @@ public static AgentScope onEnter( && !Config.get().isKafkaClientPropagationDisabledForTopic(record.topic())) { setter = TextMapInjectAdapter.SETTER; } - DataStreamsTags tags = - DataStreamsTags.create( - "kafka", DataStreamsTags.Direction.Outbound, record.topic(), null, clusterId); + DataStreamsTags tags = create("kafka", OUTBOUND, record.topic(), null, clusterId); try { defaultPropagator().inject(span, record.headers(), setter); if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java index 75ef16c865a..1d5e93031e6 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java @@ -1,6 +1,8 @@ package datadog.trace.instrumentation.kafka_clients38; import static datadog.trace.api.datastreams.DataStreamsContext.create; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext; @@ -95,9 +97,7 @@ protected void startNewRecordSpan(ConsumerRecord val) { // spans are written out together by TraceStructureWriter when running in strict mode } - DataStreamsTags tags = - DataStreamsTags.create( - "kafka", DataStreamsTags.Direction.Inbound, val.topic(), group, clusterId); + DataStreamsTags tags = create("kafka", INBOUND, val.topic(), group, clusterId); final long payloadSize = traceConfig().isDataStreamsEnabled() ? Utils.computePayloadSizeBytes(val) : 0; if (StreamingContext.STREAMING_CONTEXT.isDisabledForTopic(val.topic())) { diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java index ffd6a4df88e..817d706bb70 100644 --- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java @@ -2,6 +2,8 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.api.datastreams.DataStreamsContext.create; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.createWithGroup; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; @@ -250,9 +252,7 @@ public static void start( if (streamTaskContext != null) { applicationId = streamTaskContext.getApplicationId(); } - DataStreamsTags tags = - DataStreamsTags.createWithGroup( - "kafka", DataStreamsTags.Direction.Inbound, applicationId, record.topic()); + DataStreamsTags tags = createWithGroup("kafka", INBOUND, applicationId, record.topic()); final long payloadSize = traceConfig().isDataStreamsEnabled() ? computePayloadSizeBytes(record.value) : 0; @@ -323,9 +323,7 @@ public static void start( if (streamTaskContext != null) { applicationId = streamTaskContext.getApplicationId(); } - DataStreamsTags tags = - DataStreamsTags.createWithGroup( - "kafka", DataStreamsTags.Direction.Inbound, applicationId, record.topic()); + DataStreamsTags tags = createWithGroup("kafka", INBOUND, applicationId, record.topic()); long payloadSize = 0; // we have to go through Object to get the RecordMetadata here because the class of `record` diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java index 0308399aee9..ffbab98be3a 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -6,6 +6,8 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.nameEndsWith; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.createWithExchange; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.noopSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; @@ -186,11 +188,8 @@ public static AgentScope setResourceNameAddHeaders( RabbitDecorator.injectTimeInQueueStart(headers); } DataStreamsTags tags = - DataStreamsTags.createWithExchange( - "rabbitmq", - DataStreamsTags.Direction.Outbound, - exchange, - routingKey != null && !routingKey.isEmpty()); + createWithExchange( + "rabbitmq", OUTBOUND, exchange, routingKey != null && !routingKey.isEmpty()); DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags); defaultPropagator().inject(span.with(dsmContext), headers, SETTER); props = diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java index fb4ac5e72dc..65df57e807c 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java @@ -1,6 +1,8 @@ package datadog.trace.instrumentation.rabbitmq.amqp; import static datadog.trace.api.datastreams.DataStreamsContext.create; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; @@ -246,8 +248,7 @@ public static AgentScope startReceivingSpan( } if (null != headers) { - DataStreamsTags tags = - DataStreamsTags.create("rabbitmq", DataStreamsTags.Direction.Inbound, queue); + DataStreamsTags tags = create("rabbitmq", INBOUND, queue); AgentTracer.get() .getDataStreamsMonitoring() .setCheckpoint(span, create(tags, produceMillis, 0)); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 58093fae2d2..8dfceeb6eaa 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -2,6 +2,10 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V01_DATASTREAMS_ENDPOINT; import static datadog.trace.api.datastreams.DataStreamsContext.fromTags; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; +import static datadog.trace.api.datastreams.DataStreamsTags.createManual; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static datadog.trace.util.AgentThreadFactory.AgentThread.DATA_STREAMS_MONITORING; import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; @@ -246,9 +250,9 @@ public void setConsumeCheckpoint( DataStreamsTags tags; if (isManual) { - tags = DataStreamsTags.createManual(type, DataStreamsTags.Direction.Inbound, source); + tags = createManual(type, INBOUND, source); } else { - tags = DataStreamsTags.create(type, DataStreamsTags.Direction.Inbound, source); + tags = create(type, INBOUND, source); } setCheckpoint(span, fromTags(tags)); @@ -268,9 +272,9 @@ public void setProduceCheckpoint( } DataStreamsTags tags; if (manualCheckpoint) { - tags = DataStreamsTags.createManual(type, DataStreamsTags.Direction.Outbound, target); + tags = createManual(type, OUTBOUND, target); } else { - tags = DataStreamsTags.create(type, DataStreamsTags.Direction.Outbound, target); + tags = create(type, OUTBOUND, target); } DataStreamsContext dsmContext = fromTags(tags); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index c37033e23d9..49e6c74639f 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -142,14 +142,14 @@ class DataStreamsWritingTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig }) dataStreams.start() dataStreams.add(new StatsPoint(DataStreamsTags.create(null, null), 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, null)) - dataStreams.add(new StatsPoint(DataStreamsTags.create("testType", DataStreamsTags.Direction.Inbound, "testTopic", "testGroup", null), 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null)) + dataStreams.add(new StatsPoint(DataStreamsTags.create("testType", DataStreamsTags.Direction.INBOUND, "testTopic", "testGroup", null), 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null)) dataStreams.trackBacklog(DataStreamsTags.createWithPartition("kafka_produce", "testTopic", "1", null, null), 100) dataStreams.trackBacklog(DataStreamsTags.createWithPartition("kafka_produce", "testTopic", "1", null, null), 130) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) - dataStreams.add(new StatsPoint(DataStreamsTags.create("testType", DataStreamsTags.Direction.Inbound, "testTopic", "testGroup", null), 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10, null)) + dataStreams.add(new StatsPoint(DataStreamsTags.create("testType", DataStreamsTags.Direction.INBOUND, "testTopic", "testGroup", null), 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(DataStreamsTags.create("testType", DataStreamsTags.Direction.Inbound, "testTopic", "testGroup", null), 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5, null)) - dataStreams.add(new StatsPoint(DataStreamsTags.create("testType", DataStreamsTags.Direction.Inbound, "testTopic2", "testGroup", null), 3, 4, 6, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2, null)) + dataStreams.add(new StatsPoint(DataStreamsTags.create("testType", DataStreamsTags.Direction.INBOUND, "testTopic", "testGroup", null), 1, 2, 5, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5, null)) + dataStreams.add(new StatsPoint(DataStreamsTags.create("testType", DataStreamsTags.Direction.INBOUND, "testTopic2", "testGroup", null), 3, 4, 6, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2, null)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.close() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy index 802670b6fe5..4c19b48afaf 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy @@ -71,9 +71,9 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(50) - context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.INBOUND)), pointConsumer) timeSource.advance(25) - def tags = DataStreamsTags.create("kafka", DataStreamsTags.Direction.Outbound, "topic", "group", null) + def tags = DataStreamsTags.create("kafka", DataStreamsTags.Direction.OUTBOUND, "topic", "group", null) context.setCheckpoint(fromTags(tags), pointConsumer) then: @@ -124,9 +124,9 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(50) - context.setCheckpoint(fromTags(DataStreamsTags.create("kafka", DataStreamsTags.Direction.Outbound)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("kafka", DataStreamsTags.Direction.OUTBOUND)), pointConsumer) timeSource.advance(25) - def tg = DataStreamsTags.create("kafka", DataStreamsTags.Direction.Inbound, "topic", "group", null) + def tg = DataStreamsTags.create("kafka", DataStreamsTags.Direction.INBOUND, "topic", "group", null) context.setCheckpoint(fromTags(tg), pointConsumer) timeSource.advance(30) context.setCheckpoint(fromTags(tg), pointConsumer) @@ -181,12 +181,12 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(fromTags(DataStreamsTags.createWithDataset("s3", DataStreamsTags.Direction.Inbound, null, "my_object.csv", "my_bucket")), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.createWithDataset("s3", DataStreamsTags.Direction.INBOUND, null, "my_object.csv", "my_bucket")), pointConsumer) def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(2)) def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) - def tg = DataStreamsTags.createWithDataset("s3", DataStreamsTags.Direction.Outbound, null, "my_object.csv", "my_bucket") + def tg = DataStreamsTags.createWithDataset("s3", DataStreamsTags.Direction.OUTBOUND, null, "my_object.csv", "my_bucket") context.setCheckpoint(fromTags(tg), pointConsumer) then: @@ -208,7 +208,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.INBOUND)), pointConsumer) def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(2)) def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded) @@ -259,13 +259,13 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.INBOUND)), pointConsumer) def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(1)) def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) - context.setCheckpoint(fromTags(DataStreamsTags.create("kafka", DataStreamsTags.Direction.Outbound, "topic", "group", null)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("kafka", DataStreamsTags.Direction.OUTBOUND, "topic", "group", null)), pointConsumer) then: decodedContext.isStarted() @@ -287,7 +287,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { timeSource.advance(MILLISECONDS.toNanos(2)) def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, null, secondEncode) timeSource.advance(MILLISECONDS.toNanos(30)) - context.setCheckpoint(fromTags(DataStreamsTags.create("kafka", DataStreamsTags.Direction.Inbound, "topicB", "group", null)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("kafka", DataStreamsTags.Direction.INBOUND, "topicB", "group", null)), pointConsumer) then: secondDecode.isStarted() @@ -314,14 +314,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.INBOUND)), pointConsumer) def encoded = context.encode() Map carrier = [(PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] timeSource.advance(MILLISECONDS.toNanos(1)) def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(25)) - context.setCheckpoint(fromTags(DataStreamsTags.create("kafka", DataStreamsTags.Direction.Outbound, "topic", "group", null)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("kafka", DataStreamsTags.Direction.OUTBOUND, "topic", "group", null)), pointConsumer) then: decodedContext.isStarted() @@ -344,7 +344,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { timeSource.advance(MILLISECONDS.toNanos(2)) def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(30)) - context.setCheckpoint(fromTags(DataStreamsTags.create("kafka", DataStreamsTags.Direction.Inbound, "topicB", "group", null)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("kafka", DataStreamsTags.Direction.INBOUND, "topicB", "group", null)), pointConsumer) then: secondDecode.isStarted() @@ -371,14 +371,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.INBOUND)), pointConsumer) def encoded = context.encode() Map carrier = [(PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] timeSource.advance(MILLISECONDS.toNanos(1)) def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(25)) - context.setCheckpoint(fromTags(DataStreamsTags.create("sqs", DataStreamsTags.Direction.Outbound, "topic", null, null)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("sqs", DataStreamsTags.Direction.OUTBOUND, "topic", null, null)), pointConsumer) then: decodedContext.isStarted() @@ -400,7 +400,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { timeSource.advance(MILLISECONDS.toNanos(2)) def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(30)) - context.setCheckpoint(fromTags(DataStreamsTags.create("sqs", DataStreamsTags.Direction.Inbound, "topicB", null, null)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("sqs", DataStreamsTags.Direction.INBOUND, "topicB", null, null)), pointConsumer) then: secondDecode.isStarted() @@ -423,9 +423,9 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: timeSource.advance(50) - context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.INBOUND)), pointConsumer) timeSource.advance(25) - context.setCheckpoint(fromTags(DataStreamsTags.create("type", DataStreamsTags.Direction.Outbound, "topic", "group", null)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("type", DataStreamsTags.Direction.OUTBOUND, "topic", "group", null)), pointConsumer) timeSource.advance(25) context.setCheckpoint(fromTags(DataStreamsTags.create(null, null)), pointConsumer) @@ -505,7 +505,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { DataStreamsTags.setGlobalBaseHash(baseHash) def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.INBOUND)), pointConsumer) def encoded = context.encode() Map carrier = [ (PROPAGATION_KEY_BASE64): encoded, @@ -559,7 +559,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { DataStreamsTags.setGlobalBaseHash(baseHash) def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.INBOUND)), pointConsumer) def encoded = context.encode() Map carrier = [(PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] @@ -614,7 +614,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { DataStreamsTags.setGlobalBaseHash(baseHash) def context = new DefaultPathwayContext(timeSource, baseHash, null) timeSource.advance(MILLISECONDS.toNanos(50)) - context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer) + context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.INBOUND)), pointConsumer) def encoded = context.encode() Map carrier = [(PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] def contextVisitor = new Base64MapContextVisitor() diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsContext.java b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsContext.java index 22cc02c74fe..bd2d69fed47 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsContext.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsContext.java @@ -1,5 +1,8 @@ package datadog.trace.api.datastreams; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; + import datadog.context.Context; import datadog.context.ContextKey; import datadog.context.ImplicitContextKeyed; @@ -16,8 +19,8 @@ public class DataStreamsContext implements ImplicitContextKeyed { final boolean sendCheckpoint; static { - CLIENT_PATHWAY_EDGE_TAGS = DataStreamsTags.create("http", DataStreamsTags.Direction.Outbound); - SERVER_PATHWAY_EDGE_TAGS = DataStreamsTags.create("http", DataStreamsTags.Direction.Inbound); + CLIENT_PATHWAY_EDGE_TAGS = DataStreamsTags.create("http", OUTBOUND); + SERVER_PATHWAY_EDGE_TAGS = DataStreamsTags.create("http", INBOUND); } public static DataStreamsContext fromContext(Context context) { diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTags.java b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTags.java index 938d8d953f2..cc4d30a2cfd 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTags.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTags.java @@ -5,9 +5,9 @@ public class DataStreamsTags { public enum Direction { - Unknown, - Inbound, - Outbound, + UNKNOWN, + INBOUND, + OUTBOUND, } public static DataStreamsTags EMPTY = DataStreamsTags.create(null, null); @@ -159,7 +159,7 @@ public Boolean hasAllTags(String[] tags) { case DIRECTION_TAG: if (!Objects.equals( this.directionValue, - Objects.equals(value, "out") ? Direction.Outbound : Direction.Inbound)) { + Objects.equals(value, "out") ? Direction.OUTBOUND : Direction.INBOUND)) { return false; } break; @@ -307,9 +307,9 @@ public DataStreamsTags( String partition) { this.bus = bus != null ? BUS_TAG + ":" + bus : null; this.directionValue = direction; - if (direction == Direction.Inbound) { + if (direction == Direction.INBOUND) { this.direction = DIRECTION_TAG + ":in"; - } else if (direction == Direction.Outbound) { + } else if (direction == Direction.OUTBOUND) { this.direction = DIRECTION_TAG + ":out"; } else { this.direction = null; diff --git a/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsTagsTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsTagsTest.groovy index ef73cfaae4c..aa2da676516 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsTagsTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/datastreams/DataStreamsTagsTest.groovy @@ -6,7 +6,7 @@ import java.nio.ByteBuffer class DataStreamsTagsTest extends Specification { def getTags(int idx) { - return new DataStreamsTags("bus" + idx, DataStreamsTags.Direction.Outbound, "exchange" + idx, "topic" + idx, "type" + idx, "subscription" + idx, + return new DataStreamsTags("bus" + idx, DataStreamsTags.Direction.OUTBOUND, "exchange" + idx, "topic" + idx, "type" + idx, "subscription" + idx, "dataset_name" + idx, "dataset_namespace" + idx, true, "group" + idx, "consumer_group" + idx, true, "kafka_cluster_id" + idx, "partition" + idx) } @@ -31,13 +31,13 @@ class DataStreamsTagsTest extends Specification { tg.getHasRoutingKey() == DataStreamsTags.HAS_ROUTING_KEY_TAG + ":true" tg.getKafkaClusterId() == DataStreamsTags.KAFKA_CLUSTER_ID_TAG + ":kafka_cluster_id0" tg.getPartition() == DataStreamsTags.PARTITION_TAG + ":partition0" - tg.getDirectionValue() == DataStreamsTags.Direction.Outbound + tg.getDirectionValue() == DataStreamsTags.Direction.OUTBOUND tg.toString() != null } def 'test has all tags'() { setup: - def tags = new DataStreamsTags("bus", DataStreamsTags.Direction.Outbound, + def tags = new DataStreamsTags("bus", DataStreamsTags.Direction.OUTBOUND, "exchange", "topic", "type", "subscription", "dataset_name", "dataset_namespace", true, "group", "consumer_group", true, "kafka_cluster_id", "partition") expect: @@ -105,12 +105,12 @@ class DataStreamsTagsTest extends Specification { def 'test create'() { setup: - def one = DataStreamsTags.create("type", DataStreamsTags.Direction.Outbound) - def two = DataStreamsTags.create("type", DataStreamsTags.Direction.Outbound, "topic") - def three = DataStreamsTags.create("type", DataStreamsTags.Direction.Outbound, "topic", "group", "cluster") + def one = DataStreamsTags.create("type", DataStreamsTags.Direction.OUTBOUND) + def two = DataStreamsTags.create("type", DataStreamsTags.Direction.OUTBOUND, "topic") + def three = DataStreamsTags.create("type", DataStreamsTags.Direction.OUTBOUND, "topic", "group", "cluster") def four = DataStreamsTags.createWithPartition("type", "topic", "partition", "cluster", "group") - def five = DataStreamsTags.createWithDataset("type", DataStreamsTags.Direction.Outbound, "topic", "dataset", "namespace") - def six = DataStreamsTags.createWithSubscription("type", DataStreamsTags.Direction.Inbound, "subscription") + def five = DataStreamsTags.createWithDataset("type", DataStreamsTags.Direction.OUTBOUND, "topic", "dataset", "namespace") + def six = DataStreamsTags.createWithSubscription("type", DataStreamsTags.Direction.INBOUND, "subscription") expect: one.hasAllTags("type:type", "direction:out") two.hasAllTags("type:type", "direction:out", "topic:topic")