Skip to content

Commit a9f9d1f

Browse files
Statsd integration (#89)
* Implement statsd protocol * Bump version to 2.22.0 * Remove redundant whitespace * Clean up from PR review
1 parent 10638a8 commit a9f9d1f

File tree

7 files changed

+77
-22
lines changed

7 files changed

+77
-22
lines changed

datadog_lambda/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# The minor version corresponds to the Lambda layer version.
22
# E.g.,, version 0.5.0 gets packaged into layer version 5.
3-
__version__ = "2.21.0"
3+
__version__ = "2.22.0"
44

55

66
import os

datadog_lambda/extension.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import logging
2+
import requests
3+
4+
AGENT_URL = "http://127.0.0.1:8124"
5+
HELLO_PATH = "/lambda/hello"
6+
FLUSH_PATH = "/lambda/flush"
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
def is_extension_running():
12+
try:
13+
requests.get(AGENT_URL + HELLO_PATH)
14+
except Exception as e:
15+
logger.debug("Extension is not running, returned with error %s", e)
16+
return False
17+
return True
18+
19+
20+
def flush_extension():
21+
try:
22+
requests.post(AGENT_URL + FLUSH_PATH, data={})
23+
except Exception as e:
24+
logger.debug("Failed to flush extension, returned with error %s", e)
25+
return False
26+
return True
27+
28+
29+
should_use_extension = is_extension_running()

datadog_lambda/metric.py

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,42 @@
1010
import logging
1111

1212
import boto3
13-
from datadog import api
13+
from datadog import api, initialize, statsd
1414
from datadog.threadstats import ThreadStats
15+
from datadog_lambda.extension import should_use_extension
1516
from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer
1617

1718

1819
ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced"
1920

2021
logger = logging.getLogger(__name__)
2122

22-
lambda_stats = ThreadStats()
23-
lambda_stats.start()
23+
24+
class StatsDWrapper:
25+
"""
26+
Wraps StatsD calls, to give an identical interface to ThreadStats
27+
"""
28+
29+
def __init__(self):
30+
options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
31+
initialize(**options)
32+
33+
def distribution(self, metric_name, value, tags=[], timestamp=None):
34+
statsd.distribution(metric_name, value, tags=tags)
35+
36+
def flush(self, value):
37+
pass
2438

2539

26-
def lambda_metric(metric_name, value, timestamp=None, tags=None):
40+
lambda_stats = None
41+
if should_use_extension:
42+
lambda_stats = StatsDWrapper()
43+
else:
44+
lambda_stats = ThreadStats()
45+
lambda_stats.start()
46+
47+
48+
def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
2749
"""
2850
Submit a data point to Datadog distribution metrics.
2951
https://docs.datadoghq.com/graphing/metrics/distributions/
@@ -36,12 +58,14 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None):
3658
periodically and at the end of the function execution in a
3759
background thread.
3860
"""
61+
flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
3962
tags = tag_dd_lambda_layer(tags)
40-
if os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true":
41-
write_metric_point_to_stdout(metric_name, value, timestamp, tags)
63+
64+
if flush_to_logs or (force_async and not should_use_extension):
65+
write_metric_point_to_stdout(metric_name, value, timestamp=timestamp, tags=tags)
4266
else:
4367
logger.debug("Sending metric %s to Datadog via lambda layer", metric_name)
44-
lambda_stats.distribution(metric_name, value, timestamp=timestamp, tags=tags)
68+
lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp)
4569

4670

4771
def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
@@ -85,13 +109,10 @@ def submit_enhanced_metric(metric_name, lambda_context):
85109
metric_name,
86110
)
87111
return
88-
89-
# Enhanced metrics are always written to logs
90-
write_metric_point_to_stdout(
91-
"{}.{}".format(ENHANCED_METRICS_NAMESPACE_PREFIX, metric_name),
92-
1,
93-
tags=get_enhanced_metrics_tags(lambda_context),
94-
)
112+
tags = get_enhanced_metrics_tags(lambda_context)
113+
metric_name = "aws.lambda.enhanced." + metric_name
114+
# Enhanced metrics always use an async submission method, (eg logs or extension).
115+
lambda_metric(metric_name, 1, timestamp=None, tags=tags, force_async=True)
95116

96117

97118
def submit_invocations_metric(lambda_context):

datadog_lambda/tags.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ def get_enhanced_metrics_tags(lambda_context):
8585
get_cold_start_tag(),
8686
"memorysize:{}".format(lambda_context.memory_limit_in_mb),
8787
get_runtime_tag(),
88-
_format_dd_lambda_layer_tag(),
8988
]
9089

9190

datadog_lambda/wrapper.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import logging
88
import traceback
99

10+
from datadog_lambda.extension import should_use_extension, flush_extension
1011
from datadog_lambda.cold_start import set_cold_start, is_cold_start
1112
from datadog_lambda.metric import (
1213
lambda_stats,
@@ -137,8 +138,11 @@ def _before(self, event, context):
137138

138139
def _after(self, event, context):
139140
try:
140-
if not self.flush_to_log:
141+
if not self.flush_to_log or should_use_extension:
141142
lambda_stats.flush(float("inf"))
143+
if should_use_extension:
144+
flush_extension()
145+
142146
if self.span:
143147
self.span.finish()
144148
logger.debug("datadog_lambda_wrapper _after() done")

scripts/publish_staging.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#!/bin/bash
22
set -e
33

4-
./scripts/list_layers.sh
54
./scripts/build_layers.sh
65
./scripts/publish_layers.sh us-east-1

tests/test_wrapper.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,6 @@ def setUp(self):
3838
self.mock_wrapper_lambda_stats = patcher.start()
3939
self.addCleanup(patcher.stop)
4040

41-
patcher = patch("datadog_lambda.metric.lambda_metric")
42-
self.mock_lambda_metric = patcher.start()
43-
self.addCleanup(patcher.stop)
44-
4541
patcher = patch("datadog_lambda.wrapper.extract_dd_trace_context")
4642
self.mock_extract_dd_trace_context = patcher.start()
4743
self.addCleanup(patcher.stop)
@@ -152,6 +148,7 @@ def lambda_handler(event, context):
152148
"runtime:python2.7",
153149
"dd_lambda_layer:datadog-python27_0.1.0",
154150
],
151+
timestamp=None,
155152
)
156153
]
157154
)
@@ -181,6 +178,7 @@ def lambda_handler(event, context):
181178
"runtime:python2.7",
182179
"dd_lambda_layer:datadog-python27_0.1.0",
183180
],
181+
timestamp=None,
184182
),
185183
call(
186184
"aws.lambda.enhanced.errors",
@@ -195,6 +193,7 @@ def lambda_handler(event, context):
195193
"runtime:python2.7",
196194
"dd_lambda_layer:datadog-python27_0.1.0",
197195
],
196+
timestamp=None,
198197
),
199198
]
200199
)
@@ -229,6 +228,7 @@ def lambda_handler(event, context):
229228
"runtime:python2.7",
230229
"dd_lambda_layer:datadog-python27_0.1.0",
231230
],
231+
timestamp=None,
232232
),
233233
call(
234234
"aws.lambda.enhanced.invocations",
@@ -243,6 +243,7 @@ def lambda_handler(event, context):
243243
"runtime:python2.7",
244244
"dd_lambda_layer:datadog-python27_0.1.0",
245245
],
246+
timestamp=None,
246247
),
247248
]
248249
)
@@ -275,6 +276,7 @@ def lambda_handler(event, context):
275276
"runtime:python2.7",
276277
"dd_lambda_layer:datadog-python27_0.1.0",
277278
],
279+
timestamp=None,
278280
)
279281
]
280282
)
@@ -307,6 +309,7 @@ def lambda_handler(event, context):
307309
"runtime:python2.7",
308310
"dd_lambda_layer:datadog-python27_0.1.0",
309311
],
312+
timestamp=None,
310313
)
311314
]
312315
)

0 commit comments

Comments
 (0)