Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metrics reporting in applications using forks #205

Merged
merged 8 commits into from
Sep 28, 2021
2 changes: 1 addition & 1 deletion lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def initialize(
overflowing_stategy: buffer_overflowing_stategy,
)

@sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer)
@sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer, logger: logger)
remeh marked this conversation as resolved.
Show resolved Hide resolved
@sender.start
end

Expand Down
9 changes: 6 additions & 3 deletions lib/datadog/statsd/message_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ def add(message)
true
end

def reset
buffer.clear
@message_count = 0
remeh marked this conversation as resolved.
Show resolved Hide resolved
end

def flush
return if buffer.empty?

connection.write(buffer)

buffer.clear
@message_count = 0
reset
end

private
Expand Down
85 changes: 63 additions & 22 deletions lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,111 @@

module Datadog
class Statsd
# Sender uses a companion thread to flush and pack messages
# in a `MessageBuffer`.
# Communication with this thread is done through a `Queue`.
# If the thread is dead, a new one is started to avoid leaving the Sender
# blocked with no companion thread to communicate with (most of the time,
# a dead companion thread means that a fork just happened and that we are
# running in the child process).
class Sender
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

def initialize(message_buffer)
def initialize(message_buffer, logger: nil)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
end

def flush(sync: false)
# don't try to flush if there is no message_queue instantiated
return unless message_queue
# don't try to flush if there is no message_queue instantiated or
# no companion thread running
if !message_queue
@logger.debug { "Statsd: can't flush: no message queue ready" } if @logger
return
end
if !sender_thread.alive?
@logger.debug { "Statsd: can't flush: no sender_thread alive" } if @logger
return
end

message_queue.push(:flush)
@mx.synchronize {
remeh marked this conversation as resolved.
Show resolved Hide resolved
message_queue.push(:flush)
remeh marked this conversation as resolved.
Show resolved Hide resolved
}

rendez_vous if sync
end

def rendez_vous
# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
# tell sender-thread to notify us in the current
# thread's queue
message_queue.push(queue)
# wait for the sender thread to send a message
# once the flush is done
queue.pop
@mx.synchronize {
# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
# tell sender-thread to notify us in the current
# thread's queue
message_queue.push(queue)
# wait for the sender thread to send a message
# once the flush is done
queue.pop
}
end
remeh marked this conversation as resolved.
Show resolved Hide resolved

def add(message)
raise ArgumentError, 'Start sender first' unless message_queue

# if the thread does not exist, we assume we are running in a forked process,
# empty the message queue and message buffers (these messages belong to
# the parent process) and spawn a new companion thread.
if !sender_thread.alive?
@logger.debug { "Statsd: companion thread is dead, re-creating one" } if @logger
@mx.synchronize {
# a call from another thread has already re-created
# the companion thread before this one acquired the lock
break if sender_thread.alive?
remeh marked this conversation as resolved.
Show resolved Hide resolved

message_queue.close if CLOSEABLE_QUEUES
@message_queue = nil
message_buffer.reset
start
}
end

message_queue << message
end

def start
raise ArgumentError, 'Sender already started' if message_queue

# initialize message queue for background thread
# initialize a new message queue for the background thread
@message_queue = Queue.new
# start background thread
@sender_thread = Thread.new(&method(:send_loop))
end

if CLOSEABLE_QUEUES
def stop(join_worker: true)
message_queue = @message_queue
message_queue.close if message_queue
@mx.synchronize {
message_queue = @message_queue
message_queue.close if message_queue

sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
}
end
else
def stop(join_worker: true)
message_queue = @message_queue
message_queue << :close if message_queue
@mx.synchronize {
message_queue = @message_queue
message_queue << :close if message_queue

sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
}
end
end
remeh marked this conversation as resolved.
Show resolved Hide resolved

private

attr_reader :message_buffer

attr_reader :message_queue
attr_reader :sender_thread

Expand Down
37 changes: 34 additions & 3 deletions lib/datadog/statsd/single_thread_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,35 @@

module Datadog
class Statsd
# The SingleThreadSender is a sender synchronously buffering messages
# in a `MessageBuffer`.
# It compares the current `Process.pid` with the PID it recorded earlier to
# detect that it is running in a freshly forked process, and it resets the
# MessageBuffer if that's the case.
class SingleThreadSender
def initialize(message_buffer)
def initialize(message_buffer, logger: nil)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
# store the pid for which this sender has been created
update_fork_pid
end

def add(message)
@message_buffer.add(message)
@mx.synchronize {
# we have just forked, meaning we have messages in the buffer that we should
# not send, they belong to the parent process, let's clear the buffer.
if forked?
@message_buffer.reset
update_fork_pid
end
@message_buffer.add(message)
}
end

def flush(*)
@message_buffer.flush()
@mx.synchronize {
@message_buffer.flush()
}
end

# Compatibility with `Sender`
Expand All @@ -26,6 +44,19 @@ def stop()
# Compatibility with `Sender`
def rendez_vous()
end

private

# below are "fork management" methods to be able to clean the MessageBuffer
# if it detects that it is running in an unknown PID.

# Reports whether the PID of the running process differs from the PID
# stored in `@fork_pid`.
#
# @return [Boolean] true when `Process.pid` is not equal to `@fork_pid`
#   (including when `@fork_pid` has never been set), false otherwise
def forked?
  current_pid = Process.pid
  current_pid != @fork_pid
end

# Records the PID of the running process in `@fork_pid`, the value that
# `forked?` later compares against.
#
# @return [Integer] the PID that was just stored
def update_fork_pid
  current_pid = Process.pid
  @fork_pid = current_pid
end
end
end
end
4 changes: 2 additions & 2 deletions spec/statsd/forwarder_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
it 'builds the sender' do
expect(Datadog::Statsd::Sender)
.to receive(:new)
.with(message_buffer)
.with(message_buffer, logger: logger)
.exactly(1)

subject
Expand Down Expand Up @@ -283,7 +283,7 @@
it 'builds the sender' do
expect(Datadog::Statsd::Sender)
.to receive(:new)
.with(message_buffer)
.with(message_buffer, logger: logger)
.exactly(1)

subject
Expand Down