Skip to content

Commit

Permalink
Add process spans to aws-1 sqs instrumentation (open-telemetry#9796)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored and Abhishekkr3003 committed Nov 7, 2023
1 parent ae750d3 commit 3332095
Show file tree
Hide file tree
Showing 27 changed files with 1,088 additions and 112 deletions.
23 changes: 22 additions & 1 deletion instrumentation/aws-sdk/aws-sdk-1.11/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ testing {

implementation("com.amazonaws:aws-java-sdk-sqs:1.11.106")
}

targets {
all {
testTask.configure {
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}
}
}
}

val testSqsNoReceiveTelemetry by registering(JvmTestSuite::class) {
dependencies {
implementation(project(":instrumentation:aws-sdk:aws-sdk-1.11:testing"))

implementation("com.amazonaws:aws-java-sdk-sqs:1.11.106")
}
}
}
}
Expand All @@ -105,14 +121,19 @@ tasks {
check {
dependsOn(testing.suites)
}
} else {
check {
dependsOn(testing.suites.named("testSqs"), testing.suites.named("testSqsNoReceiveTelemetry"))
}
}

test {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
}

withType<Test>().configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.aws-sdk.experimental-span-attributes=true")
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.awssdk.v1_11.AwsSdkTelemetry;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;

/**
Expand All @@ -37,6 +38,8 @@ public class TracingRequestHandler extends RequestHandler2 {
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false))
.setMessagingReceiveInstrumentationEnabled(
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.build()
.newRequestHandler();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ class S3TracingTest extends AgentInstrumentationSpecification {
awsConnector.receiveMessage(queueUrl)
awsConnector.putSampleData(bucketName)
// traced message
awsConnector.receiveMessage(queueUrl)
def receiveMessageResult = awsConnector.receiveMessage(queueUrl)
receiveMessageResult.messages.each {message ->
runWithSpan("process child") {}
}

// cleanup
awsConnector.deleteBucket(bucketName)
awsConnector.purgeQueue(queueUrl)
Expand Down Expand Up @@ -168,7 +172,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
}
}
}
trace(5, 2) {
trace(5, 3) {
span(0) {
name "S3.PutObject"
kind CLIENT
Expand All @@ -192,7 +196,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
}
}
span(1) {
name "s3ToSqsTestQueue receive"
name "s3ToSqsTestQueue process"
kind CONSUMER
childOf span(0)
attributes {
Expand All @@ -203,17 +207,18 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.status_code" 200
"http.url" String
"net.peer.name" String
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}
Expand Down Expand Up @@ -336,7 +341,10 @@ class S3TracingTest extends AgentInstrumentationSpecification {
awsConnector.receiveMessage(queueUrl)
awsConnector.putSampleData(bucketName)
// traced message
awsConnector.receiveMessage(queueUrl)
def receiveMessageResult = awsConnector.receiveMessage(queueUrl)
receiveMessageResult.messages.each {message ->
runWithSpan("process child") {}
}
// cleanup
awsConnector.deleteBucket(bucketName)
awsConnector.purgeQueue(queueUrl)
Expand Down Expand Up @@ -556,9 +564,9 @@ class S3TracingTest extends AgentInstrumentationSpecification {
}
}
}
trace(9, 1) {
trace(9, 2) {
span(0) {
name "s3ToSnsToSqsTestQueue receive"
name "s3ToSnsToSqsTestQueue process"
kind CONSUMER
hasNoParent()
attributes {
Expand All @@ -569,19 +577,22 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.status_code" 200
"http.url" String
"net.peer.name" String
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSnsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
span(1) {
name "process child"
childOf span(0)
attributes {
}
}
}
trace(10, 1) {
span(0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ class SnsTracingTest extends AgentInstrumentationSpecification {

when:
awsConnector.publishSampleNotification(topicArn)
awsConnector.receiveMessage(queueUrl)
def receiveMessageResult = awsConnector.receiveMessage(queueUrl)
receiveMessageResult.messages.each {message ->
runWithSpan("process child") {}
}

then:
assertTraces(6) {
Expand Down Expand Up @@ -154,7 +157,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
}
}
}
trace(5, 2) {
trace(5, 3) {
span(0) {
name "SNS.Publish"
kind CLIENT
Expand All @@ -176,7 +179,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
}
}
span(1) {
name "snsToSqsTestQueue receive"
name "snsToSqsTestQueue process"
kind CONSUMER
childOf span(0)
attributes {
Expand All @@ -187,16 +190,18 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
"rpc.service" "AmazonSQS"
"rpc.method" "ReceiveMessage"
"http.method" "POST"
"http.status_code" 200
"http.url" String
"net.peer.name" String
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "snsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11

import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import io.opentelemetry.instrumentation.awssdk.v1_11.AbstractSqsSuppressReceiveSpansTest
import io.opentelemetry.instrumentation.test.AgentTestTrait

class SqsSuppressReceiveSpansTest extends AbstractSqsSuppressReceiveSpansTest implements AgentTestTrait {
@Override
AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@ dependencies {
testLibrary("com.amazonaws:aws-java-sdk-sqs:1.11.106")
}

tasks.test {
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true")
tasks {
withType<Test>().configureEach {
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true")
}

val testReceiveSpansDisabled by registering(Test::class) {
filter {
includeTestsMatching("SqsSuppressReceiveSpansTest")
}
include("**/SqsSuppressReceiveSpansTest.*")
}

test {
filter {
excludeTestsMatching("SqsSuppressReceiveSpansTest")
}
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}

check {
dependsOn(testReceiveSpansDisabled)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public class TracingRequestHandler extends RequestHandler2 {
.setCaptureExperimentalSpanAttributes(
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-span-attributes", false))
.setMessagingReceiveInstrumentationEnabled(
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
.build()
.newRequestHandler();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.awssdk.v1_11.instrumentor

import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import io.opentelemetry.instrumentation.awssdk.v1_11.AbstractSqsSuppressReceiveSpansTest
import io.opentelemetry.instrumentation.test.LibraryTestTrait

class SqsSuppressReceiveSpansTest extends AbstractSqsSuppressReceiveSpansTest implements LibraryTestTrait {
@Override
AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client
}
}
Loading

0 comments on commit 3332095

Please sign in to comment.