diff --git a/lib/datadog/statsd/forwarder.rb b/lib/datadog/statsd/forwarder.rb index a7d9ad73..0c8712e8 100644 --- a/lib/datadog/statsd/forwarder.rb +++ b/lib/datadog/statsd/forwarder.rb @@ -24,6 +24,11 @@ def initialize( ) @transport_type = socket_path.nil? ? :udp : :uds + # in order to add the telemetry metrics only in the main process + # note that it also means that if the main process is not submitting metrics, + # we won't send any telemetry data + @main_pid = Process.pid + if telemetry_flush_interval @telemetry = Telemetry.new(telemetry_flush_interval, global_tags: global_tags, @@ -104,6 +109,8 @@ def close attr_reader :connection def do_flush_telemetry + return if Process.pid != @main_pid + telemetry_snapshot = telemetry.flush telemetry.reset diff --git a/lib/datadog/statsd/message_buffer.rb b/lib/datadog/statsd/message_buffer.rb index 841862f7..374176cf 100644 --- a/lib/datadog/statsd/message_buffer.rb +++ b/lib/datadog/statsd/message_buffer.rb @@ -3,6 +3,8 @@ module Datadog class Statsd class MessageBuffer + @fork_pid = 0 + PAYLOAD_SIZE_TOLERANCE = 0.05 def initialize(connection, @@ -20,9 +22,21 @@ def initialize(connection, @buffer = String.new @message_count = 0 + + # store the pid for which this message buffer has been created + update_fork_pid end def add(message) + # we are in a new PID, which means the parent process has just forked and + # we are currently running in the child: we have to clean the buffer since + # we don't want to process/flush the metrics buffered by the parent process. + if forked? + buffer.clear + @message_count = 0 + update_fork_pid + end + message_size = message.bytesize return nil unless message_size > 0 # to avoid adding empty messages to the buffer @@ -83,6 +97,21 @@ def ensure_sendable!(message_size) def bytesize_threshold @bytesize_threshold ||= (max_payload_size - PAYLOAD_SIZE_TOLERANCE * max_payload_size).to_i end + + # below are "fork management" methods to be able to clean the MessageBuffer + # if it detects that it is running in a unknown PID. + + def forked? + Process.pid != fork_pid + end + + def update_fork_pid + @fork_pid = Process.pid + end + + def fork_pid + @fork_pid ||= Process.pid + end end end end