Skip to content

Commit

Permalink
In the MessageBuffer, detect if we've just been called by a fork ch…
Browse files Browse the repository at this point in the history
…ild.
  • Loading branch information
remeh committed Jul 13, 2021
1 parent ef41904 commit 7ffba49
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
7 changes: 7 additions & 0 deletions lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ def initialize(
)
@transport_type = socket_path.nil? ? :udp : :uds

# in order to add the telemetry metrics only in the main process
# note that it also means that if the main process is not submitting metrics,
# we won't send any telemetry data
@main_pid = Process.pid

if telemetry_flush_interval
@telemetry = Telemetry.new(telemetry_flush_interval,
global_tags: global_tags,
Expand Down Expand Up @@ -104,6 +109,8 @@ def close
attr_reader :connection

def do_flush_telemetry
return if Process.pid != @main_pid

telemetry_snapshot = telemetry.flush
telemetry.reset

Expand Down
29 changes: 29 additions & 0 deletions lib/datadog/statsd/message_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
module Datadog
class Statsd
class MessageBuffer
@fork_pid = 0

PAYLOAD_SIZE_TOLERANCE = 0.05

def initialize(connection,
Expand All @@ -20,9 +22,21 @@ def initialize(connection,

@buffer = String.new
@message_count = 0

# store the pid for which this message buffer has been created
update_fork_pid
end

def add(message)
# we are in a new PID, which means the parent process has just forked and
# we are currently running in the child: we have to clean the buffer since
# we don't want to process/flush the metrics buffered by the parent process.
if forked?
buffer.clear
@message_count = 0
update_fork_pid
end

message_size = message.bytesize

return nil unless message_size > 0 # to avoid adding empty messages to the buffer
Expand Down Expand Up @@ -83,6 +97,21 @@ def ensure_sendable!(message_size)
def bytesize_threshold
@bytesize_threshold ||= (max_payload_size - PAYLOAD_SIZE_TOLERANCE * max_payload_size).to_i
end

# below are "fork management" methods to be able to clean the MessageBuffer
# if it detects that it is running in a unknown PID.

def forked?
Process.pid != fork_pid
end

def update_fork_pid
@fork_pid = Process.pid
end

def fork_pid
@fork_pid ||= Process.pid
end
end
end
end

0 comments on commit 7ffba49

Please sign in to comment.