Skip to content
Merged
126 changes: 116 additions & 10 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,9 +685,32 @@ def create_inferred_span(
return None


def get_service_mapping(service_name):
service_mapping = os.getenv("DD_SERVICE_MAPPING", "")
mapping = {}

for entry in service_mapping.split(","):
parts = entry.split(":")
if len(parts) == 2:
key, value = parts
mapping[key.strip()] = value.strip()

return mapping.get(service_name)


def create_inferred_span_from_lambda_function_url_event(event, context):
request_context = event.get("requestContext")
api_id = request_context.get("apiId")
domain = request_context.get("domainName")
# Attempt to get the service mapping for the domain so that we don't remap all apigw services
# Allows the customer to have more fine-grained control
apiid_mapping = get_service_mapping(str(api_id))
# If the domain mapping is not found,
# attempt to get the service mapping for 'lambda_api_gateway'
lambda_url_mapping = None if apiid_mapping else get_service_mapping("lambda_url")
# If neither mapping is found, default to the domain name
service_name = apiid_mapping or lambda_url_mapping or domain

method = request_context.get("http", {}).get("method")
path = request_context.get("http", {}).get("path")
resource = "{0} {1}".format(method, path)
Expand All @@ -701,7 +724,7 @@ def create_inferred_span_from_lambda_function_url_event(event, context):
}
request_time_epoch = request_context.get("timeEpoch")
args = {
"service": domain,
"service": service_name,
"resource": resource,
"span_type": "http",
}
Expand Down Expand Up @@ -790,6 +813,17 @@ def create_inferred_span_from_api_gateway_websocket_event(
request_context = event.get("requestContext")
domain = request_context.get("domainName")
endpoint = request_context.get("routeKey")
api_id = request_context.get("apiId")
# Attempt to get the service mapping for the domain so that we don't remap all apigw services
# Allows the customer to have more fine-grained control
apiid_mapping = get_service_mapping(str(api_id))
# If the domain mapping is not found,
# attempt to get the service mapping for 'lambda_api_gateway_websocket'
api_gateway_mapping = (
None if apiid_mapping else get_service_mapping("lambda_api_gateway_websocket")
)
# If neither mapping is found, default to the domain name
service_name = apiid_mapping or api_gateway_mapping or domain
tags = {
"operation_name": "aws.apigateway.websocket",
"http.url": domain + endpoint,
Expand All @@ -809,7 +843,7 @@ def create_inferred_span_from_api_gateway_websocket_event(
else:
InferredSpanInfo.set_tags(tags, tag_source="self", synchronicity="sync")
args = {
"service": domain,
"service": service_name,
"resource": endpoint,
"span_type": "web",
}
Expand Down Expand Up @@ -838,6 +872,18 @@ def create_inferred_span_from_api_gateway_event(
):
request_context = event.get("requestContext")
domain = request_context.get("domainName", "")
api_id = request_context.get("apiId")
# Attempt to get the service mapping for the domain so that we don't remap all apigw services
# Allows the customer to have more fine-grained control
apiid_mapping = get_service_mapping(str(api_id))
# If the domain mapping is not found,
# attempt to get the service mapping for 'lambda_api_gateway'
api_gateway_mapping = (
None if apiid_mapping else get_service_mapping("lambda_api_gateway")
)
# If neither mapping is found, default to the domain name
service_name = apiid_mapping or api_gateway_mapping or domain

method = event.get("httpMethod")
path = event.get("path")
resource = "{0} {1}".format(method, path)
Expand All @@ -858,7 +904,7 @@ def create_inferred_span_from_api_gateway_event(
else:
InferredSpanInfo.set_tags(tags, tag_source="self", synchronicity="sync")
args = {
"service": domain,
"service": service_name,
"resource": resource,
"span_type": "http",
}
Expand Down Expand Up @@ -888,6 +934,17 @@ def create_inferred_span_from_http_api_event(
):
request_context = event.get("requestContext")
domain = request_context.get("domainName")
api_id = request_context.get("apiId")
# Attempt to get the service mapping for the domain so that we don't remap all apigw services
# Allows the customer to have more fine-grained control
apiid_mapping = get_service_mapping(str(api_id))
# If the domain mapping is not found,
# attempt to get the service mapping for 'api_gateway_mapping'
api_gateway_mapping = (
None if apiid_mapping else get_service_mapping("lambda_http_api")
)
# If neither mapping is found, default to the domain name
service_name = apiid_mapping or api_gateway_mapping or domain
method = request_context.get("http", {}).get("method")
path = event.get("rawPath")
resource = "{0} {1}".format(method, path)
Expand All @@ -911,7 +968,7 @@ def create_inferred_span_from_http_api_event(
else:
InferredSpanInfo.set_tags(tags, tag_source="self", synchronicity="sync")
args = {
"service": domain,
"service": service_name,
"resource": resource,
"span_type": "http",
}
Expand All @@ -936,6 +993,14 @@ def create_inferred_span_from_sqs_event(event, context):
event_record = get_first_record(event)
event_source_arn = event_record.get("eventSourceARN")
queue_name = event_source_arn.split(":")[-1]
queue_mapping = get_service_mapping(str(queue_name))

# If the domain mapping is not found, attempt to get the service mapping for 'sqs'
sqs_mapping = None if queue_mapping else get_service_mapping("lambda_sqs")

# If neither mapping is found, default to the sns name
service_name = queue_mapping or sqs_mapping or "sqs"

tags = {
"operation_name": "aws.sqs",
"resource_names": queue_name,
Expand All @@ -947,7 +1012,7 @@ def create_inferred_span_from_sqs_event(event, context):
InferredSpanInfo.set_tags(tags, tag_source="self", synchronicity="async")
request_time_epoch = event_record.get("attributes", {}).get("SentTimestamp")
args = {
"service": "sqs",
"service": service_name,
"resource": queue_name,
"span_type": "web",
}
Expand Down Expand Up @@ -990,6 +1055,13 @@ def create_inferred_span_from_sns_event(event, context):
sns_message = event_record.get("Sns")
topic_arn = event_record.get("Sns", {}).get("TopicArn")
topic_name = topic_arn.split(":")[-1]
queue_mapping = get_service_mapping(str(topic_name))

# If the domain mapping is not found, attempt to get the service mapping for 'sns'
sns_mapping = None if queue_mapping else get_service_mapping("lambda_sns")

# If neither mapping is found, default to the sns name
service_name = queue_mapping or sns_mapping or "sns"
tags = {
"operation_name": "aws.sns",
"resource_names": topic_name,
Expand All @@ -1009,7 +1081,7 @@ def create_inferred_span_from_sns_event(event, context):
dt = datetime.strptime(timestamp, sns_dt_format)

args = {
"service": "sns",
"service": service_name,
"resource": topic_name,
"span_type": "web",
}
Expand All @@ -1027,6 +1099,13 @@ def create_inferred_span_from_kinesis_event(event, context):
event_id = event_record.get("eventID")
stream_name = event_source_arn.split(":")[-1]
shard_id = event_id.split(":")[0]
stream_name_mapping = get_service_mapping(str(stream_name))
kinesis_mapping = (
None if stream_name_mapping else get_service_mapping("lambda_kinesis")
)

# If neither mapping is found, default to the kinesis name
service_name = stream_name_mapping or kinesis_mapping or "kinesis"
tags = {
"operation_name": "aws.kinesis",
"resource_names": stream_name,
Expand All @@ -1044,7 +1123,7 @@ def create_inferred_span_from_kinesis_event(event, context):
)

args = {
"service": "kinesis",
"service": service_name,
"resource": stream_name,
"span_type": "web",
}
Expand All @@ -1060,6 +1139,18 @@ def create_inferred_span_from_dynamodb_event(event, context):
event_record = get_first_record(event)
event_source_arn = event_record.get("eventSourceARN")
table_name = event_source_arn.split("/")[1]
# Attempt to get the service mapping for the domain so that we don't remap all apigw services
# Allows the customer to have more fine grained control
table_mapping = get_service_mapping(str(table_name))

# If the domain mapping is not found, attempt to get the service mapping for 'dynamodb'
dynamo_db_mapping = (
None if table_mapping else get_service_mapping("lambda_dynamodb")
)

# If neither mapping is found, default to "dynamodb"
service_name = table_mapping or dynamo_db_mapping or "dynamodb"

dynamodb_message = event_record.get("dynamodb")
tags = {
"operation_name": "aws.dynamodb",
Expand All @@ -1077,7 +1168,7 @@ def create_inferred_span_from_dynamodb_event(event, context):
"ApproximateCreationDateTime"
)
args = {
"service": "dynamodb",
"service": service_name,
"resource": table_name,
"span_type": "web",
}
Expand All @@ -1093,6 +1184,13 @@ def create_inferred_span_from_dynamodb_event(event, context):
def create_inferred_span_from_s3_event(event, context):
event_record = get_first_record(event)
bucket_name = event_record.get("s3", {}).get("bucket", {}).get("name")
bucket_name_mapping = get_service_mapping(str(bucket_name))
# If the domain mapping is not found, attempt to get the service mapping for 's3'
s3_mapping = None if bucket_name_mapping else get_service_mapping("lambda_s3")

# If neither mapping is found, default to the s3 name
service_name = bucket_name_mapping or s3_mapping or "s3"

tags = {
"operation_name": "aws.s3",
"resource_names": bucket_name,
Expand All @@ -1109,7 +1207,7 @@ def create_inferred_span_from_s3_event(event, context):
dt = datetime.strptime(timestamp, dt_format)

args = {
"service": "s3",
"service": service_name,
"resource": bucket_name,
"span_type": "web",
}
Expand All @@ -1123,6 +1221,14 @@ def create_inferred_span_from_s3_event(event, context):

def create_inferred_span_from_eventbridge_event(event, context):
source = event.get("source")
source_mapping = get_service_mapping(str(source))
# If the domain mapping is not found, attempt to get the service mapping for 'eventbridge'
event_bridge_mapping = (
None if source_mapping else get_service_mapping("lambda_eventbridge")
)

# If neither mapping is found, default to the eventbridge name
service_name = source_mapping or event_bridge_mapping or "eventbridge"
tags = {
"operation_name": "aws.eventbridge",
"resource_names": source,
Expand All @@ -1138,7 +1244,7 @@ def create_inferred_span_from_eventbridge_event(event, context):
dt = datetime.strptime(timestamp, dt_format)

args = {
"service": "eventbridge",
"service": service_name,
"resource": source,
"span_type": "web",
}
Expand Down
Loading