Skip to content

Commit 11a38d0

Browse files
vickentyremeh
andauthored
Add blocking socket mode and background sender (#787)
* Add option to use socket timeouts. settimeout(0) is equivalent to setblocking(0), so need only one call. Neither UDP or nor unix datagram sockets block on connect, so handling timeout in get_socket is not necessary. * Add background sender thread and a queue * Separate telemetry for dropped packets on write Align reported metrics with the go client. * Ignore missing queue type annotations * Add simple integration test for statsd * Fix import error on pypy * Fix formatting * Use py2.7 compatible syntax Python2.7 does not allow other positional parameters after *expression. * Update datadog/dogstatsd/base.py Co-authored-by: Rémy Mathieu <remy.mathieu@datadoghq.com> * Update datadog/dogstatsd/base.py Co-authored-by: Rémy Mathieu <remy.mathieu@datadoghq.com> * Make parameter name more descriptive * Mention wait_for_pending in disable_background_sender documentation * Update tests --------- Co-authored-by: Rémy Mathieu <remy.mathieu@datadoghq.com>
1 parent d02e184 commit 11a38d0

File tree

4 files changed

+201
-24
lines changed

4 files changed

+201
-24
lines changed

datadog/dogstatsd/base.py

Lines changed: 119 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
import time
1818
from threading import Lock, RLock
1919

20+
try:
21+
import queue
22+
except ImportError:
23+
# pypy has the same module, but capitalized.
24+
import Queue as queue # type: ignore[no-redef]
25+
2026
from typing import Optional, List, Text, Union
2127

2228
# Datadog libraries
@@ -78,8 +84,12 @@
7884
"datadog.dogstatsd.client.service_checks:%s|c|#%s",
7985
"datadog.dogstatsd.client.bytes_sent:%s|c|#%s",
8086
"datadog.dogstatsd.client.bytes_dropped:%s|c|#%s",
87+
"datadog.dogstatsd.client.bytes_dropped_queue:%s|c|#%s",
88+
"datadog.dogstatsd.client.bytes_dropped_writer:%s|c|#%s",
8189
"datadog.dogstatsd.client.packets_sent:%s|c|#%s",
8290
"datadog.dogstatsd.client.packets_dropped:%s|c|#%s",
91+
"datadog.dogstatsd.client.packets_dropped_queue:%s|c|#%s",
92+
"datadog.dogstatsd.client.packets_dropped_writer:%s|c|#%s",
8393
]
8494
) + "\n"
8595

@@ -110,6 +120,11 @@ def __init__(
110120
max_buffer_len=0, # type: int
111121
container_id=None, # type: Optional[Text]
112122
origin_detection_enabled=True, # type: bool
123+
socket_timeout=0, # type: Optional[float]
124+
telemetry_socket_timeout=0, # type: Optional[float]
125+
disable_background_sender=True, # type: bool
126+
sender_queue_size=0, # type: int
127+
sender_queue_timeout=0, # type: Optional[float]
113128
): # type: (...) -> None
114129
"""
115130
Initialize a DogStatsd object.
@@ -239,6 +254,35 @@ def __init__(
239254
Default: True.
240255
More on this: https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#origin-detection-over-udp
241256
:type origin_detection_enabled: boolean
257+
258+
:param socket_timeout: Set timeout for socket operations, in seconds. Optional.
259+
If sets to zero, never wait if operation can not be completed immediately. If set to None, wait forever.
260+
This option does not affect hostname resolution when using UDP.
261+
:type socket_timeout: float
262+
263+
:param telemetry_socket_timeout: Set timeout for the telemetry socket operations. Optional.
264+
Effective only if either telemetry_host or telemetry_socket_path are set.
265+
If sets to zero, never wait if operation can not be completed immediately. If set to None, wait forever.
266+
This option does not affect hostname resolution when using UDP.
267+
:type telemetry_socket_timeout: float
268+
269+
:param disable_background_sender: Use a background thread to communicate with the dogstatsd server. Optional.
270+
When enabled, a background thread will be used to send metric payloads to the Agent.
271+
Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent.
272+
Default: True.
273+
:type disable_background_sender: boolean
274+
275+
:param sender_queue_size: Set the maximum number of packets to queue for the sender. Optional
276+
How may packets to queue before blocking or dropping the packet if the packet queue is already full.
277+
Default: 0 (unlimited).
278+
:type sender_queue_size: integer
279+
280+
:param sender_queue_timeout: Set timeout for packet queue operations, in seconds. Optional.
281+
How long the application thread is willing to wait for the queue clear up before dropping the metric packet.
282+
If set to None, wait forever.
283+
If set to zero drop the packet immediately if the queue is full.
284+
Default: 0 (no wait)
285+
:type sender_queue_timeout: float
242286
"""
243287

244288
self._socket_lock = Lock()
@@ -276,6 +320,7 @@ def __init__(
276320

277321
# Connection
278322
self._max_payload_size = max_buffer_len
323+
self.socket_timeout = socket_timeout
279324
if socket_path is not None:
280325
self.socket_path = socket_path # type: Optional[text]
281326
self.host = None
@@ -294,6 +339,7 @@ def __init__(
294339
self.telemetry_socket_path = telemetry_socket_path
295340
self.telemetry_host = None
296341
self.telemetry_port = None
342+
self.telemetry_socket_timeout = telemetry_socket_timeout
297343
if not telemetry_socket_path and telemetry_host:
298344
self.telemetry_socket_path = None
299345
self.telemetry_host = self.resolve_host(telemetry_host, use_default_route)
@@ -367,6 +413,17 @@ def __init__(
367413
self._flush_thread_stop = threading.Event()
368414
self._start_flush_thread(self._flush_interval)
369415

416+
self._queue = None
417+
if not disable_background_sender:
418+
self._queue = queue.Queue(sender_queue_size)
419+
self._start_sender_thread()
420+
if sender_queue_timeout is None:
421+
self._queue_blocking = True
422+
self._queue_timeout = None
423+
else:
424+
self._queue_blocking = sender_queue_timeout > 0
425+
self._queue_timeout = max(0, sender_queue_timeout)
426+
370427
def disable_telemetry(self):
371428
self._telemetry = False
372429

@@ -479,22 +536,25 @@ def get_socket(self, telemetry=False):
479536
if self.telemetry_socket_path is not None:
480537
self.telemetry_socket = self._get_uds_socket(
481538
self.telemetry_socket_path,
539+
self.telemetry_socket_timeout,
482540
)
483541
else:
484542
self.telemetry_socket = self._get_udp_socket(
485543
self.telemetry_host,
486544
self.telemetry_port,
545+
self.telemetry_socket_timeout,
487546
)
488547

489548
return self.telemetry_socket
490549

491550
if not self.socket:
492551
if self.socket_path is not None:
493-
self.socket = self._get_uds_socket(self.socket_path)
552+
self.socket = self._get_uds_socket(self.socket_path, self.socket_timeout)
494553
else:
495554
self.socket = self._get_udp_socket(
496555
self.host,
497556
self.port,
557+
self.socket_timeout,
498558
)
499559

500560
return self.socket
@@ -513,17 +573,17 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
513573
pass
514574

515575
@classmethod
516-
def _get_uds_socket(cls, socket_path):
576+
def _get_uds_socket(cls, socket_path, timeout):
517577
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
518-
sock.setblocking(0)
578+
sock.settimeout(timeout)
519579
cls._ensure_min_send_buffer_size(sock)
520580
sock.connect(socket_path)
521581
return sock
522582

523583
@classmethod
524-
def _get_udp_socket(cls, host, port):
584+
def _get_udp_socket(cls, host, port, timeout):
525585
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
526-
sock.setblocking(0)
586+
sock.settimeout(timeout)
527587
cls._ensure_min_send_buffer_size(sock)
528588
sock.connect((host, port))
529589

@@ -804,9 +864,12 @@ def _reset_telemetry(self):
804864
self.events_count = 0
805865
self.service_checks_count = 0
806866
self.bytes_sent = 0
807-
self.bytes_dropped = 0
867+
self.bytes_dropped_queue = 0
868+
self.bytes_dropped_writer = 0
808869
self.packets_sent = 0
809870
self.packets_dropped = 0
871+
self.packets_dropped_queue = 0
872+
self.packets_dropped_writer = 0
810873
self._last_flush_time = time.time()
811874

812875
def _flush_telemetry(self):
@@ -821,11 +884,19 @@ def _flush_telemetry(self):
821884
telemetry_tags,
822885
self.bytes_sent,
823886
telemetry_tags,
824-
self.bytes_dropped,
887+
self.bytes_dropped_queue + self.bytes_dropped_writer,
888+
telemetry_tags,
889+
self.bytes_dropped_queue,
890+
telemetry_tags,
891+
self.bytes_dropped_writer,
825892
telemetry_tags,
826893
self.packets_sent,
827894
telemetry_tags,
828-
self.packets_dropped,
895+
self.packets_dropped_queue + self.packets_dropped_writer,
896+
telemetry_tags,
897+
self.packets_dropped_queue,
898+
telemetry_tags,
899+
self.packets_dropped_writer,
829900
telemetry_tags,
830901
)
831902

@@ -834,7 +905,19 @@ def _is_telemetry_flush_time(self):
834905
self._last_flush_time + self._telemetry_flush_interval < time.time()
835906

836907
def _send_to_server(self, packet):
837-
self._xmit_packet(packet + '\n', False)
908+
if self._queue is not None:
909+
try:
910+
self._queue.put(packet + '\n', self._queue_blocking, self._queue_timeout)
911+
except queue.Full:
912+
self.packets_dropped_queue += 1
913+
self.bytes_dropped_queue += 1
914+
return
915+
916+
self._xmit_packet_with_telemetry(packet + '\n')
917+
918+
def _xmit_packet_with_telemetry(self, packet):
919+
self._xmit_packet(packet, False)
920+
838921
if self._is_telemetry_flush_time():
839922
telemetry = self._flush_telemetry()
840923
if self._xmit_packet(telemetry, True):
@@ -844,8 +927,8 @@ def _send_to_server(self, packet):
844927
else:
845928
# Telemetry packet has been dropped, keep telemetry data for the next flush
846929
self._last_flush_time = time.time()
847-
self.bytes_dropped += len(telemetry)
848-
self.packets_dropped += 1
930+
self.bytes_dropped_writer += len(telemetry)
931+
self.packets_dropped_writer += 1
849932

850933
def _xmit_packet(self, packet, is_telemetry):
851934
try:
@@ -892,8 +975,8 @@ def _xmit_packet(self, packet, is_telemetry):
892975
log.error("Unexpected error: %s", str(exc))
893976

894977
if not is_telemetry and self._telemetry:
895-
self.bytes_dropped += len(packet)
896-
self.packets_dropped += 1
978+
self.bytes_dropped_writer += len(packet)
979+
self.packets_dropped_writer += 1
897980

898981
return False
899982

@@ -1066,5 +1149,28 @@ def _set_container_id(self, container_id, origin_detection_enabled):
10661149
log.debug("Couldn't get container ID: %s", str(e))
10671150
self._container_id = None
10681151

1152+
def _start_sender_thread(self):
1153+
log.debug("Starting background sender thread")
1154+
self._sender_thread = threading.Thread(
1155+
name="{}_sender_thread".format(self.__class__.__name__),
1156+
target=self._sender_main_loop,
1157+
)
1158+
self._sender_thread.daemon = True
1159+
self._sender_thread.start()
1160+
1161+
def _sender_main_loop(self):
1162+
while True:
1163+
self._xmit_packet_with_telemetry(self._queue.get())
1164+
self._queue.task_done()
1165+
1166+
def wait_for_pending(self):
1167+
"""
1168+
Flush the buffer and wait for all queued payloads to be written to the server.
1169+
"""
1170+
1171+
self.flush()
1172+
if self._queue is not None:
1173+
self._queue.join()
1174+
10691175

10701176
statsd = DogStatsd()

mypy.ini

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
# Ignore Mypy errors about packages with missing type hints.
44
#
5-
# Note: If Python 2 support is dropped in the future, `pkg_resources` and
6-
# `configparser` will no longer need to be ignored because the Python 3
7-
# versions have type hints.
5+
# Note: If Python 2 support is dropped in the future, `pkg_resources`,
6+
# `configparser` and `queue` will no longer need to be ignored because
7+
# the Python 3 versions have type hints.
88

99
[mypy-boto.*]
1010
ignore_missing_imports = True
@@ -20,3 +20,9 @@ ignore_missing_imports = True
2020

2121
[mypy-pkg_resources.*]
2222
ignore_missing_imports = True
23+
24+
[mypy-queue.*]
25+
ignore_missing_imports = True
26+
27+
[mypy-Queue.*]
28+
ignore_missing_imports = True
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import itertools
2+
import socket
3+
from threading import Thread
4+
5+
import pytest
6+
7+
from datadog.dogstatsd.base import DogStatsd
8+
9+
@pytest.mark.parametrize(
10+
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout",
11+
list(itertools.product([True, False], [True, False], [True, False], [0, 1])),
12+
)
13+
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout):
14+
# Test basic sender operation with an assortment of options
15+
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
16+
statsd = DogStatsd(
17+
telemetry_min_flush_interval=0,
18+
disable_background_sender=disable_background_sender,
19+
disable_buffering=disable_buffering,
20+
socket_timeout=socket_timeout,
21+
)
22+
23+
statsd.socket = foo
24+
statsd._reset_telemetry()
25+
26+
def reader_thread():
27+
msg = bar.recv(8192)
28+
assert msg == b"test.metric:1|c\n"
29+
30+
t = Thread(target=reader_thread, name="test_sender_mode/reader_thread")
31+
t.daemon = True
32+
t.start()
33+
34+
statsd.increment("test.metric")
35+
if wait_for_pending:
36+
statsd.wait_for_pending()
37+
38+
t.join(timeout=10)
39+
assert not t.is_alive()

0 commit comments

Comments
 (0)