Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,31 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
except Exception as ex:
pass

eventBridgeTriggerSpan = None
try:
if type(lambda_event) is dict and lambda_event.get("source") is not None and type(lambda_event.get("source")) is str:
span_name = 'EventBridge event'
if lambda_event.get("detail-type") is not None:
span_name = lambda_event.get("detail-type")

links = []
if lambda_event.get("detail") is not None and lambda_event["detail"].get("_context") is not None:
ctx = get_global_textmap().extract(carrier=lambda_event["detail"].get("_context"))
links.append(Link(get_current_span(ctx).get_span_context()))

eventBridgeTriggerSpan = tracer.start_span(span_name, context=parent_context, kind=SpanKind.CONSUMER, links=links)
eventBridgeTriggerSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
eventBridgeTriggerSpan.set_attribute("faas.trigger.type", "EventBridge")
eventBridgeTriggerSpan.set_attribute("aws.event.bridge.trigger.source", lambda_event.get("source"))
parent_context = set_span_in_context(eventBridgeTriggerSpan)

eventBridgeTriggerSpan.set_attribute(
"rpc.request.body",
json.dumps(lambda_event),
)
except Exception as ex:
pass

try:
with tracer.start_as_current_span(
name=orig_handler_name,
Expand Down Expand Up @@ -651,6 +676,22 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
except Exception:
pass
cognitoTriggerSpan.end()

if lambda_event and eventBridgeTriggerSpan is not None:
try:
if isinstance(result, dict) and result.get("statusCode"):
eventBridgeTriggerSpan.set_attribute(
SpanAttributes.HTTP_STATUS_CODE,
result.get("statusCode"),
)
if isinstance(result, dict) and result.get("body"):
eventBridgeTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
)
except Exception:
pass
eventBridgeTriggerSpan.end()

now = time.time()
_tracer_provider = tracer_provider or get_tracer_provider()
Expand Down Expand Up @@ -678,6 +719,8 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
dynamoTriggerSpan.end()
if cognitoTriggerSpan is not None:
cognitoTriggerSpan.end()
if eventBridgeTriggerSpan is not None:
eventBridgeTriggerSpan.end()

now = time.time()
_tracer_provider = tracer_provider or get_tracer_provider()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def response_hook(span, service_name, operation_name, result):
from opentelemetry.propagate import inject
from opentelemetry.propagators import textmap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import get_tracer
from opentelemetry.trace import get_tracer, SpanKind
from opentelemetry.trace.span import Span
import base64
import typing
Expand Down Expand Up @@ -218,6 +218,8 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
body = call_context.params.get("Message")
if body is not None:
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit,json.dumps(body, default=str))
elif call_context.service == "events" and call_context.operation == "PutEvents":
call_context.span_kind = SpanKind.PRODUCER
else:
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(call_context.params, default=str))
except Exception as ex:
Expand Down Expand Up @@ -274,7 +276,23 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
inject(carrier = entry.get("MessageAttributes"), setter=SQSSetter())

except Exception as ex:
pass
pass

try:
if call_context.service == "events" and call_context.operation == "PutEvents":
if args[1].get("Entries") is not None:
for entry in args[1].get("Entries"):
if entry.get("Detail") is not None:
detailJson = json.loads(entry.get("Detail"))
detailJson['_context'] = {}
inject(carrier = detailJson['_context'])
entry['Detail'] = json.dumps(detailJson)
else:
detailJson = {'_context': {}}
inject(carrier = detailJson['_context'])
entry['Detail'] = json.dumps(detailJson)
except Exception as ex:
pass

result = None
try:
Expand Down