From d3902ff0f502405a47ad9184426a4cdd491cde0b Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Fri, 3 Dec 2021 19:07:32 +0000 Subject: [PATCH 01/19] Add Sender.queue_size limits --- lib/datadog/statsd/forwarder.rb | 4 ++- lib/datadog/statsd/sender.rb | 22 ++++++++++++++-- spec/statsd/sender_spec.rb | 45 ++++++++++++++++++++++++++++++++- 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/lib/datadog/statsd/forwarder.rb b/lib/datadog/statsd/forwarder.rb index b56ab3a8..3085694c 100644 --- a/lib/datadog/statsd/forwarder.rb +++ b/lib/datadog/statsd/forwarder.rb @@ -48,7 +48,9 @@ def initialize( max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE, overflowing_stategy: buffer_overflowing_stategy, ) - @sender = (single_thread ? SingleThreadSender : Sender).new(buffer, logger: logger) + @sender = single_thread ? + SingleThreadSender.new(buffer, logger: logger) : + Sender.new(buffer, logger: logger) @sender.start end diff --git a/lib/datadog/statsd/sender.rb b/lib/datadog/statsd/sender.rb index aa28320e..b44f048d 100644 --- a/lib/datadog/statsd/sender.rb +++ b/lib/datadog/statsd/sender.rb @@ -12,8 +12,9 @@ class Statsd class Sender CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close) - def initialize(message_buffer, logger: nil) + def initialize(message_buffer, queue_size: 0, logger: nil) @message_buffer = message_buffer + @queue_size = queue_size @logger = logger @mx = Mutex.new end @@ -71,7 +72,15 @@ def add(message) } end - message_queue << message + @mx.synchronize { + new_size = @message_queue_bytesize + message.bytesize + if @queue_size == 0 || new_size <= @queue_size + @message_queue_bytesize = new_size + message_queue << message + else + @logger.info { "Statsd: dropping message due to backlog in sender queue" } if @logger + end + } end def start @@ -79,6 +88,7 @@ def start # initialize a new message queue for the background thread @message_queue = Queue.new + @message_queue_bytesize = 0 # start background thread @sender_thread = Thread.new(&method(:send_loop)) @sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3') @@ -128,10 +138,14 @@ def send_loop message.push(:go_on) else message_buffer.add(message) + @mx.synchronize { + @message_queue_bytesize -= message.bytesize + } end end @message_queue = nil + @message_queue_bytesize = nil @sender_thread = nil end else @@ -149,11 +163,15 @@ def send_loop when Queue message.push(:go_on) else + @mx.synchronize { + @message_queue_bytesize -= message.bytesize + } message_buffer.add(message) end end @message_queue = nil + @message_queue_bytesize = nil @sender_thread = nil end end diff --git a/spec/statsd/sender_spec.rb b/spec/statsd/sender_spec.rb index ccf66bd5..b3e539aa 100644 --- a/spec/statsd/sender_spec.rb +++ b/spec/statsd/sender_spec.rb @@ -1,8 +1,27 @@ require 'spec_helper' +class Waiter + def initialize() + @mx = Mutex.new + @cv = ConditionVariable.new + @sig = false + end + + def wait() + @mx.synchronize { @cv.wait(@mx) until @sig } + end + + def signal() + @mx.synchronize { + @sig = true + @cv.signal + } + end +end + describe Datadog::Statsd::Sender do subject do - described_class.new(message_buffer) + described_class.new(message_buffer, queue_size: 64) end let(:message_buffer) do @@ -87,6 +106,30 @@ subject.rendez_vous end + + it 'adds only messages up to queue_size bytes' do + # keep the sender thread busy handling a flush + waiter = Waiter.new + expect(message_buffer) + .to receive(:flush) { waiter.wait } + subject.flush + + sixtyFourBytes = 'abcd' * 16 + + expect(message_buffer) + .to receive(:add) + .with(sixtyFourBytes) + .exactly(1).times + + subject.add(sixtyFourBytes) + subject.add(sixtyFourBytes) # (dropped) + subject.add(sixtyFourBytes) # (dropped) + + # resume the sender thread again + waiter.signal + + subject.rendez_vous + end end end From c51c2fda1ef7ff92385a6f2f50084e4148f10900 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Mon, 27 Dec 2021 17:10:34 +0000 Subject: [PATCH 02/19] Count Sender.queue_size in messages, not bytes And remove use of a Mutex. This is a "soft" limit, so just examining the queue length is sufficient. --- lib/datadog/statsd/sender.rb | 25 ++++++------------------- spec/statsd/sender_spec.rb | 22 +++++++++++----------- 2 files changed, 17 insertions(+), 30 deletions(-) diff --git a/lib/datadog/statsd/sender.rb b/lib/datadog/statsd/sender.rb index b44f048d..1a277f4c 100644 --- a/lib/datadog/statsd/sender.rb +++ b/lib/datadog/statsd/sender.rb @@ -12,7 +12,7 @@ class Statsd class Sender CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close) - def initialize(message_buffer, queue_size: 0, logger: nil) + def initialize(message_buffer, queue_size: 2048, logger: nil) @message_buffer = message_buffer @queue_size = queue_size @logger = logger @@ -72,15 +72,11 @@ def add(message) } end - @mx.synchronize { - new_size = @message_queue_bytesize + message.bytesize - if @queue_size == 0 || new_size <= @queue_size - @message_queue_bytesize = new_size - message_queue << message - else - @logger.info { "Statsd: dropping message due to backlog in sender queue" } if @logger - end - } + if message_queue.length <= @queue_size + message_queue << message + else + @logger.debug { "Sender queue full; dropping" } if @logger # TODO: tlm instead of log + end end def start @@ -88,7 +84,6 @@ def start # initialize a new message queue for the background thread @message_queue = Queue.new - @message_queue_bytesize = 0 # start background thread @sender_thread = Thread.new(&method(:send_loop)) @sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3') @@ -138,14 +133,10 @@ def send_loop message.push(:go_on) else message_buffer.add(message) - @mx.synchronize { - @message_queue_bytesize -= message.bytesize - } end end @message_queue = nil - @message_queue_bytesize = nil @sender_thread = nil end else @@ -163,15 +154,11 @@ def send_loop when Queue message.push(:go_on) else - @mx.synchronize { - @message_queue_bytesize -= message.bytesize - } message_buffer.add(message) end end @message_queue = nil - @message_queue_bytesize = nil @sender_thread = nil end end diff --git a/spec/statsd/sender_spec.rb b/spec/statsd/sender_spec.rb index b3e539aa..c8df096a 100644 --- a/spec/statsd/sender_spec.rb +++ b/spec/statsd/sender_spec.rb @@ -21,7 +21,7 @@ def signal() describe Datadog::Statsd::Sender do subject do - described_class.new(message_buffer, queue_size: 64) + described_class.new(message_buffer, queue_size: 5) end let(:message_buffer) do @@ -88,7 +88,7 @@ def signal() end end - context 'when starting and stopping' do + context 'when started' do before do subject.start end @@ -107,25 +107,25 @@ def signal() subject.rendez_vous end - it 'adds only messages up to queue_size bytes' do + it 'adds only messages up to queue_size messages' do # keep the sender thread busy handling a flush waiter = Waiter.new expect(message_buffer) .to receive(:flush) { waiter.wait } subject.flush - sixtyFourBytes = 'abcd' * 16 + # send six messages; sixth is dropped + for i in 0..6 do + subject.add('message') + end expect(message_buffer) .to receive(:add) - .with(sixtyFourBytes) - .exactly(1).times - - subject.add(sixtyFourBytes) - subject.add(sixtyFourBytes) # (dropped) - subject.add(sixtyFourBytes) # (dropped) + .with('message') + .exactly(5).times - # resume the sender thread again + # resume the sender thread again to receive those six + # messages waiter.signal subject.rendez_vous From d8f95a3996b1d782bcd01f4edb0462191cb7e1a4 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Mon, 27 Dec 2021 17:34:30 +0000 Subject: [PATCH 03/19] Thread sender_queue_size through constructors --- README.md | 6 ++++++ lib/datadog/statsd.rb | 10 ++++++++++ lib/datadog/statsd/forwarder.rb | 8 +++++++- spec/statsd/forwarder_spec.rb | 9 +++++++-- 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2ccca9ee..f6d02b4f 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,12 @@ There are three different kinds of messages: There is also an implicit message which closes the queue which will cause the sender thread to finish processing and exit. +The message queue's maximum size is given by the `sender_queue_size` argument, and has appropriate defaults for UDP (2048) and UDS (512). + +```ruby +statsd = Datadog::Statsd.new('localhost', 8125, sender_queue_size: 128) +``` + ### Usual workflow You push metrics to the statsd client which writes them quickly to the sender message queue. The sender thread receives those message, buffers them and flushes them to the connection when the buffer limit is reached. diff --git a/lib/datadog/statsd.rb b/lib/datadog/statsd.rb index 90db0d84..abefd143 100644 --- a/lib/datadog/statsd.rb +++ b/lib/datadog/statsd.rb @@ -44,7 +44,12 @@ class Error < StandardError UDP_DEFAULT_BUFFER_SIZE = 1_432 UDS_DEFAULT_BUFFER_SIZE = 8_192 DEFAULT_BUFFER_POOL_SIZE = Float::INFINITY + + UDP_DEFAULT_SENDER_QUEUE_SIZE = 2048 + UDS_DEFAULT_SENDER_QUEUE_SIZE = 512 + MAX_EVENT_SIZE = 8 * 1_024 + # minimum flush interval for the telemetry in seconds DEFAULT_TELEMETRY_FLUSH_INTERVAL = 10 @@ -73,6 +78,7 @@ def tags # @option [Logger] logger for debugging # @option [Integer] buffer_max_payload_size max bytes to buffer # @option [Integer] buffer_max_pool_size max messages to buffer + # @option [Integer] sender_queue_size size of the sender queue in number of buffers (multi-thread only) # @option [String] socket_path unix socket path # @option [Float] default sample rate if not overridden # @option [Boolean] single_thread flushes the metrics on the main thread instead of in a companion thread @@ -89,6 +95,8 @@ def initialize( buffer_max_pool_size: nil, buffer_overflowing_stategy: :drop, + sender_queue_size: nil, + logger: nil, single_thread: false, @@ -134,6 +142,8 @@ def initialize( buffer_max_pool_size: buffer_max_pool_size, buffer_overflowing_stategy: buffer_overflowing_stategy, + sender_queue_size: sender_queue_size, + telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil, ) end diff --git a/lib/datadog/statsd/forwarder.rb b/lib/datadog/statsd/forwarder.rb index 3085694c..5183dac2 100644 --- a/lib/datadog/statsd/forwarder.rb +++ b/lib/datadog/statsd/forwarder.rb @@ -13,6 +13,8 @@ def initialize( buffer_max_pool_size: nil, buffer_overflowing_stategy: :drop, + sender_queue_size: nil, + telemetry_flush_interval: nil, global_tags: [], @@ -48,9 +50,13 @@ def initialize( max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE, overflowing_stategy: buffer_overflowing_stategy, ) + + sender_queue_size ||= (@transport_type == :udp ? + UDP_DEFAULT_SENDER_QUEUE_SIZE : UDS_DEFAULT_SENDER_QUEUE_SIZE) + @sender = single_thread ? SingleThreadSender.new(buffer, logger: logger) : - Sender.new(buffer, logger: logger) + Sender.new(buffer, logger: logger, queue_size: sender_queue_size) @sender.start end diff --git a/spec/statsd/forwarder_spec.rb b/spec/statsd/forwarder_spec.rb index 31492754..222b7657 100644 --- a/spec/statsd/forwarder_spec.rb +++ b/spec/statsd/forwarder_spec.rb @@ -104,7 +104,10 @@ it 'builds the sender' do expect(Datadog::Statsd::Sender) .to receive(:new) - .with(message_buffer, logger: logger) + .with(message_buffer, + logger: logger, + queue_size: + Datadog::Statsd::UDP_DEFAULT_SENDER_QUEUE_SIZE) .exactly(1) subject @@ -282,7 +285,9 @@ it 'builds the sender' do expect(Datadog::Statsd::Sender) .to receive(:new) - .with(message_buffer, logger: logger) + .with(message_buffer, + logger: logger, + queue_size: Datadog::Statsd::UDS_DEFAULT_SENDER_QUEUE_SIZE) .exactly(1) subject From c0d62d1af0ba4b7ada5ddd5d9d506067e99eb324 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Mon, 27 Dec 2021 18:28:50 +0000 Subject: [PATCH 04/19] add {packets,bytes}_dropped_writer telemetry --- lib/datadog/statsd/connection.rb | 2 +- lib/datadog/statsd/telemetry.rb | 10 ++- spec/integrations/allocation_spec.rb | 96 ++++++++++++++-------------- spec/integrations/buffering_spec.rb | 8 +-- spec/integrations/telemetry_spec.rb | 41 ++++++++---- spec/matchers/telemetry_matcher.rb | 14 +++- spec/statsd/telemetry_spec.rb | 44 ++++++++++--- spec/statsd/udp_connection_spec.rb | 26 ++++---- spec/statsd/uds_connection_spec.rb | 34 +++++----- spec/statsd_spec.rb | 4 +- 10 files changed, 169 insertions(+), 110 deletions(-) diff --git a/lib/datadog/statsd/connection.rb b/lib/datadog/statsd/connection.rb index 2a8f999a..35adbbc6 100644 --- a/lib/datadog/statsd/connection.rb +++ b/lib/datadog/statsd/connection.rb @@ -38,7 +38,7 @@ def write(payload) end end - telemetry.dropped(packets: 1, bytes: payload.length) if telemetry + telemetry.dropped_writer(packets: 1, bytes: payload.length) if telemetry logger.error { "Statsd: #{boom.class} #{boom}" } if logger nil end diff --git a/lib/datadog/statsd/telemetry.rb b/lib/datadog/statsd/telemetry.rb index 54b45366..6615091b 100644 --- a/lib/datadog/statsd/telemetry.rb +++ b/lib/datadog/statsd/telemetry.rb @@ -9,8 +9,10 @@ class Telemetry attr_reader :service_checks attr_reader :bytes_sent attr_reader :bytes_dropped + attr_reader :bytes_dropped_writer attr_reader :packets_sent attr_reader :packets_dropped + attr_reader :packets_dropped_writer # Rough estimation of maximum telemetry message size without tags MAX_TELEMETRY_MESSAGE_SIZE_WT_TAGS = 50 # bytes @@ -40,8 +42,10 @@ def reset @service_checks = 0 @bytes_sent = 0 @bytes_dropped = 0 + @bytes_dropped_writer = 0 @packets_sent = 0 @packets_dropped = 0 + @packets_dropped_writer = 0 @next_flush_time = now_in_s + @flush_interval end @@ -54,9 +58,11 @@ def sent(metrics: 0, events: 0, service_checks: 0, bytes: 0, packets: 0) @packets_sent += packets end - def dropped(bytes: 0, packets: 0) + def dropped_writer(bytes: 0, packets: 0) @bytes_dropped += bytes + @bytes_dropped_writer += bytes @packets_dropped += packets + @packets_dropped_writer += packets end def should_flush? @@ -70,8 +76,10 @@ def flush sprintf(pattern, 'service_checks', @service_checks), sprintf(pattern, 'bytes_sent', @bytes_sent), sprintf(pattern, 'bytes_dropped', @bytes_dropped), + sprintf(pattern, 'bytes_dropped_writer', @bytes_dropped_writer), sprintf(pattern, 'packets_sent', @packets_sent), sprintf(pattern, 'packets_dropped', @packets_dropped), + sprintf(pattern, 'packets_dropped_writer', @packets_dropped_writer), ] end diff --git a/spec/integrations/allocation_spec.rb b/spec/integrations/allocation_spec.rb index 02b8477f..11361b15 100644 --- a/spec/integrations/allocation_spec.rb +++ b/spec/integrations/allocation_spec.rb @@ -44,11 +44,11 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 16 + 18 elsif RUBY_VERSION < '2.5.0' - 15 + 17 else - 14 + 16 end end @@ -72,13 +72,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 8 + 10 elsif RUBY_VERSION < '2.5.0' - 7 + 9 elsif RUBY_VERSION < '2.6.0' - 6 + 8 else - 5 + 7 end end @@ -93,13 +93,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 25 + 27 elsif RUBY_VERSION < '2.5.0' - 23 + 25 elsif RUBY_VERSION < '2.6.0' - 22 + 24 else - 21 + 23 end end @@ -121,13 +121,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 16 + 18 elsif RUBY_VERSION < '2.5.0' - 15 + 17 elsif RUBY_VERSION < '2.6.0' - 14 + 16 else - 13 + 15 end end @@ -151,13 +151,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 8 + 10 elsif RUBY_VERSION < '2.5.0' - 7 + 9 elsif RUBY_VERSION < '2.6.0' - 6 + 8 else - 5 + 7 end end @@ -172,13 +172,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 25 + 27 elsif RUBY_VERSION < '2.5.0' - 23 + 25 elsif RUBY_VERSION < '2.6.0' - 22 + 24 else - 21 + 23 end end @@ -200,13 +200,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 18 + 20 elsif RUBY_VERSION < '2.5.0' - 17 + 19 elsif RUBY_VERSION < '2.6.0' - 16 + 18 else - 15 + 17 end end @@ -230,13 +230,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 10 + 12 elsif RUBY_VERSION < '2.5.0' - 9 + 11 elsif RUBY_VERSION < '2.6.0' - 8 + 10 else - 7 + 9 end end @@ -251,13 +251,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 27 + 29 elsif RUBY_VERSION < '2.5.0' - 25 + 27 elsif RUBY_VERSION < '2.6.0' - 24 + 26 else - 23 + 25 end end @@ -279,13 +279,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 14 + 16 elsif RUBY_VERSION < '2.5.0' - 13 + 15 elsif RUBY_VERSION < '2.6.0' - 12 + 14 else - 11 + 13 end end @@ -309,13 +309,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 6 + 8 elsif RUBY_VERSION < '2.5.0' - 5 + 7 elsif RUBY_VERSION < '2.6.0' - 4 + 6 else - 3 + 5 end end @@ -330,13 +330,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 23 + 25 elsif RUBY_VERSION < '2.5.0' - 21 + 23 elsif RUBY_VERSION < '2.6.0' - 20 + 22 else - 19 + 21 end end @@ -348,4 +348,4 @@ end end end -end \ No newline at end of file +end diff --git a/spec/integrations/buffering_spec.rb b/spec/integrations/buffering_spec.rb index 4ca72bb4..e89800d6 100644 --- a/spec/integrations/buffering_spec.rb +++ b/spec/integrations/buffering_spec.rb @@ -141,7 +141,7 @@ end let(:buffer_max_pool_size) do - 9 + 11 # enough messages to include the telemetry end it 'increments telemetry correctly' do @@ -156,7 +156,7 @@ subject.flush(flush_telemetry: true, sync: true) - expect(socket.recv[0]).to eq_with_telemetry('mycounter:1|c', bytes_sent: 702, packets_sent: 1, metrics: 1) + expect(socket.recv[0]).to eq_with_telemetry('mycounter:1|c', bytes_sent: 914, packets_sent: 1, metrics: 1) subject.increment('myothercounter') @@ -166,7 +166,7 @@ subject.sync_with_outbound_io - expect(socket.recv[0]).to eq_with_telemetry('myothercounter:1|c', bytes_sent: 687, packets_sent: 1, metrics: 1) + expect(socket.recv[0]).to eq_with_telemetry('myothercounter:1|c', bytes_sent: 899, packets_sent: 1, metrics: 1) # last value is still buffered expect(socket.recv).to be_nil end @@ -199,7 +199,7 @@ expect(subject.telemetry.service_checks).to eq 0 expect(subject.telemetry.events).to eq 0 expect(subject.telemetry.packets_sent).to eq 1 - expect(subject.telemetry.bytes_sent).to eq 766 + expect(subject.telemetry.bytes_sent).to eq 978 end end end diff --git a/spec/integrations/telemetry_spec.rb b/spec/integrations/telemetry_spec.rb index 36899143..e993ed89 100644 --- a/spec/integrations/telemetry_spec.rb +++ b/spec/integrations/telemetry_spec.rb @@ -92,35 +92,35 @@ subject.decrement('test', 1) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:-1|c', metrics: 1, packets_sent: 1, bytes_sent: 680) + expect(socket.recv[0]).to eq_with_telemetry('test:-1|c', metrics: 1, packets_sent: 1, bytes_sent: 892) subject.count('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|c', metrics: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('test:21|c', metrics: 1, packets_sent: 1, bytes_sent: 895) subject.gauge('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|g', metrics: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('test:21|g', metrics: 1, packets_sent: 1, bytes_sent: 895) subject.histogram('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|h', metrics: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('test:21|h', metrics: 1, packets_sent: 1, bytes_sent: 895) subject.timing('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|ms', metrics: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('test:21|ms', metrics: 1, packets_sent: 1, bytes_sent: 895) subject.set('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|s', metrics: 1, packets_sent: 1, bytes_sent: 684) + expect(socket.recv[0]).to eq_with_telemetry('test:21|s', metrics: 1, packets_sent: 1, bytes_sent: 896) subject.service_check('sc', 0) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('_sc|sc|0', metrics: 0, service_checks: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('_sc|sc|0', metrics: 0, service_checks: 1, packets_sent: 1, bytes_sent: 895) subject.event('ev', 'text') subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('_e{2,4}:ev|text', metrics: 0, events: 1, packets_sent: 1, bytes_sent: 682) + expect(socket.recv[0]).to eq_with_telemetry('_e{2,4}:ev|text', metrics: 0, events: 1, packets_sent: 1, bytes_sent: 894) end context 'when some data is dropped' do @@ -139,8 +139,10 @@ expect(subject.telemetry.events).to eq 0 expect(subject.telemetry.packets_sent).to eq 0 expect(subject.telemetry.bytes_sent).to eq 0 - expect(subject.telemetry.packets_dropped).to eq 1 - expect(subject.telemetry.bytes_dropped).to eq 1353 + expect(subject.telemetry.packets_dropped).to eq 2 + expect(subject.telemetry.packets_dropped_writer).to eq 2 + expect(subject.telemetry.bytes_dropped).to eq 1776 + expect(subject.telemetry.bytes_dropped_writer).to eq 1776 subject.gauge('test', 21) subject.flush(flush_telemetry: true, sync: true) @@ -150,23 +152,34 @@ expect(subject.telemetry.events).to eq 0 expect(subject.telemetry.packets_sent).to eq 0 expect(subject.telemetry.bytes_sent).to eq 0 - expect(subject.telemetry.packets_dropped).to eq 1 - expect(subject.telemetry.bytes_dropped).to eq 1356 + expect(subject.telemetry.packets_dropped).to eq 2 + expect(subject.telemetry.packets_dropped_writer).to eq 2 + expect(subject.telemetry.bytes_dropped).to eq 1782 + expect(subject.telemetry.bytes_dropped_writer).to eq 1782 #disable network failure socket.error_on_send(nil) subject.gauge('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|g', metrics: 1, service_checks: 0, events: 0, packets_dropped: 1, bytes_dropped: 1356) + expect(socket.recv[0]).to eq_with_telemetry('test:21|g', + metrics: 1, + service_checks: 0, + events: 0, + packets_dropped: 2, + packets_dropped_writer: 2, + bytes_dropped: 1782, + bytes_dropped_writer: 1782) expect(subject.telemetry.metrics).to eq 0 expect(subject.telemetry.service_checks).to eq 0 expect(subject.telemetry.events).to eq 0 expect(subject.telemetry.packets_sent).to eq 1 - expect(subject.telemetry.bytes_sent).to eq 684 + expect(subject.telemetry.bytes_sent).to eq 899 expect(subject.telemetry.packets_dropped).to eq 0 + expect(subject.telemetry.packets_dropped_writer).to eq 0 expect(subject.telemetry.bytes_dropped).to eq 0 + expect(subject.telemetry.bytes_dropped_writer).to eq 0 end end end diff --git a/spec/matchers/telemetry_matcher.rb b/spec/matchers/telemetry_matcher.rb index 3c34e024..796fb511 100644 --- a/spec/matchers/telemetry_matcher.rb +++ b/spec/matchers/telemetry_matcher.rb @@ -4,7 +4,17 @@ telemetry_options ||= {} # Appends the telemetry metrics to the metrics string passed as 'text' - def add_telemetry(text, metrics: 1, events: 0, service_checks: 0, bytes_sent: 0, bytes_dropped:0, packets_sent: 0, packets_dropped: 0, transport: 'udp') + def add_telemetry(text, + metrics: 1, + events: 0, + service_checks: 0, + bytes_sent: 0, + bytes_dropped: 0, + bytes_dropped_writer: 0, + packets_sent: 0, + packets_dropped: 0, + packets_dropped_writer: 0, + transport: 'udp') [ text, "datadog.dogstatsd.client.metrics:#{metrics}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", @@ -12,8 +22,10 @@ def add_telemetry(text, metrics: 1, events: 0, service_checks: 0, bytes_sent: 0, "datadog.dogstatsd.client.service_checks:#{service_checks}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.bytes_sent:#{bytes_sent}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.bytes_dropped:#{bytes_dropped}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", + "datadog.dogstatsd.client.bytes_dropped_writer:#{bytes_dropped_writer}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.packets_sent:#{packets_sent}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.packets_dropped:#{packets_dropped}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", + "datadog.dogstatsd.client.packets_dropped_writer:#{packets_dropped_writer}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", ].join("\n") end diff --git a/spec/statsd/telemetry_spec.rb b/spec/statsd/telemetry_spec.rb index faab9deb..24b70170 100644 --- a/spec/statsd/telemetry_spec.rb +++ b/spec/statsd/telemetry_spec.rb @@ -77,7 +77,7 @@ describe '#flush' do before do subject.sent(metrics: 1, events: 2, service_checks: 3, bytes: 4, packets: 5) - subject.dropped(bytes: 6, packets: 7) + subject.dropped_writer(bytes: 6, packets: 7) subject.flush end @@ -88,8 +88,10 @@ "datadog.dogstatsd.client.service_checks:3|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.bytes_sent:4|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.bytes_dropped:6|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.bytes_dropped_writer:6|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.packets_sent:5|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.packets_dropped:7|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.packets_dropped_writer:7|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", ] end @@ -98,10 +100,10 @@ skip 'Ruby too old' if RUBY_VERSION < '2.3.0' end - it 'makes only 8 allocations' do + it 'makes only 10 allocations' do expect do subject.flush - end.to make_allocations(8) + end.to make_allocations(10) end end end @@ -112,7 +114,7 @@ allow(Process).to receive(:clock_gettime).and_return(0) if Datadog::Statsd::PROCESS_TIME_SUPPORTED subject.sent(metrics: 1, events: 2, service_checks: 3, bytes: 4, packets: 5) - subject.dropped(bytes: 6, packets: 7) + subject.dropped_writer(bytes: 6, packets: 7) end after do @@ -161,13 +163,25 @@ it 'resets the bytes_dropped' do expect do subject.reset - end.to change { subject.bytes_dropped }.from(6).to(0) + end.to change { subject.bytes_dropped}.from(6).to(0) + end + + it 'resets the bytes_dropped_writer' do + expect do + subject.reset + end.to change { subject.bytes_dropped_writer }.from(6).to(0) end it 'resets the packets_dropped' do expect do subject.reset - end.to change { subject.packets_dropped }.from(7).to(0) + end.to change { subject.packets_dropped}.from(7).to(0) + end + + it 'resets the packets_dropped_writer' do + expect do + subject.reset + end.to change { subject.packets_dropped_writer }.from(7).to(0) end end @@ -213,19 +227,31 @@ end end - describe '#dropped' do + describe '#dropped_writer' do context 'when bumping bytes' do + it 'has bumped bytes_dropped_writer by the right amount' do + expect do + subject.dropped_writer(bytes: 3) + end.to change { subject.bytes_dropped_writer }.from(0).to(3) + end + it 'has bumped bytes_dropped by the right amount' do expect do - subject.dropped(bytes: 3) + subject.dropped_writer(bytes: 3) end.to change { subject.bytes_dropped }.from(0).to(3) end end context 'when bumping packets' do + it 'has bumped packets_dropped_writer by the right amount' do + expect do + subject.dropped_writer(packets: 3) + end.to change { subject.packets_dropped_writer }.from(0).to(3) + end + it 'has bumped packets_dropped by the right amount' do expect do - subject.dropped(packets: 3) + subject.dropped_writer(packets: 3) end.to change { subject.packets_dropped }.from(0).to(3) end end diff --git a/spec/statsd/udp_connection_spec.rb b/spec/statsd/udp_connection_spec.rb index 66baeac4..3c9d6e2a 100644 --- a/spec/statsd/udp_connection_spec.rb +++ b/spec/statsd/udp_connection_spec.rb @@ -51,7 +51,7 @@ describe '#write' do let(:telemetry) do - instance_double(Datadog::Statsd::Telemetry, sent: true, dropped: true) + instance_double(Datadog::Statsd::Telemetry, sent: true, dropped_writer: true) end it 'connects to the right host and port' do @@ -108,9 +108,9 @@ subject.write('test') end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -262,9 +262,9 @@ subject.write('test') end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -289,9 +289,9 @@ expect(log.string).to match 'Statsd: SocketError yolo' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -336,9 +336,9 @@ subject.write('test') end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -463,9 +463,9 @@ subject.write('test') end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -528,9 +528,9 @@ subject.write('test') end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') diff --git a/spec/statsd/uds_connection_spec.rb b/spec/statsd/uds_connection_spec.rb index fa7695b4..b8814cfe 100644 --- a/spec/statsd/uds_connection_spec.rb +++ b/spec/statsd/uds_connection_spec.rb @@ -42,7 +42,7 @@ describe '#write' do let(:telemetry) do - instance_double(Datadog::Statsd::Telemetry, sent: true, dropped: true) + instance_double(Datadog::Statsd::Telemetry, sent: true, dropped_writer: true) end it 'builds the socket in the right mode' do @@ -142,9 +142,9 @@ expect(log.string).to match 'Statsd: Datadog::Statsd::UDSConnection::BadSocketError Errno::ECONNRESET: Connection reset by peer' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -171,9 +171,9 @@ expect(log.string).to match 'Statsd: RuntimeError yolo' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -199,9 +199,9 @@ expect(log.string).to match 'Statsd: SocketError yolo' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -248,9 +248,9 @@ expect(log.string).to match 'Statsd: Datadog::Statsd::UDSConnection::BadSocketError Errno::ECONNREFUSED: Connection refused - closed stream' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -277,9 +277,9 @@ expect(log.string).to match 'Statsd: RuntimeError yolo' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -305,9 +305,9 @@ expect(log.string).to match 'Errno::ECONNREFUSED Connection refused - yolo' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -351,9 +351,9 @@ expect(log.string).to match 'Statsd: Errno::ENOENT No such file or directory' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -398,9 +398,9 @@ expect(log.string).to match 'Statsd: IO::EAGAINWaitWritable Resource temporarily unavailable' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') diff --git a/spec/statsd_spec.rb b/spec/statsd_spec.rb index bdbf8fb2..6d876a2d 100644 --- a/spec/statsd_spec.rb +++ b/spec/statsd_spec.rb @@ -397,7 +397,7 @@ subject.gauge('begrutten-suffusion', -107.3) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry 'begrutten-suffusion:-107.3|g', bytes_sent: 697, packets_sent: 1 + expect(socket.recv[0]).to eq_with_telemetry 'begrutten-suffusion:-107.3|g', bytes_sent: 909, packets_sent: 1 end context 'with a sample rate' do @@ -455,7 +455,7 @@ subject.histogram('ohmy', -107.3) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry 'ohmy:-107.3|h', bytes_sent: 682, packets_sent: 1 + expect(socket.recv[0]).to eq_with_telemetry 'ohmy:-107.3|h', bytes_sent: 894, packets_sent: 1 end context 'with a sample rate' do From 44cff4e3cb6445e5e375b8fcb459b53c8dfaf8d6 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Mon, 27 Dec 2021 18:43:18 +0000 Subject: [PATCH 05/19] add dropped_queue telemetry --- lib/datadog/statsd/telemetry.rb | 13 ++++ spec/integrations/allocation_spec.rb | 94 ++++++++++++++-------------- spec/integrations/buffering_spec.rb | 8 +-- spec/integrations/telemetry_spec.rb | 30 ++++----- spec/matchers/telemetry_matcher.rb | 4 ++ spec/statsd/telemetry_spec.rb | 56 +++++++++++++++-- spec/statsd_spec.rb | 4 +- 7 files changed, 136 insertions(+), 73 deletions(-) diff --git a/lib/datadog/statsd/telemetry.rb b/lib/datadog/statsd/telemetry.rb index 6615091b..89b19357 100644 --- a/lib/datadog/statsd/telemetry.rb +++ b/lib/datadog/statsd/telemetry.rb @@ -9,9 +9,11 @@ class Telemetry attr_reader :service_checks attr_reader :bytes_sent attr_reader :bytes_dropped + attr_reader :bytes_dropped_queue attr_reader :bytes_dropped_writer attr_reader :packets_sent attr_reader :packets_dropped + attr_reader :packets_dropped_queue attr_reader :packets_dropped_writer # Rough estimation of maximum telemetry message size without tags @@ -42,9 +44,11 @@ def reset @service_checks = 0 @bytes_sent = 0 @bytes_dropped = 0 + @bytes_dropped_queue = 0 @bytes_dropped_writer = 0 @packets_sent = 0 @packets_dropped = 0 + @packets_dropped_queue = 0 @packets_dropped_writer = 0 @next_flush_time = now_in_s + @flush_interval end @@ -58,6 +62,13 @@ def sent(metrics: 0, events: 0, service_checks: 0, bytes: 0, packets: 0) @packets_sent += packets end + def dropped_queue(bytes: 0, packets: 0) + @bytes_dropped += bytes + @bytes_dropped_queue += bytes + @packets_dropped += packets + @packets_dropped_queue += packets + end + def dropped_writer(bytes: 0, packets: 0) @bytes_dropped += bytes @bytes_dropped_writer += bytes @@ -76,9 +87,11 @@ def flush sprintf(pattern, 'service_checks', @service_checks), sprintf(pattern, 'bytes_sent', @bytes_sent), sprintf(pattern, 'bytes_dropped', @bytes_dropped), + sprintf(pattern, 'bytes_dropped_queue', @bytes_dropped_queue), sprintf(pattern, 'bytes_dropped_writer', @bytes_dropped_writer), sprintf(pattern, 'packets_sent', @packets_sent), sprintf(pattern, 'packets_dropped', @packets_dropped), + sprintf(pattern, 'packets_dropped_queue', @packets_dropped_queue), sprintf(pattern, 'packets_dropped_writer', @packets_dropped_writer), ] end diff --git a/spec/integrations/allocation_spec.rb b/spec/integrations/allocation_spec.rb index 11361b15..8751e6e7 100644 --- a/spec/integrations/allocation_spec.rb +++ b/spec/integrations/allocation_spec.rb @@ -44,11 +44,11 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 18 + 20 elsif RUBY_VERSION < '2.5.0' - 17 + 19 else - 16 + 18 end end @@ -72,13 +72,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 10 + 12 elsif RUBY_VERSION < '2.5.0' - 9 + 11 elsif RUBY_VERSION < '2.6.0' - 8 + 10 else - 7 + 9 end end @@ -93,13 +93,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 27 + 29 elsif RUBY_VERSION < '2.5.0' - 25 + 27 elsif RUBY_VERSION < '2.6.0' - 24 + 26 else - 23 + 25 end end @@ -121,13 +121,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 18 + 20 elsif RUBY_VERSION < '2.5.0' - 17 + 19 elsif RUBY_VERSION < '2.6.0' - 16 + 18 else - 15 + 17 end end @@ -151,13 +151,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 10 + 12 elsif RUBY_VERSION < '2.5.0' - 9 + 11 elsif RUBY_VERSION < '2.6.0' - 8 + 10 else - 7 + 9 end end @@ -172,13 +172,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 27 + 29 elsif RUBY_VERSION < '2.5.0' - 25 + 27 elsif RUBY_VERSION < '2.6.0' - 24 + 26 else - 23 + 25 end end @@ -200,13 +200,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 20 + 22 elsif RUBY_VERSION < '2.5.0' - 19 + 21 elsif RUBY_VERSION < '2.6.0' - 18 + 20 else - 17 + 19 end end @@ -230,13 +230,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 12 + 14 elsif RUBY_VERSION < '2.5.0' - 11 + 13 elsif RUBY_VERSION < '2.6.0' - 10 + 12 else - 9 + 11 end end @@ -251,13 +251,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 29 + 31 elsif RUBY_VERSION < '2.5.0' - 27 + 29 elsif RUBY_VERSION < '2.6.0' - 26 + 28 else - 25 + 27 end end @@ -279,13 +279,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 16 + 18 elsif RUBY_VERSION < '2.5.0' - 15 + 17 elsif RUBY_VERSION < '2.6.0' - 14 + 16 else - 13 + 15 end end @@ -309,13 +309,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 8 + 10 elsif RUBY_VERSION < '2.5.0' - 7 + 9 elsif RUBY_VERSION < '2.6.0' - 6 + 8 else - 5 + 7 end end @@ -330,13 +330,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 25 + 27 elsif RUBY_VERSION < '2.5.0' - 23 + 25 elsif RUBY_VERSION < '2.6.0' - 22 + 24 else - 21 + 23 end end diff --git a/spec/integrations/buffering_spec.rb b/spec/integrations/buffering_spec.rb index e89800d6..2b767010 100644 --- a/spec/integrations/buffering_spec.rb +++ b/spec/integrations/buffering_spec.rb @@ -141,7 +141,7 @@ end let(:buffer_max_pool_size) do - 11 # enough messages to include the telemetry + 13 # enough messages to include the telemetry end it 'increments telemetry correctly' do @@ -156,7 +156,7 @@ subject.flush(flush_telemetry: true, sync: true) - expect(socket.recv[0]).to eq_with_telemetry('mycounter:1|c', bytes_sent: 914, packets_sent: 1, metrics: 1) + expect(socket.recv[0]).to eq_with_telemetry('mycounter:1|c', bytes_sent: 1124, packets_sent: 1, metrics: 1) subject.increment('myothercounter') @@ -166,7 +166,7 @@ subject.sync_with_outbound_io - expect(socket.recv[0]).to eq_with_telemetry('myothercounter:1|c', bytes_sent: 899, packets_sent: 1, metrics: 1) + expect(socket.recv[0]).to eq_with_telemetry('myothercounter:1|c', bytes_sent: 1110, packets_sent: 1, metrics: 1) # last value is still buffered expect(socket.recv).to be_nil end @@ -199,7 +199,7 @@ expect(subject.telemetry.service_checks).to eq 0 expect(subject.telemetry.events).to eq 0 expect(subject.telemetry.packets_sent).to eq 1 - expect(subject.telemetry.bytes_sent).to eq 978 + expect(subject.telemetry.bytes_sent).to eq 1188 end end end diff --git a/spec/integrations/telemetry_spec.rb b/spec/integrations/telemetry_spec.rb index e993ed89..0ebfa95d 100644 --- a/spec/integrations/telemetry_spec.rb +++ b/spec/integrations/telemetry_spec.rb @@ -92,35 +92,35 @@ subject.decrement('test', 1) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:-1|c', metrics: 1, packets_sent: 1, bytes_sent: 892) + expect(socket.recv[0]).to eq_with_telemetry('test:-1|c', metrics: 1, packets_sent: 1, bytes_sent: 1102) subject.count('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|c', metrics: 1, packets_sent: 1, bytes_sent: 895) + expect(socket.recv[0]).to eq_with_telemetry('test:21|c', metrics: 1, packets_sent: 1, bytes_sent: 1106) subject.gauge('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|g', metrics: 1, packets_sent: 1, bytes_sent: 895) + expect(socket.recv[0]).to eq_with_telemetry('test:21|g', metrics: 1, packets_sent: 1, bytes_sent: 1106) subject.histogram('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|h', metrics: 1, packets_sent: 1, bytes_sent: 895) + expect(socket.recv[0]).to eq_with_telemetry('test:21|h', metrics: 1, packets_sent: 1, bytes_sent: 1106) subject.timing('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|ms', metrics: 1, packets_sent: 1, bytes_sent: 895) + expect(socket.recv[0]).to eq_with_telemetry('test:21|ms', metrics: 1, packets_sent: 1, bytes_sent: 1106) subject.set('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|s', metrics: 1, packets_sent: 1, bytes_sent: 896) + expect(socket.recv[0]).to eq_with_telemetry('test:21|s', metrics: 1, packets_sent: 1, bytes_sent: 1107) subject.service_check('sc', 0) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('_sc|sc|0', metrics: 0, service_checks: 1, packets_sent: 1, bytes_sent: 895) + expect(socket.recv[0]).to eq_with_telemetry('_sc|sc|0', metrics: 0, service_checks: 1, packets_sent: 1, bytes_sent: 1106) subject.event('ev', 'text') subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('_e{2,4}:ev|text', metrics: 0, events: 1, packets_sent: 1, bytes_sent: 894) + expect(socket.recv[0]).to eq_with_telemetry('_e{2,4}:ev|text', metrics: 0, events: 1, packets_sent: 1, bytes_sent: 1105) end context 'when some data is dropped' do @@ -141,8 +141,8 @@ expect(subject.telemetry.bytes_sent).to eq 0 expect(subject.telemetry.packets_dropped).to eq 2 expect(subject.telemetry.packets_dropped_writer).to eq 2 - expect(subject.telemetry.bytes_dropped).to eq 1776 - expect(subject.telemetry.bytes_dropped_writer).to eq 1776 + expect(subject.telemetry.bytes_dropped).to eq 2196 + expect(subject.telemetry.bytes_dropped_writer).to eq 2196 subject.gauge('test', 21) subject.flush(flush_telemetry: true, sync: true) @@ -154,8 +154,8 @@ expect(subject.telemetry.bytes_sent).to eq 0 expect(subject.telemetry.packets_dropped).to eq 2 expect(subject.telemetry.packets_dropped_writer).to eq 2 - expect(subject.telemetry.bytes_dropped).to eq 1782 - expect(subject.telemetry.bytes_dropped_writer).to eq 1782 + expect(subject.telemetry.bytes_dropped).to eq 2202 + expect(subject.telemetry.bytes_dropped_writer).to eq 2202 #disable network failure socket.error_on_send(nil) @@ -168,14 +168,14 @@ events: 0, packets_dropped: 2, packets_dropped_writer: 2, - bytes_dropped: 1782, - bytes_dropped_writer: 1782) + bytes_dropped: 2202, + bytes_dropped_writer: 2202) expect(subject.telemetry.metrics).to eq 0 expect(subject.telemetry.service_checks).to eq 0 expect(subject.telemetry.events).to eq 0 expect(subject.telemetry.packets_sent).to eq 1 - expect(subject.telemetry.bytes_sent).to eq 899 + expect(subject.telemetry.bytes_sent).to eq 1109 expect(subject.telemetry.packets_dropped).to eq 0 expect(subject.telemetry.packets_dropped_writer).to eq 0 expect(subject.telemetry.bytes_dropped).to eq 0 diff --git a/spec/matchers/telemetry_matcher.rb b/spec/matchers/telemetry_matcher.rb index 796fb511..55a59509 100644 --- a/spec/matchers/telemetry_matcher.rb +++ b/spec/matchers/telemetry_matcher.rb @@ -10,9 +10,11 @@ def add_telemetry(text, service_checks: 0, bytes_sent: 0, bytes_dropped: 0, + bytes_dropped_queue: 0, bytes_dropped_writer: 0, packets_sent: 0, packets_dropped: 0, + packets_dropped_queue: 0, packets_dropped_writer: 0, transport: 'udp') [ @@ -22,9 +24,11 @@ def add_telemetry(text, "datadog.dogstatsd.client.service_checks:#{service_checks}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.bytes_sent:#{bytes_sent}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.bytes_dropped:#{bytes_dropped}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", + "datadog.dogstatsd.client.bytes_dropped_queue:#{bytes_dropped_queue}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.bytes_dropped_writer:#{bytes_dropped_writer}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.packets_sent:#{packets_sent}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.packets_dropped:#{packets_dropped}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", + "datadog.dogstatsd.client.packets_dropped_queue:#{packets_dropped_queue}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.packets_dropped_writer:#{packets_dropped_writer}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", ].join("\n") end diff --git a/spec/statsd/telemetry_spec.rb b/spec/statsd/telemetry_spec.rb index 24b70170..2c8dd7e6 100644 --- a/spec/statsd/telemetry_spec.rb +++ b/spec/statsd/telemetry_spec.rb @@ -78,6 +78,7 @@ before do subject.sent(metrics: 1, events: 2, service_checks: 3, bytes: 4, packets: 5) subject.dropped_writer(bytes: 6, packets: 7) + subject.dropped_queue(bytes: 9, packets: 8) subject.flush end @@ -87,10 +88,12 @@ "datadog.dogstatsd.client.events:2|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.service_checks:3|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.bytes_sent:4|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", - "datadog.dogstatsd.client.bytes_dropped:6|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.bytes_dropped:15|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.bytes_dropped_queue:9|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.bytes_dropped_writer:6|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.packets_sent:5|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", - "datadog.dogstatsd.client.packets_dropped:7|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.packets_dropped:15|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.packets_dropped_queue:8|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.packets_dropped_writer:7|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", ] end @@ -103,7 +106,7 @@ it 'makes only 10 allocations' do expect do subject.flush - end.to make_allocations(10) + end.to make_allocations(12) end end end @@ -115,6 +118,7 @@ subject.sent(metrics: 1, events: 2, service_checks: 3, bytes: 4, packets: 5) subject.dropped_writer(bytes: 6, packets: 7) + subject.dropped_queue(bytes: 9, packets: 7) end after do @@ -163,7 +167,13 @@ it 'resets the bytes_dropped' do expect do subject.reset - end.to change { subject.bytes_dropped}.from(6).to(0) + end.to change { subject.bytes_dropped}.from(15).to(0) + end + + it 'resets the bytes_dropped_queue' do + expect do + subject.reset + end.to change { subject.bytes_dropped_queue }.from(9).to(0) end it 'resets the bytes_dropped_writer' do @@ -175,7 +185,13 @@ it 'resets the packets_dropped' do expect do subject.reset - end.to change { subject.packets_dropped}.from(7).to(0) + end.to change { subject.packets_dropped}.from(14).to(0) + end + + it 'resets the packets_dropped_queue' do + expect do + subject.reset + end.to change { subject.packets_dropped_queue }.from(7).to(0) end it 'resets the packets_dropped_writer' do @@ -227,6 +243,36 @@ end end + describe '#dropped_queue' do + context 'when bumping bytes' do + it 'has bumped bytes_dropped_queue by the right amount' do + expect do + subject.dropped_queue(bytes: 3) + end.to change { subject.bytes_dropped_queue }.from(0).to(3) + end + + it 'has bumped bytes_dropped by the right amount' do + expect do + subject.dropped_queue(bytes: 3) + end.to change { subject.bytes_dropped }.from(0).to(3) + end + end + + context 'when bumping packets' do + it 'has bumped packets_dropped_queue by the right amount' do + expect do + subject.dropped_queue(packets: 3) + end.to change { subject.packets_dropped_queue }.from(0).to(3) + end + + it 'has bumped packets_dropped by the right amount' do + expect do + subject.dropped_queue(packets: 3) + end.to change { subject.packets_dropped }.from(0).to(3) + end + end + end + describe '#dropped_writer' do context 'when bumping bytes' do it 'has bumped bytes_dropped_writer by the right amount' do diff --git a/spec/statsd_spec.rb b/spec/statsd_spec.rb index 6d876a2d..64d87a8c 100644 --- a/spec/statsd_spec.rb +++ b/spec/statsd_spec.rb @@ -397,7 +397,7 @@ subject.gauge('begrutten-suffusion', -107.3) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry 'begrutten-suffusion:-107.3|g', bytes_sent: 909, packets_sent: 1 + expect(socket.recv[0]).to eq_with_telemetry 'begrutten-suffusion:-107.3|g', bytes_sent: 1119, packets_sent: 1 end context 'with a sample rate' do @@ -455,7 +455,7 @@ subject.histogram('ohmy', -107.3) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry 'ohmy:-107.3|h', bytes_sent: 894, packets_sent: 1 + expect(socket.recv[0]).to eq_with_telemetry 'ohmy:-107.3|h', bytes_sent: 1104, packets_sent: 1 end context 'with a sample rate' do From 8dae15ea6eaa3eaacdb581f2077314bd28bdeff9 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Mon, 27 Dec 2021 19:42:25 +0000 Subject: [PATCH 06/19] Add dropped_queue tlm when sender queue is full --- lib/datadog/statsd/forwarder.rb | 2 +- lib/datadog/statsd/sender.rb | 5 +++-- spec/statsd/forwarder_spec.rb | 6 ++++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/datadog/statsd/forwarder.rb b/lib/datadog/statsd/forwarder.rb index 5183dac2..b6b58426 100644 --- a/lib/datadog/statsd/forwarder.rb +++ b/lib/datadog/statsd/forwarder.rb @@ -56,7 +56,7 @@ def initialize( @sender = single_thread ? SingleThreadSender.new(buffer, logger: logger) : - Sender.new(buffer, logger: logger, queue_size: sender_queue_size) + Sender.new(buffer, telemetry: @telemetry, logger: logger, queue_size: sender_queue_size) @sender.start end diff --git a/lib/datadog/statsd/sender.rb b/lib/datadog/statsd/sender.rb index 1a277f4c..d1ca6ec7 100644 --- a/lib/datadog/statsd/sender.rb +++ b/lib/datadog/statsd/sender.rb @@ -12,8 +12,9 @@ class Statsd class Sender CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close) - def initialize(message_buffer, queue_size: 2048, logger: nil) + def initialize(message_buffer, telemetry: nil, queue_size: 2048, logger: nil) @message_buffer = message_buffer + @telemetry = telemetry @queue_size = queue_size @logger = logger @mx = Mutex.new @@ -75,7 +76,7 @@ def add(message) if message_queue.length <= @queue_size message_queue << message else - @logger.debug { "Sender queue full; dropping" } if @logger # TODO: tlm instead of log + @telemetry.dropped_queue(packets: 1, bytes: message.bytesize) if @telemetry end end diff --git a/spec/statsd/forwarder_spec.rb b/spec/statsd/forwarder_spec.rb index 222b7657..bdee42c5 100644 --- a/spec/statsd/forwarder_spec.rb +++ b/spec/statsd/forwarder_spec.rb @@ -107,7 +107,8 @@ .with(message_buffer, logger: logger, queue_size: - Datadog::Statsd::UDP_DEFAULT_SENDER_QUEUE_SIZE) + Datadog::Statsd::UDP_DEFAULT_SENDER_QUEUE_SIZE, + telemetry: telemetry) .exactly(1) subject @@ -287,7 +288,8 @@ .to receive(:new) .with(message_buffer, logger: logger, - queue_size: Datadog::Statsd::UDS_DEFAULT_SENDER_QUEUE_SIZE) + queue_size: Datadog::Statsd::UDS_DEFAULT_SENDER_QUEUE_SIZE, + telemetry: telemetry) .exactly(1) subject From f96c9134ab1417e2c7bf93cc21acc631b5715ee0 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Thu, 6 Jan 2022 15:44:28 +0000 Subject: [PATCH 07/19] add tests for queue_size --- lib/datadog/statsd/sender.rb | 14 +++--- spec/statsd/sender_spec.rb | 94 ++++++++++++++++++++++-------------- 2 files changed, 65 insertions(+), 43 deletions(-) diff --git a/lib/datadog/statsd/sender.rb b/lib/datadog/statsd/sender.rb index d1ca6ec7..68daf810 100644 --- a/lib/datadog/statsd/sender.rb +++ b/lib/datadog/statsd/sender.rb @@ -12,12 +12,14 @@ class Statsd class Sender CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close) - def initialize(message_buffer, telemetry: nil, queue_size: 2048, logger: nil) + def initialize(message_buffer, telemetry: nil, queue_size: 2048, logger: nil, queue_class: Queue, thread_class: Thread) @message_buffer = message_buffer @telemetry = telemetry @queue_size = queue_size @logger = logger @mx = Mutex.new + @queue_class = queue_class + @thread_class = thread_class end def flush(sync: false) @@ -44,7 +46,7 @@ def rendez_vous return unless message_queue # Initialize and get the thread's sync queue - queue = (Thread.current[:statsd_sync_queue] ||= Queue.new) + queue = (@thread_class.current[:statsd_sync_queue] ||= @queue_class.new) # tell sender-thread to notify us in the current # thread's queue message_queue.push(queue) @@ -84,9 +86,9 @@ def start raise ArgumentError, 'Sender already started' if message_queue # initialize a new message queue for the background thread - @message_queue = Queue.new + @message_queue = @queue_class.new # start background thread - @sender_thread = Thread.new(&method(:send_loop)) + @sender_thread = @thread_class.new(&method(:send_loop)) @sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3') end @@ -130,7 +132,7 @@ def send_loop case message when :flush message_buffer.flush - when Queue + when @queue_class message.push(:go_on) else message_buffer.add(message) @@ -152,7 +154,7 @@ def send_loop break when :flush message_buffer.flush - when Queue + when @queue_class message.push(:go_on) else message_buffer.add(message) diff --git a/spec/statsd/sender_spec.rb b/spec/statsd/sender_spec.rb index c8df096a..a9895121 100644 --- a/spec/statsd/sender_spec.rb +++ b/spec/statsd/sender_spec.rb @@ -1,33 +1,27 @@ require 'spec_helper' -class Waiter - def initialize() - @mx = Mutex.new - @cv = ConditionVariable.new - @sig = false - end - - def wait() - @mx.synchronize { @cv.wait(@mx) until @sig } - end - - def signal() - @mx.synchronize { - @sig = true - @cv.signal - } - end -end - describe Datadog::Statsd::Sender do subject do - described_class.new(message_buffer, queue_size: 5) + described_class.new( + message_buffer, + telemetry: telemetry, + queue_size: queue_size, + queue_class: queue_class, + thread_class: thread_class) end + let(:queue_size) { 5 } + let(:queue_class) { Queue } + let(:thread_class) { Thread } + let(:message_buffer) do instance_double(Datadog::Statsd::MessageBuffer) end + let(:telemetry) do + instance_double(Datadog::Statsd::Telemetry) + end + describe '#start' do after do subject.stop @@ -107,28 +101,54 @@ def signal() subject.rendez_vous end - it 'adds only messages up to queue_size messages' do - # keep the sender thread busy handling a flush - waiter = Waiter.new - expect(message_buffer) - .to receive(:flush) { waiter.wait } - subject.flush + context 'with fake queue and fake sender thread' do + let(:fake_queue) do + if Queue.instance_methods.include?(:close) + instance_double(Queue, { "length" => fake_queue_length, "<<" => true, "close" => true }) + else + instance_double(Queue, { "length" => fake_queue_length, "<<" => true }) + end + end - # send six messages; sixth is dropped - for i in 0..6 do - subject.add('message') + let(:queue_class) do + class_double(Queue, new: fake_queue) end - expect(message_buffer) - .to receive(:add) - .with('message') - .exactly(5).times + let(:thread_class) do + if Thread.instance_methods.include?(:name=) + fake_thread = instance_double(Thread, { "alive?" => true, "name=" => true, "join" => true }) + else + fake_thread = instance_double(Thread, { "alive?" => true, "join" => true }) + end + class_double(Thread, new: fake_thread) + end - # resume the sender thread again to receive those six - # messages - waiter.signal + context 'with fewer messages in queue than queue_size' do + let(:fake_queue_length) { queue_size } + + it 'adds only messages up to queue_size messages' do + expect(fake_queue).to receive(:<<).with('message') + if not Queue.instance_methods.include?(:close) + expect(fake_queue).to receive(:<<).with(:close) + end + expect(telemetry).not_to receive(:dropped_queue) + subject.add('message') + end + end - subject.rendez_vous + context 'with more messages in queue than queue_size' do + let(:fake_queue_length) { queue_size + 1 } + + it 'adds only messages up to queue_size messages' do + if Queue.instance_methods.include?(:close) + expect(fake_queue).not_to receive(:<<) + else + expect(fake_queue).to receive(:<<).with(:close) + end + expect(telemetry).to receive(:dropped_queue).with(bytes: 7, packets: 1) + subject.add('message') + end + end end end end From 1edb0e7e2a19b65e930459227debb3988a799b47 Mon Sep 17 00:00:00 2001 From: abicky Date: Sun, 16 Jan 2022 19:43:03 +0900 Subject: [PATCH 08/19] Add option "buffer_flush_interval" If metrics are recorded infrequently, it might take much time until the message buffer flushes them and we can see them on Datadog. This option is useful in such a case. --- lib/datadog/statsd.rb | 4 ++ lib/datadog/statsd/forwarder.rb | 3 +- lib/datadog/statsd/sender.rb | 9 +++- lib/datadog/statsd/single_thread_sender.rb | 10 ++-- lib/datadog/statsd/timer.rb | 59 ++++++++++++++++++++++ spec/spec_helper.rb | 1 + spec/statsd/forwarder_spec.rb | 10 +++- spec/statsd/sender_spec.rb | 45 +++++++++++++++-- spec/statsd/single_thread_sender_spec.rb | 41 ++++++++++++++- spec/statsd/timer_spec.rb | 50 ++++++++++++++++++ 10 files changed, 219 insertions(+), 13 deletions(-) create mode 100644 lib/datadog/statsd/timer.rb create mode 100644 spec/statsd/timer_spec.rb diff --git a/lib/datadog/statsd.rb b/lib/datadog/statsd.rb index 90db0d84..6c8c3861 100644 --- a/lib/datadog/statsd.rb +++ b/lib/datadog/statsd.rb @@ -11,6 +11,7 @@ require_relative 'statsd/sender' require_relative 'statsd/single_thread_sender' require_relative 'statsd/forwarder' +require_relative 'statsd/timer' $deprecation_message_mutex = Mutex.new $deprecation_message_done = false @@ -73,6 +74,7 @@ def tags # @option [Logger] logger for debugging # @option [Integer] buffer_max_payload_size max bytes to buffer # @option [Integer] buffer_max_pool_size max messages to buffer + # @option [Numeric] buffer_flush_interval interval in second to flush buffer # @option [String] socket_path unix socket path # @option [Float] default sample rate if not overridden # @option [Boolean] single_thread flushes the metrics on the main thread instead of in a companion thread @@ -88,6 +90,7 @@ def initialize( buffer_max_payload_size: nil, buffer_max_pool_size: nil, buffer_overflowing_stategy: :drop, + buffer_flush_interval: nil, logger: nil, @@ -133,6 +136,7 @@ def initialize( buffer_max_payload_size: buffer_max_payload_size, buffer_max_pool_size: buffer_max_pool_size, buffer_overflowing_stategy: buffer_overflowing_stategy, + buffer_flush_interval: buffer_flush_interval, telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil, ) diff --git a/lib/datadog/statsd/forwarder.rb b/lib/datadog/statsd/forwarder.rb index b56ab3a8..0d3c2852 100644 --- a/lib/datadog/statsd/forwarder.rb +++ b/lib/datadog/statsd/forwarder.rb @@ -12,6 +12,7 @@ def initialize( buffer_max_payload_size: nil, buffer_max_pool_size: nil, buffer_overflowing_stategy: :drop, + buffer_flush_interval: nil, telemetry_flush_interval: nil, global_tags: [], @@ -48,7 +49,7 @@ def initialize( max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE, overflowing_stategy: buffer_overflowing_stategy, ) - @sender = (single_thread ? SingleThreadSender : Sender).new(buffer, logger: logger) + @sender = (single_thread ? SingleThreadSender : Sender).new(buffer, logger: logger, flush_interval: buffer_flush_interval) @sender.start end diff --git a/lib/datadog/statsd/sender.rb b/lib/datadog/statsd/sender.rb index aa28320e..e96a11d1 100644 --- a/lib/datadog/statsd/sender.rb +++ b/lib/datadog/statsd/sender.rb @@ -12,10 +12,13 @@ class Statsd class Sender CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close) - def initialize(message_buffer, logger: nil) + def initialize(message_buffer, logger: nil, flush_interval: nil) @message_buffer = message_buffer @logger = logger @mx = Mutex.new + if flush_interval + @flush_timer = Datadog::Statsd::Timer.new(flush_interval) { flush(sync: true) } + end end def flush(sync: false) @@ -68,6 +71,7 @@ def add(message) @message_queue = nil message_buffer.reset start + @flush_timer.start if @flush_timer && @flush_timer.stop? } end @@ -82,6 +86,7 @@ def start # start background thread @sender_thread = Thread.new(&method(:send_loop)) @sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3') + @flush_timer.start if @flush_timer end if CLOSEABLE_QUEUES @@ -92,6 +97,7 @@ def stop(join_worker: true) message_queue = @message_queue message_queue.close if message_queue + @flush_timer.stop if @flush_timer sender_thread = @sender_thread sender_thread.join if sender_thread && join_worker end @@ -103,6 +109,7 @@ def stop(join_worker: true) message_queue = @message_queue message_queue << :close if message_queue + @flush_timer.stop if @flush_timer sender_thread = @sender_thread sender_thread.join if sender_thread && join_worker end diff --git a/lib/datadog/statsd/single_thread_sender.rb b/lib/datadog/statsd/single_thread_sender.rb index d82dc676..fb64ad37 100644 --- a/lib/datadog/statsd/single_thread_sender.rb +++ b/lib/datadog/statsd/single_thread_sender.rb @@ -7,10 +7,13 @@ class Statsd # It is using current Process.PID to check it is the result of a recent fork # and it is reseting the MessageBuffer if that's the case. class SingleThreadSender - def initialize(message_buffer, logger: nil) + def initialize(message_buffer, logger: nil, flush_interval: nil) @message_buffer = message_buffer @logger = logger @mx = Mutex.new + if flush_interval + @flush_timer = Datadog::Statsd::Timer.new(flush_interval) { flush } + end # store the pid for which this sender has been created update_fork_pid end @@ -21,6 +24,7 @@ def add(message) # not send, they belong to the parent process, let's clear the buffer. if forked? @message_buffer.reset + @flush_timer.start if @flush_timer && @flush_timer.stop? update_fork_pid end @message_buffer.add(message) @@ -33,12 +37,12 @@ def flush(*) } end - # Compatibility with `Sender` def start() + @flush_timer.start if @flush_timer end - # Compatibility with `Sender` def stop() + @flush_timer.stop if @flush_timer end # Compatibility with `Sender` diff --git a/lib/datadog/statsd/timer.rb b/lib/datadog/statsd/timer.rb new file mode 100644 index 00000000..fb230410 --- /dev/null +++ b/lib/datadog/statsd/timer.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +module Datadog + class Statsd + class Timer + def initialize(interval, &callback) + @mx = Mutex.new + @cv = ConditionVariable.new + @interval = interval + @callback = callback + @stop = true + end + + def start + return unless stop? + + @stop = false + @thread = Thread.new do + last_execution_time = current_time + @mx.synchronize do + until @stop + @cv.wait(@mx, @interval - (current_time - last_execution_time)) + last_execution_time = current_time + @callback.call + end + end + end + @thread.name = 'Statsd Timer' unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3') + end + + def stop + return if @thread.nil? + + @stop = true + @mx.synchronize do + @cv.signal + end + @thread.join + @thread = nil + end + + def stop? + @thread.nil? || @thread.stop? + end + + private + + if Process.const_defined?(:CLOCK_MONOTONIC) + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + else + def current_time + Time.now + end + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9948ce71..cdba6bd3 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -3,6 +3,7 @@ require 'rspec' require 'rspec/its' require 'byebug' +require 'timeout' require 'timecop' require 'stringio' require 'logger' diff --git a/spec/statsd/forwarder_spec.rb b/spec/statsd/forwarder_spec.rb index 31492754..fff05ae4 100644 --- a/spec/statsd/forwarder_spec.rb +++ b/spec/statsd/forwarder_spec.rb @@ -17,6 +17,10 @@ :anything end + let(:buffer_flush_interval) do + 15 + end + let(:telemetry_flush_interval) do 42 end @@ -84,6 +88,7 @@ buffer_max_payload_size: buffer_max_payload_size, buffer_max_pool_size: buffer_max_pool_size, buffer_overflowing_stategy: buffer_overflowing_stategy, + buffer_flush_interval: buffer_flush_interval, telemetry_flush_interval: telemetry_flush_interval, @@ -104,7 +109,7 @@ it 'builds the sender' do expect(Datadog::Statsd::Sender) .to receive(:new) - .with(message_buffer, logger: logger) + .with(message_buffer, logger: logger, flush_interval: flush_interval) .exactly(1) subject @@ -262,6 +267,7 @@ buffer_max_payload_size: buffer_max_payload_size, buffer_max_pool_size: buffer_max_pool_size, buffer_overflowing_stategy: buffer_overflowing_stategy, + buffer_flush_interval: buffer_flush_interval, telemetry_flush_interval: telemetry_flush_interval, @@ -282,7 +288,7 @@ it 'builds the sender' do expect(Datadog::Statsd::Sender) .to receive(:new) - .with(message_buffer, logger: logger) + .with(message_buffer, logger: logger, flush_interval: flush_interval) .exactly(1) subject diff --git a/spec/statsd/sender_spec.rb b/spec/statsd/sender_spec.rb index ccf66bd5..635b122d 100644 --- a/spec/statsd/sender_spec.rb +++ b/spec/statsd/sender_spec.rb @@ -2,12 +2,13 @@ describe Datadog::Statsd::Sender do subject do - described_class.new(message_buffer) + described_class.new(message_buffer, flush_interval: flush_interval) end let(:message_buffer) do instance_double(Datadog::Statsd::MessageBuffer) end + let(:flush_interval) { nil } describe '#start' do after do @@ -46,6 +47,28 @@ end.to raise_error(ArgumentError, /Sender already started/) end end + + context 'when flush_interval is set' do + let(:flush_interval) { 0.001 } + + it 'starts a worker thread and a flush timer thread' do + mutex = Mutex.new + cv = ConditionVariable.new + expect(subject).to receive(:flush).and_wrap_original do + mutex.synchronize { cv.broadcast } + end + + expect do + subject.start + end.to change { Thread.list.size }.by(2) + + # wait a second or until #flush is called + mutex.synchronize { cv.wait(mutex, 1) } + + # subject.stop calls #flush + allow(subject).to receive(:flush) + end + end end describe '#stop' do @@ -53,10 +76,22 @@ subject.start end - it 'stops the worker thread' do - expect do - subject.stop - end.to change { Thread.list.size }.by(-1) + context 'when flush_interval is not set' do + it 'stops the worker thread' do + expect do + subject.stop + end.to change { Thread.list.size }.by(-1) + end + end + + context 'when flush_interval is set' do + let(:flush_interval) { 15 } + + it 'stops the worker thread and the flush timer thread' do + expect do + subject.stop + end.to change { Thread.list.size }.by(-2) + end end end diff --git a/spec/statsd/single_thread_sender_spec.rb b/spec/statsd/single_thread_sender_spec.rb index 201df81c..7d2400ba 100644 --- a/spec/statsd/single_thread_sender_spec.rb +++ b/spec/statsd/single_thread_sender_spec.rb @@ -2,12 +2,13 @@ describe Datadog::Statsd::SingleThreadSender do subject do - described_class.new(message_buffer) + described_class.new(message_buffer, flush_interval: flush_interval) end let(:message_buffer) do instance_double(Datadog::Statsd::MessageBuffer) end + let(:flush_interval) { nil } describe '#start' do after do @@ -19,6 +20,44 @@ subject.start end.to change { Thread.list.size }.by(0) end + + context 'when flush_interval is set' do + let(:flush_interval) { 0.001 } + + it 'starts flush timer thread' do + mutex = Mutex.new + cv = ConditionVariable.new + expect(subject).to receive(:flush).and_wrap_original do + mutex.synchronize { cv.broadcast } + end + + expect do + subject.start + end.to change { Thread.list.size }.by(1) + + # wait a second or until #flush is called + mutex.synchronize { cv.wait(mutex, 1) } + + # subject.stop calls #flush + allow(subject).to receive(:flush) + end + end + end + + describe '#stop' do + before do + subject.start + end + + context 'when flush_interval is set' do + let(:flush_interval) { 15 } + + it 'stops the flush timer thread' do + expect do + subject.stop + end.to change { Thread.list.size }.by(-1) + end + end end describe '#add' do diff --git a/spec/statsd/timer_spec.rb b/spec/statsd/timer_spec.rb new file mode 100644 index 00000000..882b5c97 --- /dev/null +++ b/spec/statsd/timer_spec.rb @@ -0,0 +1,50 @@ +require 'spec_helper' + +describe Datadog::Statsd::Timer do + subject do + described_class.new(interval) do + queue << Time.now + end + end + let(:queue) { Queue.new } + + describe '#start' do + let(:interval) { 0.001 } + + after do + subject.stop + end + + it 'starts the timer thread and calls the callback' do + expect do + subject.start + end.to change { Thread.list.size }.by(1) + + # the callback should be called in a short time + Timeout.timeout(1) do + queue.pop + end + end + end + + describe '#stop' do + let(:interval) { 15 } + + before do + subject.start + # sleep a little for the thread to call ConditionVariable#wait + sleep 0.000001 + end + + it 'stops the timer thread after calling the callback' do + expect do + # the timer should call the callback immediatelly, that is, without waiting the interval + Timeout.timeout(1) do + subject.stop + end + end.to change { Thread.list.size }.by(-1) + + expect(queue).not_to be_empty + end + end +end From ef0db0a25ededee01171e86fef38d2d821d5ba60 Mon Sep 17 00:00:00 2001 From: abicky Date: Tue, 18 Jan 2022 01:45:53 +0900 Subject: [PATCH 09/19] Consider case when callback of timer takes more than interval --- lib/datadog/statsd/timer.rb | 3 ++- spec/statsd/timer_spec.rb | 39 ++++++++++++++++++++++++++++++++----- 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/lib/datadog/statsd/timer.rb b/lib/datadog/statsd/timer.rb index fb230410..8c1c8940 100644 --- a/lib/datadog/statsd/timer.rb +++ b/lib/datadog/statsd/timer.rb @@ -19,7 +19,8 @@ def start last_execution_time = current_time @mx.synchronize do until @stop - @cv.wait(@mx, @interval - (current_time - last_execution_time)) + timeout = @interval - (current_time - last_execution_time) + @cv.wait(@mx, timeout > 0 ? timeout : 0) last_execution_time = current_time @callback.call end diff --git a/spec/statsd/timer_spec.rb b/spec/statsd/timer_spec.rb index 882b5c97..6750e95c 100644 --- a/spec/statsd/timer_spec.rb +++ b/spec/statsd/timer_spec.rb @@ -3,14 +3,35 @@ describe Datadog::Statsd::Timer do subject do described_class.new(interval) do - queue << Time.now + sleep callback_durations.next + call_times << Time.now + end + end + let(:call_times) { Queue.new } + let(:callback_durations) do + Enumerator.new do |y| + loop do + y << 0 + end end end - let(:queue) { Queue.new } describe '#start' do let(:interval) { 0.001 } + # this callback_durations allows calls at an odd number of times + # to take almost no time, whereas calls at an even number of times + # to take at least `interval` seconds. + let(:callback_durations) do + Enumerator.new do |y| + loop do + [0, interval].each do |d| + y << d + end + end + end + end + after do subject.stop end @@ -20,9 +41,17 @@ subject.start end.to change { Thread.list.size }.by(1) - # the callback should be called in a short time + # use timeout just in case call_times.pop waits forever Timeout.timeout(1) do - queue.pop + # the first call is made after `interval` seconds + first_call_time = call_times.pop + # the second call is made `interval` seconds after the first call + # and it takes `interval` seconds before Time.now is called + second_call_time = call_times.pop + # the third call is made immediatelly after the second call + third_call_time = call_times.pop + expect(second_call_time - first_call_time).to be_within(0.001).of(interval * 2) + expect(third_call_time - second_call_time).to be_within(0.001).of(0) end end end @@ -44,7 +73,7 @@ end end.to change { Thread.list.size }.by(-1) - expect(queue).not_to be_empty + expect(call_times).not_to be_empty end end end From 657623aea63e5edcd523ca532c44af3e9a9c7abf Mon Sep 17 00:00:00 2001 From: abicky Date: Wed, 19 Jan 2022 05:03:57 +0900 Subject: [PATCH 10/19] Fix test of Datadog::Statsd::Sender#start In the test, even if `flush` isn't called, it still succeeds because `stop` calls `flush`. This commit makes the test work expectedly. --- spec/statsd/sender_spec.rb | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/spec/statsd/sender_spec.rb b/spec/statsd/sender_spec.rb index 635b122d..6d35d60c 100644 --- a/spec/statsd/sender_spec.rb +++ b/spec/statsd/sender_spec.rb @@ -54,8 +54,16 @@ it 'starts a worker thread and a flush timer thread' do mutex = Mutex.new cv = ConditionVariable.new - expect(subject).to receive(:flush).and_wrap_original do - mutex.synchronize { cv.broadcast } + flush_called = false + + # #flush can be called multiple times before #stop is called. + # It is also called in #stop, which is executed in the after callback, + # so "expect(subject).to receive(:flush).at_least(:once)" doesn't work. + allow(subject).to receive(:flush) do + mutex.synchronize do + flush_called = true + cv.broadcast + end end expect do @@ -63,10 +71,11 @@ end.to change { Thread.list.size }.by(2) # wait a second or until #flush is called - mutex.synchronize { cv.wait(mutex, 1) } + mutex.synchronize do + cv.wait(mutex, 1) unless flush_called + end - # subject.stop calls #flush - allow(subject).to receive(:flush) + expect(flush_called).to be true end end end From 47f6cdf9f87eb7e1f01cb0cf27e91777567d10d6 Mon Sep 17 00:00:00 2001 From: abicky Date: Wed, 19 Jan 2022 05:12:26 +0900 Subject: [PATCH 11/19] Increase delta of be_within just to be safe 1ms is so short that the test might sometimes fail. --- spec/statsd/timer_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/statsd/timer_spec.rb b/spec/statsd/timer_spec.rb index 6750e95c..962e4c66 100644 --- a/spec/statsd/timer_spec.rb +++ b/spec/statsd/timer_spec.rb @@ -50,8 +50,8 @@ second_call_time = call_times.pop # the third call is made immediatelly after the second call third_call_time = call_times.pop - expect(second_call_time - first_call_time).to be_within(0.001).of(interval * 2) - expect(third_call_time - second_call_time).to be_within(0.001).of(0) + expect(second_call_time - first_call_time).to be_within(0.01).of(interval * 2) + expect(third_call_time - second_call_time).to be_within(0.01).of(0) end end end From 74986b3bf34e94b09c8ba7dbd11370cdca848396 Mon Sep 17 00:00:00 2001 From: abicky Date: Wed, 19 Jan 2022 05:23:25 +0900 Subject: [PATCH 12/19] Fix test of Datadog::Statsd::SingleThreadSender#start For the same reason as 657623aea63e5edcd523ca532c44af3e9a9c7abf. --- spec/statsd/single_thread_sender_spec.rb | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/spec/statsd/single_thread_sender_spec.rb b/spec/statsd/single_thread_sender_spec.rb index 7d2400ba..abde3496 100644 --- a/spec/statsd/single_thread_sender_spec.rb +++ b/spec/statsd/single_thread_sender_spec.rb @@ -27,8 +27,16 @@ it 'starts flush timer thread' do mutex = Mutex.new cv = ConditionVariable.new - expect(subject).to receive(:flush).and_wrap_original do - mutex.synchronize { cv.broadcast } + flush_called = false + + # #flush can be called multiple times before #stop is called. + # It is also called in #stop, which is executed in the after callback, + # so "expect(subject).to receive(:flush).at_least(:once)" doesn't work. + allow(subject).to receive(:flush) do + mutex.synchronize do + flush_called = true + cv.broadcast + end end expect do @@ -36,10 +44,11 @@ end.to change { Thread.list.size }.by(1) # wait a second or until #flush is called - mutex.synchronize { cv.wait(mutex, 1) } + mutex.synchronize do + cv.wait(mutex, 1) unless flush_called + end - # subject.stop calls #flush - allow(subject).to receive(:flush) + expect(flush_called).to be true end end end From c505586d77d14899d7d345142f31388a25e99a8a Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 19 Jan 2022 17:17:57 +0000 Subject: [PATCH 13/19] mention that buffer_flush_interval results in a thread --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 2ccca9ee..90afdab3 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,8 @@ There are three different kinds of messages: There is also an implicit message which closes the queue which will cause the sender thread to finish processing and exit. +The `buffer_flush_interval`, if enabled, is implemented with an additional thread which manages the timing of those flushes. This additional thread is used even if `single_thread: true`. + ### Usual workflow You push metrics to the statsd client which writes them quickly to the sender message queue. The sender thread receives those message, buffers them and flushes them to the connection when the buffer limit is reached. From 73e7b52786497e8dc869b4d18e82c1da5a6d49b6 Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Thu, 20 Jan 2022 10:24:29 +0100 Subject: [PATCH 14/19] specs: typo in the forwarder_spec specs. --- spec/statsd/forwarder_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/statsd/forwarder_spec.rb b/spec/statsd/forwarder_spec.rb index fff05ae4..aafcdf51 100644 --- a/spec/statsd/forwarder_spec.rb +++ b/spec/statsd/forwarder_spec.rb @@ -109,7 +109,7 @@ it 'builds the sender' do expect(Datadog::Statsd::Sender) .to receive(:new) - .with(message_buffer, logger: logger, flush_interval: flush_interval) + .with(message_buffer, logger: logger, flush_interval: buffer_flush_interval) .exactly(1) subject @@ -288,7 +288,7 @@ it 'builds the sender' do expect(Datadog::Statsd::Sender) .to receive(:new) - .with(message_buffer, logger: logger, flush_interval: flush_interval) + .with(message_buffer, logger: logger, flush_interval: buffer_flush_interval) .exactly(1) subject From 7d752c8ac803e0449f4b44f6ddd1c463fe973801 Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Thu, 20 Jan 2022 10:25:12 +0100 Subject: [PATCH 15/19] specs: delay in the timer_spec seems to be too short for slow configuration. I had this unit tests failing on my laptop. --- spec/statsd/timer_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/statsd/timer_spec.rb b/spec/statsd/timer_spec.rb index 962e4c66..4295ec82 100644 --- a/spec/statsd/timer_spec.rb +++ b/spec/statsd/timer_spec.rb @@ -62,7 +62,7 @@ before do subject.start # sleep a little for the thread to call ConditionVariable#wait - sleep 0.000001 + sleep 0.01 end it 'stops the timer thread after calling the callback' do From 30770cffe2c8e53868f2c05bb703227b75e3f75a Mon Sep 17 00:00:00 2001 From: Remy Mathieu Date: Thu, 20 Jan 2022 11:10:27 +0100 Subject: [PATCH 16/19] specs: a `:flush` can be triggered. --- spec/statsd/single_thread_sender_spec.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/statsd/single_thread_sender_spec.rb b/spec/statsd/single_thread_sender_spec.rb index abde3496..1d63b9b0 100644 --- a/spec/statsd/single_thread_sender_spec.rb +++ b/spec/statsd/single_thread_sender_spec.rb @@ -62,6 +62,7 @@ let(:flush_interval) { 15 } it 'stops the flush timer thread' do + allow(subject).to receive(:flush) expect do subject.stop end.to change { Thread.list.size }.by(-1) From 1cb07a6b012c2b42e7942f1be791e16e1cbd0425 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Tue, 1 Feb 2022 14:54:47 +0000 Subject: [PATCH 17/19] Use a constant for the default queue_size value --- lib/datadog/statsd/sender.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/datadog/statsd/sender.rb b/lib/datadog/statsd/sender.rb index 6f8e3e28..ede41980 100644 --- a/lib/datadog/statsd/sender.rb +++ b/lib/datadog/statsd/sender.rb @@ -12,7 +12,7 @@ class Statsd class Sender CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close) - def initialize(message_buffer, telemetry: nil, queue_size: 2048, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread) + def initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread) @message_buffer = message_buffer @telemetry = telemetry @queue_size = queue_size From a528562288c4a8cd8c80f3b5e92432e68e1500b0 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Tue, 1 Feb 2022 14:55:06 +0000 Subject: [PATCH 18/19] Simplify allocation test --- spec/statsd/telemetry_spec.rb | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/spec/statsd/telemetry_spec.rb b/spec/statsd/telemetry_spec.rb index 2c8dd7e6..55f21e58 100644 --- a/spec/statsd/telemetry_spec.rb +++ b/spec/statsd/telemetry_spec.rb @@ -98,16 +98,11 @@ ] end - context do - before do - skip 'Ruby too old' if RUBY_VERSION < '2.3.0' - end - - it 'makes only 10 allocations' do - expect do - subject.flush - end.to make_allocations(12) - end + it do + skip 'Ruby too old' if RUBY_VERSION < '2.3.0' + expect do + subject.flush + end.to make_allocations(12) end end From 5d8e7f0c75677f3b3bd102a3bc9205c64e764cb1 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 2 Feb 2022 20:27:12 +0000 Subject: [PATCH 19/19] v5.3.3 --- CHANGELOG.md | 18 +++++++++++++++--- lib/datadog/statsd/version.rb | 2 +- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e9d233fd..7f34da5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,16 +2,24 @@ [//]: # (comment: Don't forget to update lib/datadog/statsd/version.rb:DogStatsd::Statsd::VERSION when releasing a new version) - * [ENHANCEMENT] The client can now be configured to use UDS via the `DD_DOGSTATSD_SOCKET` environment variable. +## 5.3.3 / 2022.02.02 + + * [IMPROVEMENT] Add option "buffer_flush_interval" to flush buffered metrics [#231][] by [@abicky][] + + * [IMPROVEMENT] Add Sender.queue_size limits to limit number of buffered metrics [#232][] by [@djmitche][] + + * [IMPROVEMENT] The client can now be configured to use UDS via the `DD_DOGSTATSD_SOCKET` environment variable. This variable does not take precedence over any explicit parameters passed to the Statsd constructor. + [#227][] by [@djmitche][] + ## 5.3.2 / 2021.11.03 - * [OTHER] add a warning message for the v5.x update on install #222 by @djmitche + * [OTHER] add a warning message for the v5.x update on install [#222][] by [@djmitche][] ## 5.3.1 / 2021.10.21 - * [OTHER] restore connection opening behavior from before 5.3.0 (connections not opened on client instantiation but on the first write instead) [#214]][] by [@remeh][] + * [OTHER] restore connection opening behavior from before 5.3.0 (connections not opened on client instantiation but on the first write instead) [#214][] by [@remeh][] ## 5.3.0 / 2021.10.06 @@ -394,6 +402,9 @@ Future versions are likely to introduce backward incompatibilities with < Ruby 1 [#194]: https://github.com/DataDog/dogstatsd-ruby/issues/194 [#205]: https://github.com/DataDog/dogstatsd-ruby/issues/205 [#214]: https://github.com/DataDog/dogstatsd-ruby/issues/214 +[#222]: https://github.com/DataDog/dogstatsd-ruby/issues/222 +[#231]: https://github.com/DataDog/dogstatsd-ruby/issues/231 +[#232]: https://github.com/DataDog/dogstatsd-ruby/issues/232 [@AMekss]: https://github.com/AMekss [@abicky]: https://github.com/abicky [@adimitrov]: https://github.com/adimitrov @@ -402,6 +413,7 @@ Future versions are likely to introduce backward incompatibilities with < Ruby 1 [@claytono]: https://github.com/claytono [@degemer]: https://github.com/degemer [@devleoper]: https://github.com/devleoper +[@djmitche]: https://github.com/djmitche [@djpate]: https://github.com/djpate [@f3ndot]: https://github.com/f3ndot [@fimmtiu]: https://github.com/fimmtiu diff --git a/lib/datadog/statsd/version.rb b/lib/datadog/statsd/version.rb index 31d8c809..8148ed42 100644 --- a/lib/datadog/statsd/version.rb +++ b/lib/datadog/statsd/version.rb @@ -4,6 +4,6 @@ module Datadog class Statsd - VERSION = '5.3.2' + VERSION = '5.3.3' end end