Skip to content

Statsd integration #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datadog_lambda/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# The minor version corresponds to the Lambda layer version.
# E.g.,, version 0.5.0 gets packaged into layer version 5.
__version__ = "2.21.0"
__version__ = "2.22.0"


import os
Expand Down
29 changes: 29 additions & 0 deletions datadog_lambda/extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging
import requests

AGENT_URL = "http://127.0.0.1:8124"
HELLO_PATH = "/lambda/hello"
FLUSH_PATH = "/lambda/flush"

logger = logging.getLogger(__name__)


def is_extension_running():
try:
requests.get(AGENT_URL + HELLO_PATH)
except Exception as e:
logger.debug("Extension is not running, returned with error %s", e)
return False
return True


def flush_extension():
try:
requests.post(AGENT_URL + FLUSH_PATH, data={})
except Exception as e:
logger.debug("Failed to flush extension, returned with error %s", e)
return False
return True


should_use_extension = is_extension_running()
49 changes: 35 additions & 14 deletions datadog_lambda/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,42 @@
import logging

import boto3
from datadog import api
from datadog import api, initialize, statsd
from datadog.threadstats import ThreadStats
from datadog_lambda.extension import should_use_extension
from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer


ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced"

logger = logging.getLogger(__name__)

lambda_stats = ThreadStats()
lambda_stats.start()

class StatsDWrapper:
"""
Wraps StatsD calls, to give an identical interface to ThreadStats
"""

def __init__(self):
options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
initialize(**options)

def distribution(self, metric_name, value, tags=[], timestamp=None):
statsd.distribution(metric_name, value, tags=tags)

def flush(self, value):
pass


def lambda_metric(metric_name, value, timestamp=None, tags=None):
lambda_stats = None
if should_use_extension:
lambda_stats = StatsDWrapper()
else:
lambda_stats = ThreadStats()
lambda_stats.start()


def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
"""
Submit a data point to Datadog distribution metrics.
https://docs.datadoghq.com/graphing/metrics/distributions/
Expand All @@ -36,12 +58,14 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None):
periodically and at the end of the function execution in a
background thread.
"""
flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
tags = tag_dd_lambda_layer(tags)
if os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true":
write_metric_point_to_stdout(metric_name, value, timestamp, tags)

if flush_to_logs or (force_async and not should_use_extension):
write_metric_point_to_stdout(metric_name, value, timestamp=timestamp, tags=tags)
else:
logger.debug("Sending metric %s to Datadog via lambda layer", metric_name)
lambda_stats.distribution(metric_name, value, timestamp=timestamp, tags=tags)
lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp)


def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
Expand Down Expand Up @@ -85,13 +109,10 @@ def submit_enhanced_metric(metric_name, lambda_context):
metric_name,
)
return

# Enhanced metrics are always written to logs
write_metric_point_to_stdout(
"{}.{}".format(ENHANCED_METRICS_NAMESPACE_PREFIX, metric_name),
1,
tags=get_enhanced_metrics_tags(lambda_context),
)
tags = get_enhanced_metrics_tags(lambda_context)
metric_name = "aws.lambda.enhanced." + metric_name
# Enhanced metrics always use an async submission method, (eg logs or extension).
lambda_metric(metric_name, 1, timestamp=None, tags=tags, force_async=True)


def submit_invocations_metric(lambda_context):
Expand Down
1 change: 0 additions & 1 deletion datadog_lambda/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def get_enhanced_metrics_tags(lambda_context):
get_cold_start_tag(),
"memorysize:{}".format(lambda_context.memory_limit_in_mb),
get_runtime_tag(),
_format_dd_lambda_layer_tag(),
]


Expand Down
6 changes: 5 additions & 1 deletion datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import traceback

from datadog_lambda.extension import should_use_extension, flush_extension
from datadog_lambda.cold_start import set_cold_start, is_cold_start
from datadog_lambda.metric import (
lambda_stats,
Expand Down Expand Up @@ -137,8 +138,11 @@ def _before(self, event, context):

def _after(self, event, context):
try:
if not self.flush_to_log:
if not self.flush_to_log or should_use_extension:
lambda_stats.flush(float("inf"))
if should_use_extension:
flush_extension()

if self.span:
self.span.finish()
logger.debug("datadog_lambda_wrapper _after() done")
Expand Down
1 change: 0 additions & 1 deletion scripts/publish_staging.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/bin/bash
set -e

./scripts/list_layers.sh
./scripts/build_layers.sh
./scripts/publish_layers.sh us-east-1
11 changes: 7 additions & 4 deletions tests/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ def setUp(self):
self.mock_wrapper_lambda_stats = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("datadog_lambda.metric.lambda_metric")
self.mock_lambda_metric = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("datadog_lambda.wrapper.extract_dd_trace_context")
self.mock_extract_dd_trace_context = patcher.start()
self.addCleanup(patcher.stop)
Expand Down Expand Up @@ -152,6 +148,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
)
]
)
Expand Down Expand Up @@ -181,6 +178,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
),
call(
"aws.lambda.enhanced.errors",
Expand All @@ -195,6 +193,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
),
]
)
Expand Down Expand Up @@ -229,6 +228,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
),
call(
"aws.lambda.enhanced.invocations",
Expand All @@ -243,6 +243,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
),
]
)
Expand Down Expand Up @@ -275,6 +276,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
)
]
)
Expand Down Expand Up @@ -307,6 +309,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
)
]
)
Expand Down