Skip to content

Commit

Permalink
Merge pull request #205 from DataDog/remeh/fork-detect-v3
Browse files Browse the repository at this point in the history
Fix metrics reporting in applications using `fork`s
  • Loading branch information
remeh authored Sep 28, 2021
2 parents db72b90 + 362acc7 commit 14c76e3
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 20 deletions.
4 changes: 4 additions & 0 deletions lib/datadog/statsd/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ def initialize(telemetry: nil, logger: nil)
@logger = logger
end

# Resets the telemetry attached to this connection by delegating to
# `telemetry.reset`.
#
# Telemetry is optional (the constructor accepts `telemetry: nil`), so this is
# a no-op when no telemetry object is set instead of raising NoMethodError.
def reset_telemetry
  telemetry.reset if telemetry
end

def write(payload)
logger.debug { "Statsd: #{payload}" } if logger

Expand Down
6 changes: 2 additions & 4 deletions lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@ def initialize(
raise ArgumentError, "buffer_max_payload_size is not high enough to use telemetry (tags=(#{global_tags.inspect}))"
end

@buffer = MessageBuffer.new(@connection,
buffer = MessageBuffer.new(@connection,
max_payload_size: buffer_max_payload_size,
max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE,
overflowing_stategy: buffer_overflowing_stategy,
)

@sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer)
@sender = (single_thread ? SingleThreadSender : Sender).new(buffer, logger: logger)
@sender.start
end

Expand Down Expand Up @@ -99,7 +98,6 @@ def close
end

private
attr_reader :buffer
attr_reader :sender
attr_reader :connection

Expand Down
17 changes: 13 additions & 4 deletions lib/datadog/statsd/message_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def initialize(connection,
@overflowing_stategy = overflowing_stategy

@buffer = String.new
@message_count = 0
clear_buffer
end

def add(message)
Expand All @@ -42,16 +42,20 @@ def add(message)
true
end

# Drops everything currently buffered (without sending it) and resets the
# connection's telemetry. The senders call this when they detect that they
# are running in a forked process, because the buffered messages belong to
# the parent process.
def reset
clear_buffer
connection.reset_telemetry
end

# Writes the buffered payload through the connection, then empties the buffer
# and resets the message counter. Does nothing when the buffer is empty.
def flush
  return if buffer.empty?

  connection.write(buffer)

  # clear_buffer already clears the string and zeroes @message_count, so the
  # inline copies of those two statements are not repeated here
  clear_buffer
end

private

attr :max_payload_size
attr :max_pool_size

Expand All @@ -66,6 +70,11 @@ def should_flush?(message_size)
false
end

# Empties the payload buffer in place and sets the buffered-message counter
# back to zero. Nothing is sent.
def clear_buffer
buffer.clear
@message_count = 0
end

# Tells whether the buffer should be flushed ahead of time: either the number
# of buffered messages has reached the pool size, or the buffered bytes have
# gone past the byte-size threshold.
def preemptive_flush?
  return true if @message_count == max_pool_size

  buffer.bytesize > bytesize_threshold
end
Expand Down
58 changes: 51 additions & 7 deletions lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,45 @@

module Datadog
class Statsd
# Sender is using a companion thread to flush and pack messages
# in a `MessageBuffer`.
# The communication with this thread is done using a `Queue`.
# If the thread is dead, it is starting a new one to avoid having a blocked
# Sender with no companion thread to communicate with (most of the time, having
# a dead companion thread means that a fork just happened and that we are
# running in the child process).
class Sender
# True when this Ruby's Queue exposes #close. It selects which #stop
# implementation gets defined and whether #add closes the stale queue before
# replacing it.
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

# @param message_buffer the buffer this sender feeds (stored as-is)
# @param logger optional logger; when set, debug lines are emitted through it
def initialize(message_buffer, logger: nil)
  @message_buffer = message_buffer
  @logger = logger
  # serializes the re-creation of the companion thread done in #add
  @mx = Mutex.new
end

# Asks the companion thread to flush the message buffer by pushing a single
# `:flush` marker on the message queue.
#
# Nothing is pushed when there is no message queue or no live companion
# thread; a debug line is logged in that case if a logger is set.
#
# @param sync [Boolean] when true, #rendez_vous is called after the marker
#   has been pushed
def flush(sync: false)
  # keep copies around in case another thread is calling #stop while this method is running
  current_message_queue = message_queue
  current_sender_thread = sender_thread

  # don't try to flush if there is no message_queue instantiated or
  # no companion thread running
  unless current_message_queue
    @logger.debug { "Statsd: can't flush: no message queue ready" } if @logger
    return
  end
  # #start assigns the queue before the thread, so the thread can still be
  # nil here; treat that the same as a dead thread
  unless current_sender_thread && current_sender_thread.alive?
    @logger.debug { "Statsd: can't flush: no sender_thread alive" } if @logger
    return
  end

  current_message_queue.push(:flush)
  rendez_vous if sync
end

def rendez_vous
# could happen if #start hasn't been called
return unless message_queue

# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
# tell sender-thread to notify us in the current
Expand All @@ -32,19 +54,39 @@ def rendez_vous
# Hands a message over to the companion thread through the message queue.
#
# When the companion thread is found dead, the sender assumes it is running
# in a forked child: the queue and the message buffer still hold the parent's
# messages, so they are discarded and a fresh queue/thread pair is started
# before the message is queued.
#
# @raise [ArgumentError] if the sender has not been started
def add(message)
  raise ArgumentError, 'Start sender first' unless message_queue

  unless sender_thread.alive?
    @mx.synchronize do
      # someone else may have re-created the companion thread while this
      # caller was waiting for the lock
      break if sender_thread.alive?

      @logger.debug { "Statsd: companion thread is dead, re-creating one" } if @logger

      # get rid of the stale queue, drop the parent's buffered messages and
      # spawn a new companion thread (#start refuses to run while a queue is set)
      message_queue.close if CLOSEABLE_QUEUES
      @message_queue = nil
      message_buffer.reset
      start
    end
  end

  message_queue << message
end

# Creates the message queue and spawns the companion thread running
# `send_loop`.
#
# @raise [ArgumentError] if a message queue already exists (already started)
def start
  raise ArgumentError, 'Sender already started' if message_queue

  # the queue is the only channel used to talk to the background thread
  @message_queue = Queue.new
  @sender_thread = Thread.new { send_loop }
end

if CLOSEABLE_QUEUES
# when calling stop, make sure that no other thread is trying
# to close the sender or to keep adding messages to it
# through `#add`.
def stop(join_worker: true)
message_queue = @message_queue
message_queue.close if message_queue
Expand All @@ -53,6 +95,9 @@ def stop(join_worker: true)
sender_thread.join if sender_thread && join_worker
end
else
# when calling stop, make sure that no other thread is trying
# to close the sender or to keep adding messages to it
# through `#add`.
def stop(join_worker: true)
message_queue = @message_queue
message_queue << :close if message_queue
Expand All @@ -65,7 +110,6 @@ def stop(join_worker: true)
private

attr_reader :message_buffer

attr_reader :message_queue
attr_reader :sender_thread

Expand Down
37 changes: 34 additions & 3 deletions lib/datadog/statsd/single_thread_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,35 @@

module Datadog
class Statsd
# The SingleThreadSender is a sender that synchronously buffers messages
# in a `MessageBuffer`.
# It uses the current process id (`Process.pid`) to detect that it is running
# in a freshly forked process, and resets the MessageBuffer if that's the case.
class SingleThreadSender
# @param message_buffer the buffer messages are added to and flushed from
# @param logger optional logger (stored as-is)
def initialize(message_buffer, logger: nil)
  @message_buffer = message_buffer
  @logger = logger
  # guards every access to the message buffer made by #add and #flush
  @mx = Mutex.new
  # store the pid for which this sender has been created
  update_fork_pid
end

# Adds a message to the buffer, exactly once and under the sender's lock.
#
# If the process id changed since it was last recorded, the buffer is reset
# first: what it holds belongs to the parent process and must not be sent
# from the child.
#
# @return whatever `@message_buffer.add` returns
def add(message)
  @mx.synchronize do
    if forked?
      @message_buffer.reset
      update_fork_pid
    end

    @message_buffer.add(message)
  end
end

# Flushes the message buffer once, under the sender's lock.
# Arguments are accepted and ignored so the signature stays call-compatible
# with `Sender#flush`.
def flush(*)
  @mx.synchronize do
    @message_buffer.flush
  end
end

# Compatibility with `Sender`
Expand All @@ -26,6 +44,19 @@ def stop()
# Compatibility with `Sender`: messages are buffered synchronously here, so
# there is nothing to wait for and this is intentionally a no-op.
def rendez_vous
end

private

# below are the "fork management" methods, used to clean the MessageBuffer
# when the sender detects that it is running under an unknown PID.

# True when the current process id differs from the one recorded by
# #update_fork_pid, i.e. this sender is now running in another process.
def forked?
  @fork_pid != Process.pid
end

# Records the current process id as the one this sender belongs to;
# #forked? compares against this value.
def update_fork_pid
@fork_pid = Process.pid
end
end
end
end
4 changes: 2 additions & 2 deletions spec/statsd/forwarder_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
# The sender must be built once, with the message buffer and the logger.
it 'builds the sender' do
  expect(Datadog::Statsd::Sender)
    .to receive(:new)
    .with(message_buffer, logger: logger)
    .exactly(1)

  subject
end
Expand Down Expand Up @@ -283,7 +283,7 @@
# The sender must be built once, with the message buffer and the logger.
it 'builds the sender' do
  expect(Datadog::Statsd::Sender)
    .to receive(:new)
    .with(message_buffer, logger: logger)
    .exactly(1)

  subject
end
Expand Down

0 comments on commit 14c76e3

Please sign in to comment.