Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add messaging conventions to sqs spans #9712

Merged
merged 3 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
}
}
span(1) {
name "SQS.ReceiveMessage"
name "s3ToSqsTestQueue receive"
kind CONSUMER
childOf span(0)
attributes {
Expand All @@ -210,6 +210,9 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
Expand Down Expand Up @@ -556,7 +559,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
}
trace(9, 1) {
span(0) {
name "SQS.ReceiveMessage"
name "s3ToSnsToSqsTestQueue receive"
kind CONSUMER
hasNoParent()
attributes {
Expand All @@ -574,6 +577,9 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSnsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
}
}
span(1) {
name "SQS.ReceiveMessage"
name "snsToSqsTestQueue receive"
kind CONSUMER
childOf span(0)
attributes {
Expand All @@ -194,6 +194,9 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "snsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@

package io.opentelemetry.instrumentation.awssdk.v1_11;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

import com.amazonaws.Request;
import com.amazonaws.Response;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
import java.util.Arrays;
import java.util.List;
Expand All @@ -25,7 +32,6 @@ final class AwsSdkInstrumenterFactory {
RpcClientAttributesExtractor.create(AwsSdkRpcAttributesGetter.INSTANCE);
private static final AwsSdkExperimentalAttributesExtractor experimentalAttributesExtractor =
new AwsSdkExperimentalAttributesExtractor();
private static final AwsSdkSpanKindExtractor spanKindExtractor = new AwsSdkSpanKindExtractor();

private static final List<AttributesExtractor<Request<?>, Response<?>>>
defaultAttributesExtractors = Arrays.asList(httpAttributesExtractor, rpcAttributesExtractor);
Expand All @@ -41,27 +47,55 @@ static Instrumenter<Request<?>, Response<?>> requestInstrumenter(
return createInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes,
AwsSdkInstrumenterFactory.spanKindExtractor);
spanName,
SpanKindExtractor.alwaysClient(),
emptyList());
}

static Instrumenter<Request<?>, Response<?>> consumerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return sqsInstrumenter(
openTelemetry, MessageOperation.RECEIVE, captureExperimentalSpanAttributes);
}

static Instrumenter<Request<?>, Response<?>> producerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return sqsInstrumenter(
openTelemetry, MessageOperation.PUBLISH, captureExperimentalSpanAttributes);
}

private static Instrumenter<Request<?>, Response<?>> sqsInstrumenter(
OpenTelemetry openTelemetry,
MessageOperation operation,
boolean captureExperimentalSpanAttributes) {
SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
AttributesExtractor<Request<?>, Response<?>> messagingAttributeExtractor =
MessagingAttributesExtractor.builder(getter, operation).build();

return createInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, SpanKindExtractor.alwaysConsumer());
openTelemetry,
captureExperimentalSpanAttributes,
MessagingSpanNameExtractor.create(getter, operation),
operation == MessageOperation.PUBLISH
? SpanKindExtractor.alwaysProducer()
: SpanKindExtractor.alwaysConsumer(),
singletonList(messagingAttributeExtractor));
}

private static Instrumenter<Request<?>, Response<?>> createInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
SpanKindExtractor<Request<?>> kindExtractor) {
SpanNameExtractor<Request<?>> spanNameExtractor,
SpanKindExtractor<Request<?>> spanKindExtractor,
List<AttributesExtractor<Request<?>, Response<?>>> additionalAttributeExtractors) {
return Instrumenter.<Request<?>, Response<?>>builder(
openTelemetry, INSTRUMENTATION_NAME, spanName)
openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractors(
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors)
.buildInstrumenter(kindExtractor);
.addAttributesExtractors(additionalAttributeExtractors)
.buildInstrumenter(spanKindExtractor);
}

private AwsSdkInstrumenterFactory() {}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) {

private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<Request<?>, Response<?>> consumerInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;

AwsSdkTelemetry(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
requestInstrumenter =
Expand All @@ -54,13 +55,17 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) {
consumerInstrumenter =
AwsSdkInstrumenterFactory.consumerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
producerInstrumenter =
AwsSdkInstrumenterFactory.producerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
}

/**
* Returns a {@link RequestHandler2} for registration to AWS SDK client builders using {@code
* withRequestHandlers}.
*/
public RequestHandler2 newRequestHandler() {
return new TracingRequestHandler(requestInstrumenter, consumerInstrumenter);
return new TracingRequestHandler(
requestInstrumenter, consumerInstrumenter, producerInstrumenter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.amazonaws.Response;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import java.util.Collections;
import java.util.Map;

final class SqsAccess {
private SqsAccess() {}
Expand All @@ -28,4 +30,9 @@ static boolean afterResponse(
static boolean beforeMarshalling(AmazonWebServiceRequest request) {
return enabled && SqsImpl.beforeMarshalling(request);
}

@NoMuzzle
static Map<String, String> getMessageAttributes(Request<?> request) {
return enabled ? SqsImpl.getMessageAttributes(request) : Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.awssdk.v1_11;

import com.amazonaws.Request;
import com.amazonaws.Response;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;

enum SqsAttributesGetter implements MessagingAttributesGetter<Request<?>, Response<?>> {
INSTANCE;

@Override
public String getSystem(Request<?> request) {
return "AmazonSQS";
}

@Override
public String getDestination(Request<?> request) {
Object originalRequest = request.getOriginalRequest();
String queueUrl = RequestAccess.getQueueUrl(originalRequest);
int i = queueUrl.lastIndexOf('/');
return i > 0 ? queueUrl.substring(i + 1) : null;
}

@Override
public boolean isTemporaryDestination(Request<?> request) {
return false;
}

@Override
@Nullable
public String getConversationId(Request<?> request) {
return null;
}

@Override
@Nullable
public Long getMessagePayloadSize(Request<?> request) {
return null;
}

@Override
@Nullable
public Long getMessagePayloadCompressedSize(Request<?> request) {
return null;
}

@Override
@Nullable
public String getMessageId(Request<?> request, @Nullable Response<?> response) {
return null;
}

@Override
public List<String> getMessageHeader(Request<?> request, String name) {
String value = SqsAccess.getMessageAttributes(request).get(name);
return value != null ? Collections.singletonList(value) : Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@
import com.amazonaws.Response;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class SqsImpl {
static {
Expand Down Expand Up @@ -67,4 +72,19 @@ static boolean beforeMarshalling(AmazonWebServiceRequest rawRequest) {
}
return false;
}

static Map<String, String> getMessageAttributes(Request<?> request) {
if (request instanceof SendMessageRequest) {
Map<String, MessageAttributeValue> map =
((SendMessageRequest) request).getMessageAttributes();
if (!map.isEmpty()) {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, MessageAttributeValue> entry : map.entrySet()) {
result.put(entry.getKey(), entry.getValue().getStringValue());
}
return result;
}
}
return Collections.emptyMap();
}
}
Loading
Loading