Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metrics reporting in applications using forks #205

Merged
merged 8 commits into from
Sep 28, 2021
4 changes: 4 additions & 0 deletions lib/datadog/statsd/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ def initialize(telemetry: nil, logger: nil)
@logger = logger
end

# Resets the telemetry object attached to this connection.
#
# Telemetry is optional (the constructor accepts `telemetry: nil`), so this
# is a no-op returning nil when no telemetry object is present instead of
# raising NoMethodError on nil.
def reset_telemetry
  telemetry.reset if telemetry
end

def write(payload)
logger.debug { "Statsd: #{payload}" } if logger

Expand Down
6 changes: 2 additions & 4 deletions lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@ def initialize(
raise ArgumentError, "buffer_max_payload_size is not high enough to use telemetry (tags=(#{global_tags.inspect}))"
end

@buffer = MessageBuffer.new(@connection,
buffer = MessageBuffer.new(@connection,
max_payload_size: buffer_max_payload_size,
max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE,
overflowing_stategy: buffer_overflowing_stategy,
)

@sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer)
@sender = (single_thread ? SingleThreadSender : Sender).new(buffer, logger: logger)
@sender.start
end

Expand Down Expand Up @@ -99,7 +98,6 @@ def close
end

private
attr_reader :buffer
attr_reader :sender
attr_reader :connection

Expand Down
17 changes: 13 additions & 4 deletions lib/datadog/statsd/message_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def initialize(connection,
@overflowing_stategy = overflowing_stategy

@buffer = String.new
@message_count = 0
clear_buffer
end

def add(message)
Expand All @@ -42,16 +42,20 @@ def add(message)
true
end

# Discards everything currently buffered (see clear_buffer) and asks the
# connection to reset its telemetry. Nothing is written to the connection:
# the senders call this when they detect they are running in a forked
# process, where the buffered messages belong to the parent.
def reset
clear_buffer
connection.reset_telemetry
end

def flush
return if buffer.empty?

connection.write(buffer)

buffer.clear
@message_count = 0
clear_buffer
end

private

attr :max_payload_size
attr :max_pool_size

Expand All @@ -66,6 +70,11 @@ def should_flush?(message_size)
false
end

# Empties the pending payload in place and resets the pending message
# counter to zero, without sending anything.
def clear_buffer
buffer.clear
@message_count = 0
end

# Tells whether the buffer should be flushed right away: either the number
# of pending messages has reached max_pool_size, or the pending payload is
# larger (in bytes) than bytesize_threshold.
def preemptive_flush?
  # the size check is only evaluated when the count limit is not reached
  return true if @message_count == max_pool_size

  buffer.bytesize > bytesize_threshold
end
Expand Down
58 changes: 51 additions & 7 deletions lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,45 @@

module Datadog
class Statsd
# Sender uses a companion thread to flush and pack messages
# in a `MessageBuffer`.
# Communication with this thread goes through a `Queue`.
# If the thread is dead, a new one is started so that the Sender is never
# left blocked with no companion thread to communicate with (most of the
# time, a dead companion thread means that a fork just happened and that
# we are running in the child process).
class Sender
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

def initialize(message_buffer)
def initialize(message_buffer, logger: nil)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
end

def flush(sync: false)
# don't try to flush if there is no message_queue instantiated
return unless message_queue

message_queue.push(:flush)
# keep a copy around in case another thread is calling #stop while this method is running
current_message_queue = message_queue

# don't try to flush if there is no message_queue instantiated or
# no companion thread running
if !current_message_queue
@logger.debug { "Statsd: can't flush: no message queue ready" } if @logger
return
end
if !sender_thread.alive?
@logger.debug { "Statsd: can't flush: no sender_thread alive" } if @logger
return
end

current_message_queue.push(:flush)
rendez_vous if sync
end

def rendez_vous
# could happen if #start hasn't been called
return unless message_queue
remeh marked this conversation as resolved.
Show resolved Hide resolved

# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
# tell sender-thread to notify us in the current
Expand All @@ -32,19 +54,39 @@ def rendez_vous
# Queues +message+ for the companion thread.
#
# Raises ArgumentError when no message queue exists yet (the sender has not
# been started). Returns the message queue.
def add(message)
  raise ArgumentError, 'Start sender first' unless message_queue

  # A dead companion thread is taken to mean that we are running in a forked
  # process: the queued and buffered messages belong to the parent process,
  # so they are dropped and a fresh companion thread is spawned.
  unless sender_thread.alive?
    @mx.synchronize do
      # Re-check under the lock: another thread may already have re-created
      # the companion thread while this one was waiting for the mutex.
      unless sender_thread.alive?
        @logger.debug { "Statsd: companion thread is dead, re-creating one" } if @logger

        message_queue.close if CLOSEABLE_QUEUES
        @message_queue = nil
        message_buffer.reset
        start
      end
    end
  end

  message_queue << message
end

# Creates the message queue and spawns the companion thread running
# send_loop. Returns the new thread.
#
# Raises ArgumentError when a message queue already exists, i.e. the sender
# has already been started.
def start
  if message_queue
    raise ArgumentError, 'Sender already started'
  end

  # fresh queue used to hand messages over to the background thread
  @message_queue = Queue.new
  # background (companion) thread consuming that queue
  @sender_thread = Thread.new(&method(:send_loop))
end

if CLOSEABLE_QUEUES
# when calling stop, make sure that no other thread is trying
# to close the sender or to continue to `#add` more messages
# into the sender.
def stop(join_worker: true)
message_queue = @message_queue
message_queue.close if message_queue
Expand All @@ -53,6 +95,9 @@ def stop(join_worker: true)
sender_thread.join if sender_thread && join_worker
end
else
# when calling stop, make sure that no other thread is trying
# to close the sender or to continue to `#add` more messages
# into the sender.
def stop(join_worker: true)
message_queue = @message_queue
message_queue << :close if message_queue
Expand All @@ -65,7 +110,6 @@ def stop(join_worker: true)
private

attr_reader :message_buffer

attr_reader :message_queue
attr_reader :sender_thread

Expand Down
37 changes: 34 additions & 3 deletions lib/datadog/statsd/single_thread_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,35 @@

module Datadog
class Statsd
# The SingleThreadSender is a sender that synchronously buffers messages
# in a `MessageBuffer`.
# It compares the current process ID (`Process.pid`) with the one it stored
# to detect that it is running in a freshly forked process, and resets the
# MessageBuffer if that's the case.
class SingleThreadSender
def initialize(message_buffer)
def initialize(message_buffer, logger: nil)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
# store the pid for which this sender has been created
update_fork_pid
end

def add(message)
@message_buffer.add(message)
@mx.synchronize {
# we have just forked, meaning we have messages in the buffer that we should
# not send, they belong to the parent process, let's clear the buffer.
if forked?
@message_buffer.reset
update_fork_pid
end
@message_buffer.add(message)
}
end

def flush(*)
@message_buffer.flush()
@mx.synchronize {
@message_buffer.flush()
}
end

# Compatibility with `Sender`
Expand All @@ -26,6 +44,19 @@ def stop()
# Compatibility with `Sender`: deliberately does nothing and returns nil.
def rendez_vous
  # intentionally empty
end

private

# below are "fork management" methods used to clean the MessageBuffer
# when the sender detects that it is running under an unknown PID.

# True when the current process ID differs from the one last recorded by
# update_fork_pid, i.e. this sender is now running in another process than
# the one that recorded the PID.
def forked?
Process.pid != @fork_pid
end

# Records the current process ID as the one this sender belongs to; forked?
# compares against this value.
def update_fork_pid
@fork_pid = Process.pid
end
end
end
end
4 changes: 2 additions & 2 deletions spec/statsd/forwarder_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
it 'builds the sender' do
expect(Datadog::Statsd::Sender)
.to receive(:new)
.with(message_buffer)
.with(message_buffer, logger: logger)
.exactly(1)

subject
Expand Down Expand Up @@ -283,7 +283,7 @@
it 'builds the sender' do
expect(Datadog::Statsd::Sender)
.to receive(:new)
.with(message_buffer)
.with(message_buffer, logger: logger)
.exactly(1)

subject
Expand Down