From d8788b68ddd7275e1390992d508ff55c9f8e1d01 Mon Sep 17 00:00:00 2001 From: Anthony Mirabella Date: Wed, 1 Feb 2023 08:43:53 -0500 Subject: [PATCH] Flush meter provider at end of lambda function handler (#1613) * Flush meter provider at end of lambda function handler Signed-off-by: Anthony J Mirabella * Update `force_flush()` check based on PR feedback Signed-off-by: Anthony J Mirabella --------- Signed-off-by: Anthony J Mirabella --- CHANGELOG.md | 2 + .../instrumentation/aws_lambda/__init__.py | 43 +++++++++++++++---- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3830495430..e9120929bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553)) - `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available ([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212)) +- `opentelemetry-instrumentation-aws-lambda` Flush `MeterProvider` at end of function invocation. + ([#1613](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1613)) - Fix aiohttp bug with unset `trace_configs` ([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592)) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py index 9d98e6806a..d696cdf3e8 100644 --- a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py @@ -45,6 +45,7 @@ def lambda_handler(event, context): The `instrument` method accepts the following keyword args: tracer_provider (TracerProvider) - an optional tracer provider +meter_provider (MeterProvider) - an optional meter provider event_context_extractor (Callable) - a function that returns an OTel Trace Context given the Lambda Event the AWS Lambda was invoked with this function signature is: def event_context_extractor(lambda_event: Any) -> Context @@ -68,6 +69,7 @@ def custom_event_context_extractor(lambda_event): import logging import os +import time from importlib import import_module from typing import Any, Callable, Collection from urllib.parse import urlencode @@ -79,6 +81,10 @@ def custom_event_context_extractor(lambda_event): from opentelemetry.instrumentation.aws_lambda.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.metrics import ( + MeterProvider, + get_meter_provider, +) from opentelemetry.propagate import get_global_textmap from opentelemetry.propagators.aws.aws_xray_propagator import ( TRACE_HEADER_KEY, @@ -274,6 +280,7 @@ def _instrument( event_context_extractor: Callable[[Any], Context], tracer_provider: TracerProvider = None, disable_aws_context_propagation: bool = False, + meter_provider: MeterProvider = None, ): def _instrumented_lambda_handler_call( call_wrapped, instance, args, kwargs @@ -352,15 +359,33 @@ def _instrumented_lambda_handler_call( result.get("statusCode"), ) + now = time.time() _tracer_provider = tracer_provider or get_tracer_provider() - try: - # NOTE: `force_flush` before function quit in case of Lambda freeze. - # Assumes we are using the OpenTelemetry SDK implementation of the - # `TracerProvider`. - _tracer_provider.force_flush(flush_timeout) - except Exception: # pylint: disable=broad-except - logger.error( - "TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation." + if hasattr(_tracer_provider, "force_flush"): + try: + # NOTE: `force_flush` before function quit in case of Lambda freeze. + _tracer_provider.force_flush(flush_timeout) + except Exception: # pylint: disable=broad-except + logger.exception( + f"TracerProvider failed to flush traces" + ) + else: + logger.warning("TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation.") + + _meter_provider = meter_provider or get_meter_provider() + if hasattr(_meter_provider, "force_flush"): + rem = flush_timeout - (time.time()-now)*1000 + if rem > 0: + try: + # NOTE: `force_flush` before function quit in case of Lambda freeze. + _meter_provider.force_flush(rem) + except Exception: # pylint: disable=broad-except + logger.exception( + f"MeterProvider failed to flush metrics" + ) + else: + logger.warning( + "MeterProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation." ) return result @@ -385,6 +410,7 @@ def _instrument(self, **kwargs): Args: **kwargs: Optional arguments ``tracer_provider``: a TracerProvider, defaults to global + ``meter_provider``: a MeterProvider, defaults to global ``event_context_extractor``: a method which takes the Lambda Event as input and extracts an OTel Context from it. By default, the context is extracted from the HTTP headers of an API Gateway @@ -432,6 +458,7 @@ def _instrument(self, **kwargs): ), tracer_provider=kwargs.get("tracer_provider"), disable_aws_context_propagation=disable_aws_context_propagation, + meter_provider=kwargs.get("meter_provider"), ) def _uninstrument(self, **kwargs):