From 107e89d30d69f2dc1fb6f25cba4732a0a9cc008f Mon Sep 17 00:00:00 2001 From: Anthony J Mirabella Date: Tue, 31 Jan 2023 02:34:35 -0500 Subject: [PATCH] Flush meter provider at end of lambda function handler Signed-off-by: Anthony J Mirabella --- CHANGELOG.md | 2 ++ .../instrumentation/aws_lambda/__init__.py | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b2c32ad4a8..b9abe7601d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553)) - `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available ([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212)) +- `opentelemetry-instrumentation-aws-lambda` Flush `MeterProvider` at end of function invocation. + ([#TBD](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/tbd)) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py index 9d98e6806a..b361b641f0 100644 --- a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py @@ -45,6 +45,7 @@ def lambda_handler(event, context): The `instrument` method accepts the following keyword args: tracer_provider (TracerProvider) - an optional tracer provider +meter_provider (MeterProvider) - an optional meter provider event_context_extractor (Callable) - a function that returns an OTel Trace Context given the Lambda Event the AWS Lambda was invoked with this function signature is: def event_context_extractor(lambda_event: Any) -> Context @@ -68,6 +69,7 @@ def custom_event_context_extractor(lambda_event): import logging import os +import time from importlib import import_module from typing import Any, Callable, Collection from urllib.parse import urlencode @@ -79,6 +81,10 @@ def custom_event_context_extractor(lambda_event): from opentelemetry.instrumentation.aws_lambda.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.metrics import ( + MeterProvider, + get_meter_provider, +) from opentelemetry.propagate import get_global_textmap from opentelemetry.propagators.aws.aws_xray_propagator import ( TRACE_HEADER_KEY, @@ -274,6 +280,7 @@ def _instrument( event_context_extractor: Callable[[Any], Context], tracer_provider: TracerProvider = None, disable_aws_context_propagation: bool = False, + meter_provider: MeterProvider = None, ): def _instrumented_lambda_handler_call( call_wrapped, instance, args, kwargs @@ -352,6 +359,7 @@ def _instrumented_lambda_handler_call( result.get("statusCode"), ) + now = time.time() _tracer_provider = tracer_provider or get_tracer_provider() try: # NOTE: `force_flush` before function quit in case of Lambda freeze. @@ -363,6 +371,19 @@ def _instrumented_lambda_handler_call( "TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation." ) + rem = flush_timeout - (time.time()-now)*1000 + if rem > 0: + _meter_provider = meter_provider or get_meter_provider() + try: + # NOTE: `force_flush` before function quit in case of Lambda freeze. + # Assumes we are using the OpenTelemetry SDK implementation of the + # `MeterProvider`. + _meter_provider.force_flush(rem) + except Exception: # pylint: disable=broad-except + logger.error( + "MeterProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation." + ) + return result wrap_function_wrapper( @@ -385,6 +406,7 @@ def _instrument(self, **kwargs): Args: **kwargs: Optional arguments ``tracer_provider``: a TracerProvider, defaults to global + ``meter_provider``: a MeterProvider, defaults to global ``event_context_extractor``: a method which takes the Lambda Event as input and extracts an OTel Context from it. By default, the context is extracted from the HTTP headers of an API Gateway @@ -432,6 +454,7 @@ def _instrument(self, **kwargs): ), tracer_provider=kwargs.get("tracer_provider"), disable_aws_context_propagation=disable_aws_context_propagation, + meter_provider=kwargs.get("meter_provider"), ) def _uninstrument(self, **kwargs):