diff --git a/README.md b/README.md index 90afdab3..8ee0c4af 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,13 @@ There are three different kinds of messages: There is also an implicit message which closes the queue which will cause the sender thread to finish processing and exit. + +```ruby +statsd = Datadog::Statsd.new('localhost', 8125) +``` + +The message queue's maximum size (in messages) is given by the `sender_queue_size` argument, and has appropriate defaults for UDP (2048) and UDS (512). + The `buffer_flush_interval`, if enabled, is implemented with an additional thread which manages the timing of those flushes. This additional thread is used even if `single_thread: true`. ### Usual workflow diff --git a/lib/datadog/statsd.rb b/lib/datadog/statsd.rb index 6c8c3861..e0f1f28d 100644 --- a/lib/datadog/statsd.rb +++ b/lib/datadog/statsd.rb @@ -45,7 +45,12 @@ class Error < StandardError UDP_DEFAULT_BUFFER_SIZE = 1_432 UDS_DEFAULT_BUFFER_SIZE = 8_192 DEFAULT_BUFFER_POOL_SIZE = Float::INFINITY + + UDP_DEFAULT_SENDER_QUEUE_SIZE = 2048 + UDS_DEFAULT_SENDER_QUEUE_SIZE = 512 + MAX_EVENT_SIZE = 8 * 1_024 + # minimum flush interval for the telemetry in seconds DEFAULT_TELEMETRY_FLUSH_INTERVAL = 10 @@ -74,6 +79,7 @@ def tags # @option [Logger] logger for debugging # @option [Integer] buffer_max_payload_size max bytes to buffer # @option [Integer] buffer_max_pool_size max messages to buffer + # @option [Integer] sender_queue_size size of the sender queue in number of buffers (multi-thread only) # @option [Numeric] buffer_flush_interval interval in second to flush buffer # @option [String] socket_path unix socket path # @option [Float] default sample rate if not overridden @@ -92,6 +98,8 @@ def initialize( buffer_overflowing_stategy: :drop, buffer_flush_interval: nil, + sender_queue_size: nil, + logger: nil, single_thread: false, @@ -138,6 +146,8 @@ def initialize( buffer_overflowing_stategy: buffer_overflowing_stategy, buffer_flush_interval: buffer_flush_interval, + sender_queue_size: sender_queue_size, + telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil, ) end diff --git a/lib/datadog/statsd/connection.rb b/lib/datadog/statsd/connection.rb index 2a8f999a..35adbbc6 100644 --- a/lib/datadog/statsd/connection.rb +++ b/lib/datadog/statsd/connection.rb @@ -38,7 +38,7 @@ def write(payload) end end - telemetry.dropped(packets: 1, bytes: payload.length) if telemetry + telemetry.dropped_writer(packets: 1, bytes: payload.length) if telemetry logger.error { "Statsd: #{boom.class} #{boom}" } if logger nil end diff --git a/lib/datadog/statsd/forwarder.rb b/lib/datadog/statsd/forwarder.rb index 0d3c2852..dcbd53ed 100644 --- a/lib/datadog/statsd/forwarder.rb +++ b/lib/datadog/statsd/forwarder.rb @@ -14,6 +14,8 @@ def initialize( buffer_overflowing_stategy: :drop, buffer_flush_interval: nil, + sender_queue_size: nil, + telemetry_flush_interval: nil, global_tags: [], @@ -49,7 +51,21 @@ def initialize( max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE, overflowing_stategy: buffer_overflowing_stategy, ) - @sender = (single_thread ? SingleThreadSender : Sender).new(buffer, logger: logger, flush_interval: buffer_flush_interval) + + sender_queue_size ||= (@transport_type == :udp ? + UDP_DEFAULT_SENDER_QUEUE_SIZE : UDS_DEFAULT_SENDER_QUEUE_SIZE) + + @sender = single_thread ? + SingleThreadSender.new( + buffer, + logger: logger, + flush_interval: buffer_flush_interval) : + Sender.new( + buffer, + logger: logger, + flush_interval: buffer_flush_interval, + telemetry: @telemetry, + queue_size: sender_queue_size) @sender.start end diff --git a/lib/datadog/statsd/sender.rb b/lib/datadog/statsd/sender.rb index e96a11d1..ede41980 100644 --- a/lib/datadog/statsd/sender.rb +++ b/lib/datadog/statsd/sender.rb @@ -12,10 +12,14 @@ class Statsd class Sender CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close) - def initialize(message_buffer, logger: nil, flush_interval: nil) + def initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread) @message_buffer = message_buffer + @telemetry = telemetry + @queue_size = queue_size @logger = logger @mx = Mutex.new + @queue_class = queue_class + @thread_class = thread_class if flush_interval @flush_timer = Datadog::Statsd::Timer.new(flush_interval) { flush(sync: true) } end @@ -45,7 +49,7 @@ def rendez_vous return unless message_queue # Initialize and get the thread's sync queue - queue = (Thread.current[:statsd_sync_queue] ||= Queue.new) + queue = (@thread_class.current[:statsd_sync_queue] ||= @queue_class.new) # tell sender-thread to notify us in the current # thread's queue message_queue.push(queue) @@ -75,16 +79,20 @@ def add(message) } end - message_queue << message + if message_queue.length <= @queue_size + message_queue << message + else + @telemetry.dropped_queue(packets: 1, bytes: message.bytesize) if @telemetry + end end def start raise ArgumentError, 'Sender already started' if message_queue # initialize a new message queue for the background thread - @message_queue = Queue.new + @message_queue = @queue_class.new # start background thread - @sender_thread = Thread.new(&method(:send_loop)) + @sender_thread = @thread_class.new(&method(:send_loop)) @sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3') @flush_timer.start if @flush_timer end @@ -131,7 +139,7 @@ def send_loop case message when :flush message_buffer.flush - when Queue + when @queue_class message.push(:go_on) else message_buffer.add(message) @@ -153,7 +161,7 @@ def send_loop break when :flush message_buffer.flush - when Queue + when @queue_class message.push(:go_on) else message_buffer.add(message) diff --git a/lib/datadog/statsd/telemetry.rb b/lib/datadog/statsd/telemetry.rb index 54b45366..89b19357 100644 --- a/lib/datadog/statsd/telemetry.rb +++ b/lib/datadog/statsd/telemetry.rb @@ -9,8 +9,12 @@ class Telemetry attr_reader :service_checks attr_reader :bytes_sent attr_reader :bytes_dropped + attr_reader :bytes_dropped_queue + attr_reader :bytes_dropped_writer attr_reader :packets_sent attr_reader :packets_dropped + attr_reader :packets_dropped_queue + attr_reader :packets_dropped_writer # Rough estimation of maximum telemetry message size without tags MAX_TELEMETRY_MESSAGE_SIZE_WT_TAGS = 50 # bytes @@ -40,8 +44,12 @@ def reset @service_checks = 0 @bytes_sent = 0 @bytes_dropped = 0 + @bytes_dropped_queue = 0 + @bytes_dropped_writer = 0 @packets_sent = 0 @packets_dropped = 0 + @packets_dropped_queue = 0 + @packets_dropped_writer = 0 @next_flush_time = now_in_s + @flush_interval end @@ -54,9 +62,18 @@ def sent(metrics: 0, events: 0, service_checks: 0, bytes: 0, packets: 0) @packets_sent += packets end - def dropped(bytes: 0, packets: 0) + def dropped_queue(bytes: 0, packets: 0) @bytes_dropped += bytes + @bytes_dropped_queue += bytes @packets_dropped += packets + @packets_dropped_queue += packets + end + + def dropped_writer(bytes: 0, packets: 0) + @bytes_dropped += bytes + @bytes_dropped_writer += bytes + @packets_dropped += packets + @packets_dropped_writer += packets end def should_flush? @@ -70,8 +87,12 @@ def flush sprintf(pattern, 'service_checks', @service_checks), sprintf(pattern, 'bytes_sent', @bytes_sent), sprintf(pattern, 'bytes_dropped', @bytes_dropped), + sprintf(pattern, 'bytes_dropped_queue', @bytes_dropped_queue), + sprintf(pattern, 'bytes_dropped_writer', @bytes_dropped_writer), sprintf(pattern, 'packets_sent', @packets_sent), sprintf(pattern, 'packets_dropped', @packets_dropped), + sprintf(pattern, 'packets_dropped_queue', @packets_dropped_queue), + sprintf(pattern, 'packets_dropped_writer', @packets_dropped_writer), ] end diff --git a/spec/integrations/allocation_spec.rb b/spec/integrations/allocation_spec.rb index 02b8477f..8751e6e7 100644 --- a/spec/integrations/allocation_spec.rb +++ b/spec/integrations/allocation_spec.rb @@ -44,11 +44,11 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 16 + 20 elsif RUBY_VERSION < '2.5.0' - 15 + 19 else - 14 + 18 end end @@ -72,13 +72,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 8 + 12 elsif RUBY_VERSION < '2.5.0' - 7 + 11 elsif RUBY_VERSION < '2.6.0' - 6 + 10 else - 5 + 9 end end @@ -93,13 +93,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 25 + 29 elsif RUBY_VERSION < '2.5.0' - 23 + 27 elsif RUBY_VERSION < '2.6.0' - 22 + 26 else - 21 + 25 end end @@ -121,13 +121,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 16 + 20 elsif RUBY_VERSION < '2.5.0' - 15 + 19 elsif RUBY_VERSION < '2.6.0' - 14 + 18 else - 13 + 17 end end @@ -151,13 +151,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 8 + 12 elsif RUBY_VERSION < '2.5.0' - 7 + 11 elsif RUBY_VERSION < '2.6.0' - 6 + 10 else - 5 + 9 end end @@ -172,13 +172,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 25 + 29 elsif RUBY_VERSION < '2.5.0' - 23 + 27 elsif RUBY_VERSION < '2.6.0' - 22 + 26 else - 21 + 25 end end @@ -200,13 +200,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 18 + 22 elsif RUBY_VERSION < '2.5.0' - 17 + 21 elsif RUBY_VERSION < '2.6.0' - 16 + 20 else - 15 + 19 end end @@ -230,13 +230,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 10 + 14 elsif RUBY_VERSION < '2.5.0' - 9 + 13 elsif RUBY_VERSION < '2.6.0' - 8 + 12 else - 7 + 11 end end @@ -251,13 +251,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 27 + 31 elsif RUBY_VERSION < '2.5.0' - 25 + 29 elsif RUBY_VERSION < '2.6.0' - 24 + 28 else - 23 + 27 end end @@ -279,13 +279,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 14 + 18 elsif RUBY_VERSION < '2.5.0' - 13 + 17 elsif RUBY_VERSION < '2.6.0' - 12 + 16 else - 11 + 15 end end @@ -309,13 +309,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 6 + 10 elsif RUBY_VERSION < '2.5.0' - 5 + 9 elsif RUBY_VERSION < '2.6.0' - 4 + 8 else - 3 + 7 end end @@ -330,13 +330,13 @@ context 'with tags' do let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 23 + 27 elsif RUBY_VERSION < '2.5.0' - 21 + 25 elsif RUBY_VERSION < '2.6.0' - 20 + 24 else - 19 + 23 end end @@ -348,4 +348,4 @@ end end end -end \ No newline at end of file +end diff --git a/spec/integrations/buffering_spec.rb b/spec/integrations/buffering_spec.rb index 4ca72bb4..2b767010 100644 --- a/spec/integrations/buffering_spec.rb +++ b/spec/integrations/buffering_spec.rb @@ -141,7 +141,7 @@ end let(:buffer_max_pool_size) do - 9 + 13 # enough messages to include the telemetry end it 'increments telemetry correctly' do @@ -156,7 +156,7 @@ subject.flush(flush_telemetry: true, sync: true) - expect(socket.recv[0]).to eq_with_telemetry('mycounter:1|c', bytes_sent: 702, packets_sent: 1, metrics: 1) + expect(socket.recv[0]).to eq_with_telemetry('mycounter:1|c', bytes_sent: 1124, packets_sent: 1, metrics: 1) subject.increment('myothercounter') @@ -166,7 +166,7 @@ subject.sync_with_outbound_io - expect(socket.recv[0]).to eq_with_telemetry('myothercounter:1|c', bytes_sent: 687, packets_sent: 1, metrics: 1) + expect(socket.recv[0]).to eq_with_telemetry('myothercounter:1|c', bytes_sent: 1110, packets_sent: 1, metrics: 1) # last value is still buffered expect(socket.recv).to be_nil end @@ -199,7 +199,7 @@ expect(subject.telemetry.service_checks).to eq 0 expect(subject.telemetry.events).to eq 0 expect(subject.telemetry.packets_sent).to eq 1 - expect(subject.telemetry.bytes_sent).to eq 766 + expect(subject.telemetry.bytes_sent).to eq 1188 end end end diff --git a/spec/integrations/telemetry_spec.rb b/spec/integrations/telemetry_spec.rb index 36899143..0ebfa95d 100644 --- a/spec/integrations/telemetry_spec.rb +++ b/spec/integrations/telemetry_spec.rb @@ -92,35 +92,35 @@ subject.decrement('test', 1) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:-1|c', metrics: 1, packets_sent: 1, bytes_sent: 680) + expect(socket.recv[0]).to eq_with_telemetry('test:-1|c', metrics: 1, packets_sent: 1, bytes_sent: 1102) subject.count('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|c', metrics: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('test:21|c', metrics: 1, packets_sent: 1, bytes_sent: 1106) subject.gauge('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|g', metrics: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('test:21|g', metrics: 1, packets_sent: 1, bytes_sent: 1106) subject.histogram('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|h', metrics: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('test:21|h', metrics: 1, packets_sent: 1, bytes_sent: 1106) subject.timing('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|ms', metrics: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('test:21|ms', metrics: 1, packets_sent: 1, bytes_sent: 1106) subject.set('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|s', metrics: 1, packets_sent: 1, bytes_sent: 684) + expect(socket.recv[0]).to eq_with_telemetry('test:21|s', metrics: 1, packets_sent: 1, bytes_sent: 1107) subject.service_check('sc', 0) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('_sc|sc|0', metrics: 0, service_checks: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('_sc|sc|0', metrics: 0, service_checks: 1, packets_sent: 1, bytes_sent: 1106) subject.event('ev', 'text') subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('_e{2,4}:ev|text', metrics: 0, events: 1, packets_sent: 1, bytes_sent: 682) + expect(socket.recv[0]).to eq_with_telemetry('_e{2,4}:ev|text', metrics: 0, events: 1, packets_sent: 1, bytes_sent: 1105) end context 'when some data is dropped' do @@ -139,8 +139,10 @@ expect(subject.telemetry.events).to eq 0 expect(subject.telemetry.packets_sent).to eq 0 expect(subject.telemetry.bytes_sent).to eq 0 - expect(subject.telemetry.packets_dropped).to eq 1 - expect(subject.telemetry.bytes_dropped).to eq 1353 + expect(subject.telemetry.packets_dropped).to eq 2 + expect(subject.telemetry.packets_dropped_writer).to eq 2 + expect(subject.telemetry.bytes_dropped).to eq 2196 + expect(subject.telemetry.bytes_dropped_writer).to eq 2196 subject.gauge('test', 21) subject.flush(flush_telemetry: true, sync: true) @@ -150,23 +152,34 @@ expect(subject.telemetry.events).to eq 0 expect(subject.telemetry.packets_sent).to eq 0 expect(subject.telemetry.bytes_sent).to eq 0 - expect(subject.telemetry.packets_dropped).to eq 1 - expect(subject.telemetry.bytes_dropped).to eq 1356 + expect(subject.telemetry.packets_dropped).to eq 2 + expect(subject.telemetry.packets_dropped_writer).to eq 2 + expect(subject.telemetry.bytes_dropped).to eq 2202 + expect(subject.telemetry.bytes_dropped_writer).to eq 2202 #disable network failure socket.error_on_send(nil) subject.gauge('test', 21) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry('test:21|g', metrics: 1, service_checks: 0, events: 0, packets_dropped: 1, bytes_dropped: 1356) + expect(socket.recv[0]).to eq_with_telemetry('test:21|g', + metrics: 1, + service_checks: 0, + events: 0, + packets_dropped: 2, + packets_dropped_writer: 2, + bytes_dropped: 2202, + bytes_dropped_writer: 2202) expect(subject.telemetry.metrics).to eq 0 expect(subject.telemetry.service_checks).to eq 0 expect(subject.telemetry.events).to eq 0 expect(subject.telemetry.packets_sent).to eq 1 - expect(subject.telemetry.bytes_sent).to eq 684 + expect(subject.telemetry.bytes_sent).to eq 1109 expect(subject.telemetry.packets_dropped).to eq 0 + expect(subject.telemetry.packets_dropped_writer).to eq 0 expect(subject.telemetry.bytes_dropped).to eq 0 + expect(subject.telemetry.bytes_dropped_writer).to eq 0 end end end diff --git a/spec/matchers/telemetry_matcher.rb b/spec/matchers/telemetry_matcher.rb index 3c34e024..55a59509 100644 --- a/spec/matchers/telemetry_matcher.rb +++ b/spec/matchers/telemetry_matcher.rb @@ -4,7 +4,19 @@ telemetry_options ||= {} # Appends the telemetry metrics to the metrics string passed as 'text' - def add_telemetry(text, metrics: 1, events: 0, service_checks: 0, bytes_sent: 0, bytes_dropped:0, packets_sent: 0, packets_dropped: 0, transport: 'udp') + def add_telemetry(text, + metrics: 1, + events: 0, + service_checks: 0, + bytes_sent: 0, + bytes_dropped: 0, + bytes_dropped_queue: 0, + bytes_dropped_writer: 0, + packets_sent: 0, + packets_dropped: 0, + packets_dropped_queue: 0, + packets_dropped_writer: 0, + transport: 'udp') [ text, "datadog.dogstatsd.client.metrics:#{metrics}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", @@ -12,8 +24,12 @@ def add_telemetry(text, metrics: 1, events: 0, service_checks: 0, bytes_sent: 0, "datadog.dogstatsd.client.service_checks:#{service_checks}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.bytes_sent:#{bytes_sent}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.bytes_dropped:#{bytes_dropped}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", + "datadog.dogstatsd.client.bytes_dropped_queue:#{bytes_dropped_queue}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", + "datadog.dogstatsd.client.bytes_dropped_writer:#{bytes_dropped_writer}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.packets_sent:#{packets_sent}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", "datadog.dogstatsd.client.packets_dropped:#{packets_dropped}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", + "datadog.dogstatsd.client.packets_dropped_queue:#{packets_dropped_queue}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", + "datadog.dogstatsd.client.packets_dropped_writer:#{packets_dropped_writer}|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:#{transport}", ].join("\n") end diff --git a/spec/statsd/forwarder_spec.rb b/spec/statsd/forwarder_spec.rb index aafcdf51..31120824 100644 --- a/spec/statsd/forwarder_spec.rb +++ b/spec/statsd/forwarder_spec.rb @@ -109,7 +109,11 @@ it 'builds the sender' do expect(Datadog::Statsd::Sender) .to receive(:new) - .with(message_buffer, logger: logger, flush_interval: buffer_flush_interval) + .with(message_buffer, + logger: logger, + flush_interval: buffer_flush_interval, + queue_size: Datadog::Statsd::UDP_DEFAULT_SENDER_QUEUE_SIZE, + telemetry: telemetry) .exactly(1) subject @@ -288,7 +292,11 @@ it 'builds the sender' do expect(Datadog::Statsd::Sender) .to receive(:new) - .with(message_buffer, logger: logger, flush_interval: buffer_flush_interval) + .with(message_buffer, + logger: logger, + flush_interval: buffer_flush_interval, + queue_size: Datadog::Statsd::UDS_DEFAULT_SENDER_QUEUE_SIZE, + telemetry: telemetry) .exactly(1) subject diff --git a/spec/statsd/sender_spec.rb b/spec/statsd/sender_spec.rb index 6d35d60c..0ee85bb9 100644 --- a/spec/statsd/sender_spec.rb +++ b/spec/statsd/sender_spec.rb @@ -2,14 +2,28 @@ describe Datadog::Statsd::Sender do subject do - described_class.new(message_buffer, flush_interval: flush_interval) + described_class.new( + message_buffer, + flush_interval: flush_interval, + telemetry: telemetry, + queue_size: queue_size, + queue_class: queue_class, + thread_class: thread_class) end + let(:queue_size) { 5 } + let(:queue_class) { Queue } + let(:thread_class) { Thread } + let(:message_buffer) do instance_double(Datadog::Statsd::MessageBuffer) end let(:flush_interval) { nil } + let(:telemetry) do + instance_double(Datadog::Statsd::Telemetry) + end + describe '#start' do after do subject.stop @@ -113,7 +127,7 @@ end end - context 'when starting and stopping' do + context 'when started' do before do subject.start end @@ -131,6 +145,56 @@ subject.rendez_vous end + + context 'with fake queue and fake sender thread' do + let(:fake_queue) do + if Queue.instance_methods.include?(:close) + instance_double(Queue, { "length" => fake_queue_length, "<<" => true, "close" => true }) + else + instance_double(Queue, { "length" => fake_queue_length, "<<" => true }) + end + end + + let(:queue_class) do + class_double(Queue, new: fake_queue) + end + + let(:thread_class) do + if Thread.instance_methods.include?(:name=) + fake_thread = instance_double(Thread, { "alive?" => true, "name=" => true, "join" => true }) + else + fake_thread = instance_double(Thread, { "alive?" => true, "join" => true }) + end + class_double(Thread, new: fake_thread) + end + + context 'with fewer messages in queue than queue_size' do + let(:fake_queue_length) { queue_size } + + it 'adds only messages up to queue_size messages' do + expect(fake_queue).to receive(:<<).with('message') + if not Queue.instance_methods.include?(:close) + expect(fake_queue).to receive(:<<).with(:close) + end + expect(telemetry).not_to receive(:dropped_queue) + subject.add('message') + end + end + + context 'with more messages in queue than queue_size' do + let(:fake_queue_length) { queue_size + 1 } + + it 'adds only messages up to queue_size messages' do + if Queue.instance_methods.include?(:close) + expect(fake_queue).not_to receive(:<<) + else + expect(fake_queue).to receive(:<<).with(:close) + end + expect(telemetry).to receive(:dropped_queue).with(bytes: 7, packets: 1) + subject.add('message') + end + end + end end end diff --git a/spec/statsd/telemetry_spec.rb b/spec/statsd/telemetry_spec.rb index faab9deb..55f21e58 100644 --- a/spec/statsd/telemetry_spec.rb +++ b/spec/statsd/telemetry_spec.rb @@ -77,7 +77,8 @@ describe '#flush' do before do subject.sent(metrics: 1, events: 2, service_checks: 3, bytes: 4, packets: 5) - subject.dropped(bytes: 6, packets: 7) + subject.dropped_writer(bytes: 6, packets: 7) + subject.dropped_queue(bytes: 9, packets: 8) subject.flush end @@ -87,22 +88,21 @@ "datadog.dogstatsd.client.events:2|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.service_checks:3|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.bytes_sent:4|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", - "datadog.dogstatsd.client.bytes_dropped:6|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.bytes_dropped:15|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.bytes_dropped_queue:9|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.bytes_dropped_writer:6|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", "datadog.dogstatsd.client.packets_sent:5|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", - "datadog.dogstatsd.client.packets_dropped:7|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.packets_dropped:15|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.packets_dropped_queue:8|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", + "datadog.dogstatsd.client.packets_dropped_writer:7|c|#client:ruby,client_version:#{Datadog::Statsd::VERSION},client_transport:doe", ] end - context do - before do - skip 'Ruby too old' if RUBY_VERSION < '2.3.0' - end - - it 'makes only 8 allocations' do - expect do - subject.flush - end.to make_allocations(8) - end + it do + skip 'Ruby too old' if RUBY_VERSION < '2.3.0' + expect do + subject.flush + end.to make_allocations(12) end end @@ -112,7 +112,8 @@ allow(Process).to receive(:clock_gettime).and_return(0) if Datadog::Statsd::PROCESS_TIME_SUPPORTED subject.sent(metrics: 1, events: 2, service_checks: 3, bytes: 4, packets: 5) - subject.dropped(bytes: 6, packets: 7) + subject.dropped_writer(bytes: 6, packets: 7) + subject.dropped_queue(bytes: 9, packets: 7) end after do @@ -161,13 +162,37 @@ it 'resets the bytes_dropped' do expect do subject.reset - end.to change { subject.bytes_dropped }.from(6).to(0) + end.to change { subject.bytes_dropped}.from(15).to(0) + end + + it 'resets the bytes_dropped_queue' do + expect do + subject.reset + end.to change { subject.bytes_dropped_queue }.from(9).to(0) + end + + it 'resets the bytes_dropped_writer' do + expect do + subject.reset + end.to change { subject.bytes_dropped_writer }.from(6).to(0) end it 'resets the packets_dropped' do expect do subject.reset - end.to change { subject.packets_dropped }.from(7).to(0) + end.to change { subject.packets_dropped}.from(14).to(0) + end + + it 'resets the packets_dropped_queue' do + expect do + subject.reset + end.to change { subject.packets_dropped_queue }.from(7).to(0) + end + + it 'resets the packets_dropped_writer' do + expect do + subject.reset + end.to change { subject.packets_dropped_writer }.from(7).to(0) end end @@ -213,19 +238,61 @@ end end - describe '#dropped' do + describe '#dropped_queue' do context 'when bumping bytes' do + it 'has bumped bytes_dropped_queue by the right amount' do + expect do + subject.dropped_queue(bytes: 3) + end.to change { subject.bytes_dropped_queue }.from(0).to(3) + end + it 'has bumped bytes_dropped by the right amount' do expect do - subject.dropped(bytes: 3) + subject.dropped_queue(bytes: 3) end.to change { subject.bytes_dropped }.from(0).to(3) end end context 'when bumping packets' do + it 'has bumped packets_dropped_queue by the right amount' do + expect do + subject.dropped_queue(packets: 3) + end.to change { subject.packets_dropped_queue }.from(0).to(3) + end + + it 'has bumped packets_dropped by the right amount' do + expect do + subject.dropped_queue(packets: 3) + end.to change { subject.packets_dropped }.from(0).to(3) + end + end + end + + describe '#dropped_writer' do + context 'when bumping bytes' do + it 'has bumped bytes_dropped_writer by the right amount' do + expect do + subject.dropped_writer(bytes: 3) + end.to change { subject.bytes_dropped_writer }.from(0).to(3) + end + + it 'has bumped bytes_dropped by the right amount' do + expect do + subject.dropped_writer(bytes: 3) + end.to change { subject.bytes_dropped }.from(0).to(3) + end + end + + context 'when bumping packets' do + it 'has bumped packets_dropped_writer by the right amount' do + expect do + subject.dropped_writer(packets: 3) + end.to change { subject.packets_dropped_writer }.from(0).to(3) + end + it 'has bumped packets_dropped by the right amount' do expect do - subject.dropped(packets: 3) + subject.dropped_writer(packets: 3) end.to change { subject.packets_dropped }.from(0).to(3) end end diff --git a/spec/statsd/udp_connection_spec.rb b/spec/statsd/udp_connection_spec.rb index 66baeac4..3c9d6e2a 100644 --- a/spec/statsd/udp_connection_spec.rb +++ b/spec/statsd/udp_connection_spec.rb @@ -51,7 +51,7 @@ describe '#write' do let(:telemetry) do - instance_double(Datadog::Statsd::Telemetry, sent: true, dropped: true) + instance_double(Datadog::Statsd::Telemetry, sent: true, dropped_writer: true) end it 'connects to the right host and port' do @@ -108,9 +108,9 @@ subject.write('test') end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -262,9 +262,9 @@ subject.write('test') end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -289,9 +289,9 @@ expect(log.string).to match 'Statsd: SocketError yolo' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -336,9 +336,9 @@ subject.write('test') end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -463,9 +463,9 @@ subject.write('test') end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -528,9 +528,9 @@ subject.write('test') end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') diff --git a/spec/statsd/uds_connection_spec.rb b/spec/statsd/uds_connection_spec.rb index fa7695b4..b8814cfe 100644 --- a/spec/statsd/uds_connection_spec.rb +++ b/spec/statsd/uds_connection_spec.rb @@ -42,7 +42,7 @@ describe '#write' do let(:telemetry) do - instance_double(Datadog::Statsd::Telemetry, sent: true, dropped: true) + instance_double(Datadog::Statsd::Telemetry, sent: true, dropped_writer: true) end it 'builds the socket in the right mode' do @@ -142,9 +142,9 @@ expect(log.string).to match 'Statsd: Datadog::Statsd::UDSConnection::BadSocketError Errno::ECONNRESET: Connection reset by peer' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -171,9 +171,9 @@ expect(log.string).to match 'Statsd: RuntimeError yolo' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -199,9 +199,9 @@ expect(log.string).to match 'Statsd: SocketError yolo' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -248,9 +248,9 @@ expect(log.string).to match 'Statsd: Datadog::Statsd::UDSConnection::BadSocketError Errno::ECONNREFUSED: Connection refused - closed stream' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -277,9 +277,9 @@ expect(log.string).to match 'Statsd: RuntimeError yolo' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -305,9 +305,9 @@ expect(log.string).to match 'Errno::ECONNREFUSED Connection refused - yolo' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -351,9 +351,9 @@ expect(log.string).to match 'Statsd: Errno::ENOENT No such file or directory' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') @@ -398,9 +398,9 @@ expect(log.string).to match 'Statsd: IO::EAGAINWaitWritable Resource temporarily unavailable' end - it 'updates the "dropped" telemetry counts' do + it 'updates the "dropped_writer" telemetry counts' do expect(telemetry) - .to receive(:dropped) + .to receive(:dropped_writer) .with(bytes: 4, packets: 1) subject.write('test') diff --git a/spec/statsd_spec.rb b/spec/statsd_spec.rb index bdbf8fb2..64d87a8c 100644 --- a/spec/statsd_spec.rb +++ b/spec/statsd_spec.rb @@ -397,7 +397,7 @@ subject.gauge('begrutten-suffusion', -107.3) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry 'begrutten-suffusion:-107.3|g', bytes_sent: 697, packets_sent: 1 + expect(socket.recv[0]).to eq_with_telemetry 'begrutten-suffusion:-107.3|g', bytes_sent: 1119, packets_sent: 1 end context 'with a sample rate' do @@ -455,7 +455,7 @@ subject.histogram('ohmy', -107.3) subject.flush(sync: true) - expect(socket.recv[0]).to eq_with_telemetry 'ohmy:-107.3|h', bytes_sent: 682, packets_sent: 1 + expect(socket.recv[0]).to eq_with_telemetry 'ohmy:-107.3|h', bytes_sent: 1104, packets_sent: 1 end context 'with a sample rate' do