Skip to content

Commit

Permalink
Merge branch 'master' into issue228
Browse files Browse the repository at this point in the history
  • Loading branch information
djmitche committed Feb 2, 2022
2 parents 0c23eb1 + 5d8e7f0 commit dc24dd3
Show file tree
Hide file tree
Showing 23 changed files with 645 additions and 144 deletions.
20 changes: 17 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,26 @@
[//]: # (comment: Don't forget to update lib/datadog/statsd/version.rb:DogStatsd::Statsd::VERSION when releasing a new version)

* [OTHER] Ruby versions earlier than 2.1.0 are no longer supported. Ruby-2.0 was EOL as of 2016-02-24.
* [ENHANCEMENT] The client can now be configured to use UDS via the `DD_DOGSTATSD_SOCKET` environment variable.

## 5.3.3 / 2022.02.02

* [IMPROVEMENT] Add option "buffer_flush_interval" to flush buffered metrics [#231][] by [@abicky][]

* [IMPROVEMENT] Add Sender.queue_size limits to limit number of buffered metrics [#232][] by [@djmitche][]

* [IMPROVEMENT] The client can now be configured to use UDS via the `DD_DOGSTATSD_SOCKET` environment variable.
>>>>>>> master
This variable does not take precedence over any explicit parameters passed to the Statsd constructor.
[#227][] by [@djmitche][]


## 5.3.2 / 2021.11.03

* [OTHER] add a warning message for the v5.x update on install #222 by @djmitche
* [OTHER] add a warning message for the v5.x update on install [#222][] by [@djmitche][]

## 5.3.1 / 2021.10.21

* [OTHER] restore connection opening behavior from before 5.3.0 (connections not opened on client instantiation but on the first write instead) [#214]][] by [@remeh][]
* [OTHER] restore connection opening behavior from before 5.3.0 (connections not opened on client instantiation but on the first write instead) [#214][] by [@remeh][]

## 5.3.0 / 2021.10.06

Expand Down Expand Up @@ -395,6 +405,9 @@ Future versions are likely to introduce backward incompatibilities with < Ruby 1
[#194]: https://github.com/DataDog/dogstatsd-ruby/issues/194
[#205]: https://github.com/DataDog/dogstatsd-ruby/issues/205
[#214]: https://github.com/DataDog/dogstatsd-ruby/issues/214
[#222]: https://github.com/DataDog/dogstatsd-ruby/issues/222
[#231]: https://github.com/DataDog/dogstatsd-ruby/issues/231
[#232]: https://github.com/DataDog/dogstatsd-ruby/issues/232
[@AMekss]: https://github.com/AMekss
[@abicky]: https://github.com/abicky
[@adimitrov]: https://github.com/adimitrov
Expand All @@ -403,6 +416,7 @@ Future versions are likely to introduce backward incompatibilities with < Ruby 1
[@claytono]: https://github.com/claytono
[@degemer]: https://github.com/degemer
[@devleoper]: https://github.com/devleoper
[@djmitche]: https://github.com/djmitche
[@djpate]: https://github.com/djpate
[@f3ndot]: https://github.com/f3ndot
[@fimmtiu]: https://github.com/fimmtiu
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,15 @@ There are three different kinds of messages:
There is also an implicit message which closes the queue which will cause the sender thread to finish processing and exit.
```ruby
statsd = Datadog::Statsd.new('localhost', 8125)
```

The message queue's maximum size (in messages) is given by the `sender_queue_size` argument, and has appropriate defaults for UDP (2048) and UDS (512).

The `buffer_flush_interval`, if enabled, is implemented with an additional thread which manages the timing of those flushes. This additional thread is used even if `single_thread: true`.

### Usual workflow

You push metrics to the statsd client which writes them quickly to the sender message queue. The sender thread receives those message, buffers them and flushes them to the connection when the buffer limit is reached.
Expand Down
14 changes: 14 additions & 0 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
require_relative 'statsd/sender'
require_relative 'statsd/single_thread_sender'
require_relative 'statsd/forwarder'
require_relative 'statsd/timer'

# = Datadog::Statsd: A DogStatsd client (https://www.datadoghq.com)
#
Expand Down Expand Up @@ -41,7 +42,12 @@ class Error < StandardError
UDP_DEFAULT_BUFFER_SIZE = 1_432
UDS_DEFAULT_BUFFER_SIZE = 8_192
DEFAULT_BUFFER_POOL_SIZE = Float::INFINITY

UDP_DEFAULT_SENDER_QUEUE_SIZE = 2048
UDS_DEFAULT_SENDER_QUEUE_SIZE = 512

MAX_EVENT_SIZE = 8 * 1_024

# minimum flush interval for the telemetry in seconds
DEFAULT_TELEMETRY_FLUSH_INTERVAL = 10

Expand Down Expand Up @@ -70,6 +76,8 @@ def tags
# @option [Logger] logger for debugging
# @option [Integer] buffer_max_payload_size max bytes to buffer
# @option [Integer] buffer_max_pool_size max messages to buffer
# @option [Integer] sender_queue_size size of the sender queue in number of buffers (multi-thread only)
# @option [Numeric] buffer_flush_interval interval in second to flush buffer
# @option [String] socket_path unix socket path
# @option [Float] default sample rate if not overridden
# @option [Boolean] single_thread flushes the metrics on the main thread instead of in a companion thread
Expand All @@ -85,6 +93,9 @@ def initialize(
buffer_max_payload_size: nil,
buffer_max_pool_size: nil,
buffer_overflowing_stategy: :drop,
buffer_flush_interval: nil,

sender_queue_size: nil,

logger: nil,

Expand Down Expand Up @@ -117,6 +128,9 @@ def initialize(
buffer_max_payload_size: buffer_max_payload_size,
buffer_max_pool_size: buffer_max_pool_size,
buffer_overflowing_stategy: buffer_overflowing_stategy,
buffer_flush_interval: buffer_flush_interval,

sender_queue_size: sender_queue_size,

telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil,
)
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/statsd/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def write(payload)
end
end

telemetry.dropped(packets: 1, bytes: payload.length) if telemetry
telemetry.dropped_writer(packets: 1, bytes: payload.length) if telemetry
logger.error { "Statsd: #{boom.class} #{boom}" } if logger
nil
end
Expand Down
19 changes: 18 additions & 1 deletion lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ def initialize(
buffer_max_payload_size: nil,
buffer_max_pool_size: nil,
buffer_overflowing_stategy: :drop,
buffer_flush_interval: nil,

sender_queue_size: nil,

telemetry_flush_interval: nil,
global_tags: [],
Expand Down Expand Up @@ -48,7 +51,21 @@ def initialize(
max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE,
overflowing_stategy: buffer_overflowing_stategy,
)
@sender = (single_thread ? SingleThreadSender : Sender).new(buffer, logger: logger)

sender_queue_size ||= (@transport_type == :udp ?
UDP_DEFAULT_SENDER_QUEUE_SIZE : UDS_DEFAULT_SENDER_QUEUE_SIZE)

@sender = single_thread ?
SingleThreadSender.new(
buffer,
logger: logger,
flush_interval: buffer_flush_interval) :
Sender.new(
buffer,
logger: logger,
flush_interval: buffer_flush_interval,
telemetry: @telemetry,
queue_size: sender_queue_size)
@sender.start
end

Expand Down
29 changes: 22 additions & 7 deletions lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ class Statsd
class Sender
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

def initialize(message_buffer, logger: nil)
def initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread)
@message_buffer = message_buffer
@telemetry = telemetry
@queue_size = queue_size
@logger = logger
@mx = Mutex.new
@queue_class = queue_class
@thread_class = thread_class
if flush_interval
@flush_timer = Datadog::Statsd::Timer.new(flush_interval) { flush(sync: true) }
end
end

def flush(sync: false)
Expand All @@ -42,7 +49,7 @@ def rendez_vous
return unless message_queue

# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
queue = (@thread_class.current[:statsd_sync_queue] ||= @queue_class.new)
# tell sender-thread to notify us in the current
# thread's queue
message_queue.push(queue)
Expand All @@ -68,20 +75,26 @@ def add(message)
@message_queue = nil
message_buffer.reset
start
@flush_timer.start if @flush_timer && @flush_timer.stop?
}
end

message_queue << message
if message_queue.length <= @queue_size
message_queue << message
else
@telemetry.dropped_queue(packets: 1, bytes: message.bytesize) if @telemetry
end
end

def start
raise ArgumentError, 'Sender already started' if message_queue

# initialize a new message queue for the background thread
@message_queue = Queue.new
@message_queue = @queue_class.new
# start background thread
@sender_thread = Thread.new(&method(:send_loop))
@sender_thread = @thread_class.new(&method(:send_loop))
@sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
@flush_timer.start if @flush_timer
end

if CLOSEABLE_QUEUES
Expand All @@ -92,6 +105,7 @@ def stop(join_worker: true)
message_queue = @message_queue
message_queue.close if message_queue

@flush_timer.stop if @flush_timer
sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
end
Expand All @@ -103,6 +117,7 @@ def stop(join_worker: true)
message_queue = @message_queue
message_queue << :close if message_queue

@flush_timer.stop if @flush_timer
sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
end
Expand All @@ -124,7 +139,7 @@ def send_loop
case message
when :flush
message_buffer.flush
when Queue
when @queue_class
message.push(:go_on)
else
message_buffer.add(message)
Expand All @@ -146,7 +161,7 @@ def send_loop
break
when :flush
message_buffer.flush
when Queue
when @queue_class
message.push(:go_on)
else
message_buffer.add(message)
Expand Down
10 changes: 7 additions & 3 deletions lib/datadog/statsd/single_thread_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ class Statsd
# It is using current Process.PID to check it is the result of a recent fork
# and it is reseting the MessageBuffer if that's the case.
class SingleThreadSender
def initialize(message_buffer, logger: nil)
def initialize(message_buffer, logger: nil, flush_interval: nil)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
if flush_interval
@flush_timer = Datadog::Statsd::Timer.new(flush_interval) { flush }
end
# store the pid for which this sender has been created
update_fork_pid
end
Expand All @@ -21,6 +24,7 @@ def add(message)
# not send, they belong to the parent process, let's clear the buffer.
if forked?
@message_buffer.reset
@flush_timer.start if @flush_timer && @flush_timer.stop?
update_fork_pid
end
@message_buffer.add(message)
Expand All @@ -33,12 +37,12 @@ def flush(*)
}
end

# Compatibility with `Sender`
def start()
@flush_timer.start if @flush_timer
end

# Compatibility with `Sender`
def stop()
@flush_timer.stop if @flush_timer
end

# Compatibility with `Sender`
Expand Down
23 changes: 22 additions & 1 deletion lib/datadog/statsd/telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ class Telemetry
attr_reader :service_checks
attr_reader :bytes_sent
attr_reader :bytes_dropped
attr_reader :bytes_dropped_queue
attr_reader :bytes_dropped_writer
attr_reader :packets_sent
attr_reader :packets_dropped
attr_reader :packets_dropped_queue
attr_reader :packets_dropped_writer

# Rough estimation of maximum telemetry message size without tags
MAX_TELEMETRY_MESSAGE_SIZE_WT_TAGS = 50 # bytes
Expand Down Expand Up @@ -40,8 +44,12 @@ def reset
@service_checks = 0
@bytes_sent = 0
@bytes_dropped = 0
@bytes_dropped_queue = 0
@bytes_dropped_writer = 0
@packets_sent = 0
@packets_dropped = 0
@packets_dropped_queue = 0
@packets_dropped_writer = 0
@next_flush_time = now_in_s + @flush_interval
end

Expand All @@ -54,9 +62,18 @@ def sent(metrics: 0, events: 0, service_checks: 0, bytes: 0, packets: 0)
@packets_sent += packets
end

def dropped(bytes: 0, packets: 0)
def dropped_queue(bytes: 0, packets: 0)
@bytes_dropped += bytes
@bytes_dropped_queue += bytes
@packets_dropped += packets
@packets_dropped_queue += packets
end

def dropped_writer(bytes: 0, packets: 0)
@bytes_dropped += bytes
@bytes_dropped_writer += bytes
@packets_dropped += packets
@packets_dropped_writer += packets
end

def should_flush?
Expand All @@ -70,8 +87,12 @@ def flush
sprintf(pattern, 'service_checks', @service_checks),
sprintf(pattern, 'bytes_sent', @bytes_sent),
sprintf(pattern, 'bytes_dropped', @bytes_dropped),
sprintf(pattern, 'bytes_dropped_queue', @bytes_dropped_queue),
sprintf(pattern, 'bytes_dropped_writer', @bytes_dropped_writer),
sprintf(pattern, 'packets_sent', @packets_sent),
sprintf(pattern, 'packets_dropped', @packets_dropped),
sprintf(pattern, 'packets_dropped_queue', @packets_dropped_queue),
sprintf(pattern, 'packets_dropped_writer', @packets_dropped_writer),
]
end

Expand Down
Loading

0 comments on commit dc24dd3

Please sign in to comment.