Skip to content

Commit

Permalink
Merge pull request #230 from DataDog/dustin.mitchell/ac-1321
Browse files Browse the repository at this point in the history
Add Sender.queue_size limits
  • Loading branch information
djmitche authored Feb 1, 2022
2 parents 23f73df + a528562 commit 83c0fbc
Show file tree
Hide file tree
Showing 16 changed files with 362 additions and 132 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ There are three different kinds of messages:
There is also an implicit message which closes the queue which will cause the sender thread to finish processing and exit.
```ruby
statsd = Datadog::Statsd.new('localhost', 8125)
```

The message queue's maximum size (in messages) is given by the `sender_queue_size` argument, and has appropriate defaults for UDP (2048) and UDS (512).

The `buffer_flush_interval`, if enabled, is implemented with an additional thread which manages the timing of those flushes. This additional thread is used even if `single_thread: true`.

### Usual workflow
Expand Down
10 changes: 10 additions & 0 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ class Error < StandardError
UDP_DEFAULT_BUFFER_SIZE = 1_432
UDS_DEFAULT_BUFFER_SIZE = 8_192
DEFAULT_BUFFER_POOL_SIZE = Float::INFINITY

UDP_DEFAULT_SENDER_QUEUE_SIZE = 2048
UDS_DEFAULT_SENDER_QUEUE_SIZE = 512

MAX_EVENT_SIZE = 8 * 1_024

# minimum flush interval for the telemetry in seconds
DEFAULT_TELEMETRY_FLUSH_INTERVAL = 10

Expand Down Expand Up @@ -74,6 +79,7 @@ def tags
# @option [Logger] logger for debugging
# @option [Integer] buffer_max_payload_size max bytes to buffer
# @option [Integer] buffer_max_pool_size max messages to buffer
# @option [Integer] sender_queue_size size of the sender queue in number of buffers (multi-thread only)
# @option [Numeric] buffer_flush_interval interval in second to flush buffer
# @option [String] socket_path unix socket path
# @option [Float] default sample rate if not overridden
Expand All @@ -92,6 +98,8 @@ def initialize(
buffer_overflowing_stategy: :drop,
buffer_flush_interval: nil,

sender_queue_size: nil,

logger: nil,

single_thread: false,
Expand Down Expand Up @@ -138,6 +146,8 @@ def initialize(
buffer_overflowing_stategy: buffer_overflowing_stategy,
buffer_flush_interval: buffer_flush_interval,

sender_queue_size: sender_queue_size,

telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil,
)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/statsd/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def write(payload)
end
end

telemetry.dropped(packets: 1, bytes: payload.length) if telemetry
telemetry.dropped_writer(packets: 1, bytes: payload.length) if telemetry
logger.error { "Statsd: #{boom.class} #{boom}" } if logger
nil
end
Expand Down
18 changes: 17 additions & 1 deletion lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def initialize(
buffer_overflowing_stategy: :drop,
buffer_flush_interval: nil,

sender_queue_size: nil,

telemetry_flush_interval: nil,
global_tags: [],

Expand Down Expand Up @@ -49,7 +51,21 @@ def initialize(
max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE,
overflowing_stategy: buffer_overflowing_stategy,
)
@sender = (single_thread ? SingleThreadSender : Sender).new(buffer, logger: logger, flush_interval: buffer_flush_interval)

sender_queue_size ||= (@transport_type == :udp ?
UDP_DEFAULT_SENDER_QUEUE_SIZE : UDS_DEFAULT_SENDER_QUEUE_SIZE)

@sender = single_thread ?
SingleThreadSender.new(
buffer,
logger: logger,
flush_interval: buffer_flush_interval) :
Sender.new(
buffer,
logger: logger,
flush_interval: buffer_flush_interval,
telemetry: @telemetry,
queue_size: sender_queue_size)
@sender.start
end

Expand Down
22 changes: 15 additions & 7 deletions lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ class Statsd
class Sender
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

def initialize(message_buffer, logger: nil, flush_interval: nil)
def initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread)
@message_buffer = message_buffer
@telemetry = telemetry
@queue_size = queue_size
@logger = logger
@mx = Mutex.new
@queue_class = queue_class
@thread_class = thread_class
if flush_interval
@flush_timer = Datadog::Statsd::Timer.new(flush_interval) { flush(sync: true) }
end
Expand Down Expand Up @@ -45,7 +49,7 @@ def rendez_vous
return unless message_queue

# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
queue = (@thread_class.current[:statsd_sync_queue] ||= @queue_class.new)
# tell sender-thread to notify us in the current
# thread's queue
message_queue.push(queue)
Expand Down Expand Up @@ -75,16 +79,20 @@ def add(message)
}
end

message_queue << message
if message_queue.length <= @queue_size
message_queue << message
else
@telemetry.dropped_queue(packets: 1, bytes: message.bytesize) if @telemetry
end
end

def start
raise ArgumentError, 'Sender already started' if message_queue

# initialize a new message queue for the background thread
@message_queue = Queue.new
@message_queue = @queue_class.new
# start background thread
@sender_thread = Thread.new(&method(:send_loop))
@sender_thread = @thread_class.new(&method(:send_loop))
@sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
@flush_timer.start if @flush_timer
end
Expand Down Expand Up @@ -131,7 +139,7 @@ def send_loop
case message
when :flush
message_buffer.flush
when Queue
when @queue_class
message.push(:go_on)
else
message_buffer.add(message)
Expand All @@ -153,7 +161,7 @@ def send_loop
break
when :flush
message_buffer.flush
when Queue
when @queue_class
message.push(:go_on)
else
message_buffer.add(message)
Expand Down
23 changes: 22 additions & 1 deletion lib/datadog/statsd/telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ class Telemetry
attr_reader :service_checks
attr_reader :bytes_sent
attr_reader :bytes_dropped
attr_reader :bytes_dropped_queue
attr_reader :bytes_dropped_writer
attr_reader :packets_sent
attr_reader :packets_dropped
attr_reader :packets_dropped_queue
attr_reader :packets_dropped_writer

# Rough estimation of maximum telemetry message size without tags
MAX_TELEMETRY_MESSAGE_SIZE_WT_TAGS = 50 # bytes
Expand Down Expand Up @@ -40,8 +44,12 @@ def reset
@service_checks = 0
@bytes_sent = 0
@bytes_dropped = 0
@bytes_dropped_queue = 0
@bytes_dropped_writer = 0
@packets_sent = 0
@packets_dropped = 0
@packets_dropped_queue = 0
@packets_dropped_writer = 0
@next_flush_time = now_in_s + @flush_interval
end

Expand All @@ -54,9 +62,18 @@ def sent(metrics: 0, events: 0, service_checks: 0, bytes: 0, packets: 0)
@packets_sent += packets
end

def dropped(bytes: 0, packets: 0)
def dropped_queue(bytes: 0, packets: 0)
@bytes_dropped += bytes
@bytes_dropped_queue += bytes
@packets_dropped += packets
@packets_dropped_queue += packets
end

def dropped_writer(bytes: 0, packets: 0)
@bytes_dropped += bytes
@bytes_dropped_writer += bytes
@packets_dropped += packets
@packets_dropped_writer += packets
end

def should_flush?
Expand All @@ -70,8 +87,12 @@ def flush
sprintf(pattern, 'service_checks', @service_checks),
sprintf(pattern, 'bytes_sent', @bytes_sent),
sprintf(pattern, 'bytes_dropped', @bytes_dropped),
sprintf(pattern, 'bytes_dropped_queue', @bytes_dropped_queue),
sprintf(pattern, 'bytes_dropped_writer', @bytes_dropped_writer),
sprintf(pattern, 'packets_sent', @packets_sent),
sprintf(pattern, 'packets_dropped', @packets_dropped),
sprintf(pattern, 'packets_dropped_queue', @packets_dropped_queue),
sprintf(pattern, 'packets_dropped_writer', @packets_dropped_writer),
]
end

Expand Down
Loading

0 comments on commit 83c0fbc

Please sign in to comment.