Skip to content

Commit

Permalink
feat: add eventbridge integration (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Apr 15, 2024
1 parent 69f8f92 commit 1cf47e1
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,31 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
except Exception as ex:
pass

eventBridgeTriggerSpan = None
try:
if type(lambda_event) is dict and lambda_event.get("source") is not None and type(lambda_event.get("source")) is str:
span_name = 'EventBridge event'
if lambda_event.get("detail-type") is not None:
span_name = lambda_event.get("detail-type")

links = []
if lambda_event.get("detail") is not None and lambda_event["detail"].get("_context") is not None:
ctx = get_global_textmap().extract(carrier=lambda_event["detail"].get("_context"))
links.append(Link(get_current_span(ctx).get_span_context()))

eventBridgeTriggerSpan = tracer.start_span(span_name, context=parent_context, kind=SpanKind.CONSUMER, links=links)
eventBridgeTriggerSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
eventBridgeTriggerSpan.set_attribute("faas.trigger.type", "EventBridge")
eventBridgeTriggerSpan.set_attribute("aws.event.bridge.trigger.source", lambda_event.get("source"))
parent_context = set_span_in_context(eventBridgeTriggerSpan)

eventBridgeTriggerSpan.set_attribute(
"rpc.request.body",
json.dumps(lambda_event),
)
except Exception as ex:
pass

try:
with tracer.start_as_current_span(
name=orig_handler_name,
Expand Down Expand Up @@ -685,6 +710,22 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
except Exception:
pass
cognitoTriggerSpan.end()

if lambda_event and eventBridgeTriggerSpan is not None:
try:
if isinstance(result, dict) and result.get("statusCode"):
eventBridgeTriggerSpan.set_attribute(
SpanAttributes.HTTP_STATUS_CODE,
result.get("statusCode"),
)
if isinstance(result, dict) and result.get("body"):
eventBridgeTriggerSpan.set_attribute(
"rpc.response.body",
result.get("body"),
)
except Exception:
pass
eventBridgeTriggerSpan.end()

now = time.time()
_tracer_provider = tracer_provider or get_tracer_provider()
Expand Down Expand Up @@ -712,6 +753,8 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
dynamoTriggerSpan.end()
if cognitoTriggerSpan is not None:
cognitoTriggerSpan.end()
if eventBridgeTriggerSpan is not None:
eventBridgeTriggerSpan.end()

now = time.time()
_tracer_provider = tracer_provider or get_tracer_provider()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def response_hook(span, service_name, operation_name, result):
from opentelemetry.propagate import inject
from opentelemetry.propagators import textmap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import get_tracer
from opentelemetry.trace import get_tracer, SpanKind
from opentelemetry.trace.span import Span
import base64
import typing
Expand Down Expand Up @@ -232,6 +232,8 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
body = call_context.params.get("Message")
if body is not None:
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit,json.dumps(body, default=str))
elif call_context.service == "events" and call_context.operation == "PutEvents":
call_context.span_kind = SpanKind.PRODUCER
else:
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(call_context.params, default=str))
except Exception as ex:
Expand Down Expand Up @@ -288,7 +290,23 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
inject(carrier = entry.get("MessageAttributes"), setter=SQSSetter())

except Exception as ex:
pass
pass

try:
if call_context.service == "events" and call_context.operation == "PutEvents":
if args[1].get("Entries") is not None:
for entry in args[1].get("Entries"):
if entry.get("Detail") is not None:
detailJson = json.loads(entry.get("Detail"))
detailJson['_context'] = {}
inject(carrier = detailJson['_context'])
entry['Detail'] = json.dumps(detailJson)
else:
detailJson = {'_context': {}}
inject(carrier = detailJson['_context'])
entry['Detail'] = json.dumps(detailJson)
except Exception as ex:
pass

result = None
try:
Expand Down

0 comments on commit 1cf47e1

Please sign in to comment.