Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apache Camel - S3 to SQS propagation impl + tests #2583

Merged
merged 4 commits into from Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ private static Map<String, SpanDecorator> loadDecorators() {
Map<String, SpanDecorator> result = new HashMap<>();
result.put("ahc", new HttpSpanDecorator());
result.put("ampq", new MessagingSpanDecorator("ampq"));
result.put("aws-s3", new S3SpanDecorator());
result.put("aws-sns", new MessagingSpanDecorator("aws-sns"));
result.put("aws-sqs", new MessagingSpanDecorator("aws-sqs"));
result.put("cometd", new MessagingSpanDecorator("cometd"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachecamel.decorators;

import io.opentelemetry.api.trace.SpanKind;

public class S3SpanDecorator extends BaseSpanDecorator {

@Override
public SpanKind getInitiatorSpanKind() {
return SpanKind.INTERNAL;
}

@Override
public SpanKind getReceiverSpanKind() {
return SpanKind.INTERNAL;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachecamel

import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.BucketNotificationConfiguration
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.QueueConfiguration
import com.amazonaws.services.s3.model.S3Event
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.SetBucketNotificationConfigurationRequest
import com.amazonaws.services.s3.model.TopicConfiguration
import com.amazonaws.services.sns.AmazonSNSAsyncClient
import com.amazonaws.services.sns.model.CreateTopicResult
import com.amazonaws.services.sns.model.SetTopicAttributesRequest
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest
import com.amazonaws.services.sqs.model.PurgeQueueRequest
import com.amazonaws.services.sqs.model.ReceiveMessageRequest

class AwsConnector {


private AmazonSQSAsyncClient sqsClient
private AmazonS3Client s3Client
private AmazonSNSAsyncClient snsClient

static liveAws() {
AwsConnector awsConnector = new AwsConnector()

awsConnector.sqsClient = AmazonSQSAsyncClient.asyncBuilder()
.build()

awsConnector.s3Client = AmazonS3Client.builder()
.build()

awsConnector.snsClient = AmazonSNSAsyncClient.asyncBuilder()
.build()

return awsConnector
}

def createQueue(String queueName) {
println "Create queue ${queueName}"
return sqsClient.createQueue(queueName).getQueueUrl()
}

def getQueueArn(String queueUrl) {
println "Get ARN for queue ${queueUrl}"
return sqsClient.getQueueAttributes(
new GetQueueAttributesRequest(queueUrl)
.withAttributeNames("QueueArn")).getAttributes()
.get("QueueArn")
}

def setTopicPublishingPolicy(String topicArn) {
println "Set policy for topic ${topicArn}"
snsClient.setTopicAttributes(new SetTopicAttributesRequest(topicArn, "Policy", String.format(SNS_POLICY, topicArn)))
}

private static final String SNS_POLICY = "{" +
" \"Statement\": [" +
" {" +
" \"Effect\": \"Allow\"," +
" \"Principal\": \"*\"," +
" \"Action\": \"sns:Publish\"," +
" \"Resource\": \"%s\"" +
" }]" +
"}"

def setQueuePublishingPolicy(String queueUrl, String queueArn) {
println "Set policy for queue ${queueArn}"
sqsClient.setQueueAttributes(queueUrl, Collections.singletonMap("Policy", String.format(SQS_POLICY, queueArn)))
}

private static final String SQS_POLICY = "{" +
" \"Statement\": [" +
" {" +
" \"Effect\": \"Allow\"," +
" \"Principal\": \"*\"," +
" \"Action\": \"sqs:SendMessage\"," +
" \"Resource\": \"%s\"" +
" }]" +
"}"

def createBucket(String bucketName) {
println "Create bucket ${bucketName}"
s3Client.createBucket(bucketName)
}

def deleteBucket(String bucketName) {
println "Delete bucket ${bucketName}"
ObjectListing objectListing = s3Client.listObjects(bucketName)
Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator()
while (objIter.hasNext()) {
s3Client.deleteObject(bucketName, objIter.next().getKey())
}
s3Client.deleteBucket(bucketName)
}

def enableS3ToSqsNotifications(String bucketName, String sqsQueueArn) {
println "Enable notification for bucket ${bucketName} to queue ${sqsQueueArn}"
BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration()
notificationConfiguration.addConfiguration("sqsQueueConfig",
new QueueConfiguration(sqsQueueArn, EnumSet.of(S3Event.ObjectCreatedByPut)))
s3Client.setBucketNotificationConfiguration(new SetBucketNotificationConfigurationRequest(
bucketName, notificationConfiguration))
}

def enableS3ToSnsNotifications(String bucketName, String snsTopicArn) {
println "Enable notification for bucket ${bucketName} to topic ${snsTopicArn}"
BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration()
notificationConfiguration.addConfiguration("snsTopicConfig",
new TopicConfiguration(snsTopicArn, EnumSet.of(S3Event.ObjectCreatedByPut)))
s3Client.setBucketNotificationConfiguration(new SetBucketNotificationConfigurationRequest(
bucketName, notificationConfiguration))
}

def createTopicAndSubscribeQueue(String topicName, String queueArn) {
println "Create topic ${topicName} and subscribe to queue ${queueArn}"
CreateTopicResult ctr = snsClient.createTopic(topicName)
snsClient.subscribe(ctr.getTopicArn(), "sqs", queueArn)
return ctr.getTopicArn()
}

def receiveMessage(String queueUrl) {
println "Receive message from queue ${queueUrl}"
return sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20))
}

def purgeQueue(String queueUrl) {
println "Purge queue ${queueUrl}"
sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl))
}

def putSampleData(String bucketName) {
println "Put sample data to bucket ${bucketName}"
s3Client.putObject(bucketName, "otelTestKey", "otelTestData")
}

def publishSampleNotification(String topicArn) {
snsClient.publish(topicArn, "Hello There")
}
}
Loading