Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sender.queue_size limits #230

Merged
merged 10 commits into from
Feb 1, 2022
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ There are three different kinds of messages:

There is also an implicit message which closes the queue which will cause the sender thread to finish processing and exit.


```ruby
statsd = Datadog::Statsd.new('localhost', 8125)
```

The message queue's maximum size (in messages) is given by the `sender_queue_size` argument, and has appropriate defaults for UDP (2048) and UDS (512).

The `buffer_flush_interval`, if enabled, is implemented with an additional thread which manages the timing of those flushes. This additional thread is used even if `single_thread: true`.

### Usual workflow
Expand Down
10 changes: 10 additions & 0 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ class Error < StandardError
UDP_DEFAULT_BUFFER_SIZE = 1_432
UDS_DEFAULT_BUFFER_SIZE = 8_192
DEFAULT_BUFFER_POOL_SIZE = Float::INFINITY

UDP_DEFAULT_SENDER_QUEUE_SIZE = 2048
UDS_DEFAULT_SENDER_QUEUE_SIZE = 512

MAX_EVENT_SIZE = 8 * 1_024

# minimum flush interval for the telemetry in seconds
DEFAULT_TELEMETRY_FLUSH_INTERVAL = 10

Expand Down Expand Up @@ -74,6 +79,7 @@ def tags
# @option [Logger] logger for debugging
# @option [Integer] buffer_max_payload_size max bytes to buffer
# @option [Integer] buffer_max_pool_size max messages to buffer
# @option [Integer] sender_queue_size size of the sender queue in number of buffers (multi-thread only)
# @option [Numeric] buffer_flush_interval interval in second to flush buffer
# @option [String] socket_path unix socket path
# @option [Float] default sample rate if not overridden
Expand All @@ -92,6 +98,8 @@ def initialize(
buffer_overflowing_stategy: :drop,
buffer_flush_interval: nil,

sender_queue_size: nil,

logger: nil,

single_thread: false,
Expand Down Expand Up @@ -138,6 +146,8 @@ def initialize(
buffer_overflowing_stategy: buffer_overflowing_stategy,
buffer_flush_interval: buffer_flush_interval,

sender_queue_size: sender_queue_size,

telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil,
)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/statsd/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def write(payload)
end
end

telemetry.dropped(packets: 1, bytes: payload.length) if telemetry
telemetry.dropped_writer(packets: 1, bytes: payload.length) if telemetry
logger.error { "Statsd: #{boom.class} #{boom}" } if logger
nil
end
Expand Down
18 changes: 17 additions & 1 deletion lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def initialize(
buffer_overflowing_stategy: :drop,
buffer_flush_interval: nil,

sender_queue_size: nil,

telemetry_flush_interval: nil,
global_tags: [],

Expand Down Expand Up @@ -49,7 +51,21 @@ def initialize(
max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE,
overflowing_stategy: buffer_overflowing_stategy,
)
@sender = (single_thread ? SingleThreadSender : Sender).new(buffer, logger: logger, flush_interval: buffer_flush_interval)

sender_queue_size ||= (@transport_type == :udp ?
UDP_DEFAULT_SENDER_QUEUE_SIZE : UDS_DEFAULT_SENDER_QUEUE_SIZE)

@sender = single_thread ?
SingleThreadSender.new(
buffer,
logger: logger,
flush_interval: buffer_flush_interval) :
Sender.new(
buffer,
logger: logger,
flush_interval: buffer_flush_interval,
telemetry: @telemetry,
queue_size: sender_queue_size)
@sender.start
end

Expand Down
22 changes: 15 additions & 7 deletions lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ class Statsd
class Sender
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

def initialize(message_buffer, logger: nil, flush_interval: nil)
def initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread)
@message_buffer = message_buffer
@telemetry = telemetry
@queue_size = queue_size
@logger = logger
@mx = Mutex.new
@queue_class = queue_class
@thread_class = thread_class
if flush_interval
@flush_timer = Datadog::Statsd::Timer.new(flush_interval) { flush(sync: true) }
end
Expand Down Expand Up @@ -45,7 +49,7 @@ def rendez_vous
return unless message_queue

# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
queue = (@thread_class.current[:statsd_sync_queue] ||= @queue_class.new)
# tell sender-thread to notify us in the current
# thread's queue
message_queue.push(queue)
Expand Down Expand Up @@ -75,16 +79,20 @@ def add(message)
}
end

message_queue << message
if message_queue.length <= @queue_size
message_queue << message
else
@telemetry.dropped_queue(packets: 1, bytes: message.bytesize) if @telemetry
end
end

def start
raise ArgumentError, 'Sender already started' if message_queue

# initialize a new message queue for the background thread
@message_queue = Queue.new
@message_queue = @queue_class.new
# start background thread
@sender_thread = Thread.new(&method(:send_loop))
@sender_thread = @thread_class.new(&method(:send_loop))
@sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
@flush_timer.start if @flush_timer
end
Expand Down Expand Up @@ -131,7 +139,7 @@ def send_loop
case message
when :flush
message_buffer.flush
when Queue
when @queue_class
message.push(:go_on)
else
message_buffer.add(message)
Expand All @@ -153,7 +161,7 @@ def send_loop
break
when :flush
message_buffer.flush
when Queue
when @queue_class
message.push(:go_on)
else
message_buffer.add(message)
Expand Down
23 changes: 22 additions & 1 deletion lib/datadog/statsd/telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ class Telemetry
attr_reader :service_checks
attr_reader :bytes_sent
attr_reader :bytes_dropped
attr_reader :bytes_dropped_queue
attr_reader :bytes_dropped_writer
attr_reader :packets_sent
attr_reader :packets_dropped
attr_reader :packets_dropped_queue
attr_reader :packets_dropped_writer

# Rough estimation of maximum telemetry message size without tags
MAX_TELEMETRY_MESSAGE_SIZE_WT_TAGS = 50 # bytes
Expand Down Expand Up @@ -40,8 +44,12 @@ def reset
@service_checks = 0
@bytes_sent = 0
@bytes_dropped = 0
@bytes_dropped_queue = 0
@bytes_dropped_writer = 0
@packets_sent = 0
@packets_dropped = 0
@packets_dropped_queue = 0
@packets_dropped_writer = 0
@next_flush_time = now_in_s + @flush_interval
end

Expand All @@ -54,9 +62,18 @@ def sent(metrics: 0, events: 0, service_checks: 0, bytes: 0, packets: 0)
@packets_sent += packets
end

def dropped(bytes: 0, packets: 0)
def dropped_queue(bytes: 0, packets: 0)
@bytes_dropped += bytes
@bytes_dropped_queue += bytes
@packets_dropped += packets
@packets_dropped_queue += packets
end

def dropped_writer(bytes: 0, packets: 0)
@bytes_dropped += bytes
@bytes_dropped_writer += bytes
@packets_dropped += packets
@packets_dropped_writer += packets
end

def should_flush?
Expand All @@ -70,8 +87,12 @@ def flush
sprintf(pattern, 'service_checks', @service_checks),
sprintf(pattern, 'bytes_sent', @bytes_sent),
sprintf(pattern, 'bytes_dropped', @bytes_dropped),
sprintf(pattern, 'bytes_dropped_queue', @bytes_dropped_queue),
sprintf(pattern, 'bytes_dropped_writer', @bytes_dropped_writer),
sprintf(pattern, 'packets_sent', @packets_sent),
sprintf(pattern, 'packets_dropped', @packets_dropped),
sprintf(pattern, 'packets_dropped_queue', @packets_dropped_queue),
sprintf(pattern, 'packets_dropped_writer', @packets_dropped_writer),
]
end

Expand Down
Loading