Skip to content

Commit

Permalink
Flush meter provider at end of lambda function handler (#1613)
Browse files Browse the repository at this point in the history
* Flush meter provider at end of lambda function handler

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>

* Update `force_flush()` check based on PR feedback

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>

---------

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>
  • Loading branch information
Aneurysm9 authored Feb 1, 2023
1 parent 6ed2c56 commit d8788b6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553))
- `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available
([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212))
- `opentelemetry-instrumentation-aws-lambda` Flush `MeterProvider` at end of function invocation.
([#1613](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1613))
- Fix aiohttp bug with unset `trace_configs` ([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592))

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def lambda_handler(event, context):
The `instrument` method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
meter_provider (MeterProvider) - an optional meter provider
event_context_extractor (Callable) - a function that returns an OTel Trace
Context given the Lambda Event the AWS Lambda was invoked with
this function signature is: def event_context_extractor(lambda_event: Any) -> Context
Expand All @@ -68,6 +69,7 @@ def custom_event_context_extractor(lambda_event):

import logging
import os
import time
from importlib import import_module
from typing import Any, Callable, Collection
from urllib.parse import urlencode
Expand All @@ -79,6 +81,10 @@ def custom_event_context_extractor(lambda_event):
from opentelemetry.instrumentation.aws_lambda.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import (
MeterProvider,
get_meter_provider,
)
from opentelemetry.propagate import get_global_textmap
from opentelemetry.propagators.aws.aws_xray_propagator import (
TRACE_HEADER_KEY,
Expand Down Expand Up @@ -274,6 +280,7 @@ def _instrument(
event_context_extractor: Callable[[Any], Context],
tracer_provider: TracerProvider = None,
disable_aws_context_propagation: bool = False,
meter_provider: MeterProvider = None,
):
def _instrumented_lambda_handler_call(
call_wrapped, instance, args, kwargs
Expand Down Expand Up @@ -352,15 +359,33 @@ def _instrumented_lambda_handler_call(
result.get("statusCode"),
)

now = time.time()
_tracer_provider = tracer_provider or get_tracer_provider()
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
# Assumes we are using the OpenTelemetry SDK implementation of the
# `TracerProvider`.
_tracer_provider.force_flush(flush_timeout)
except Exception: # pylint: disable=broad-except
logger.error(
"TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
if hasattr(_tracer_provider, "force_flush"):
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
_tracer_provider.force_flush(flush_timeout)
except Exception: # pylint: disable=broad-except
logger.exception(
f"TracerProvider failed to flush traces"
)
else:
logger.warning("TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation.")

_meter_provider = meter_provider or get_meter_provider()
if hasattr(_meter_provider, "force_flush"):
rem = flush_timeout - (time.time()-now)*1000
if rem > 0:
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
_meter_provider.force_flush(rem)
except Exception: # pylint: disable=broad-except
logger.exception(
f"MeterProvider failed to flush metrics"
)
else:
logger.warning(
"MeterProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
)

return result
Expand All @@ -385,6 +410,7 @@ def _instrument(self, **kwargs):
Args:
**kwargs: Optional arguments
``tracer_provider``: a TracerProvider, defaults to global
``meter_provider``: a MeterProvider, defaults to global
``event_context_extractor``: a method which takes the Lambda
Event as input and extracts an OTel Context from it. By default,
the context is extracted from the HTTP headers of an API Gateway
Expand Down Expand Up @@ -432,6 +458,7 @@ def _instrument(self, **kwargs):
),
tracer_provider=kwargs.get("tracer_provider"),
disable_aws_context_propagation=disable_aws_context_propagation,
meter_provider=kwargs.get("meter_provider"),
)

def _uninstrument(self, **kwargs):
Expand Down

0 comments on commit d8788b6

Please sign in to comment.