diff --git a/lib/datadog/statsd/forwarder.rb b/lib/datadog/statsd/forwarder.rb index a7d9ad73..07b65450 100644 --- a/lib/datadog/statsd/forwarder.rb +++ b/lib/datadog/statsd/forwarder.rb @@ -55,7 +55,7 @@ def initialize( overflowing_stategy: buffer_overflowing_stategy, ) - @sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer) + @sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer, logger: logger) @sender.start end diff --git a/lib/datadog/statsd/sender.rb b/lib/datadog/statsd/sender.rb index 0ebee5f3..a66f08ac 100644 --- a/lib/datadog/statsd/sender.rb +++ b/lib/datadog/statsd/sender.rb @@ -12,13 +12,20 @@ class Statsd class Sender CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close) - def initialize(message_buffer) + def initialize(message_buffer, logger: nil) @message_buffer = message_buffer + @logger = logger end def flush(sync: false) - # don't try to flush if there is no message_queue instantiated - return unless message_queue + # don't try to flush if there is no message_queue instantiated or + # no companion thread running + if !message_queue || !sender_thread.alive? + @logger.warn { "Statsd: can't flush: no message queue or sender_thread alive" } if @logger + return + end + + return unless message_queue && sender_thread.alive? message_queue.push(:flush) @@ -39,11 +46,14 @@ def rendez_vous def add(message) raise ArgumentError, 'Start sender first' unless message_queue - # if the thread does not exist, we are probably running in a forked process, - # empty the message queue (these messages belong to parent process) and spawn - # a new companion thread. + # if the thread does not exist, we assume we are running in a forked process, + # empty the message queue and message buffers (these messages belong to + # the parent process) and spawn a new companion thread. if !sender_thread.alive? + @logger.warn { "Statsd: companion thread dead, re-creating one" } if @logger + message_queue.close if CLOSEABLE_QUEUES @message_queue = nil + message_buffer.reset start end @@ -53,7 +63,7 @@ def add(message) def start raise ArgumentError, 'Sender already started' if message_queue - # initialize message queue for background thread + # initialize a new message queue for the background thread @message_queue = Queue.new # start background thread @sender_thread = Thread.new(&method(:send_loop)) diff --git a/spec/statsd/forwarder_spec.rb b/spec/statsd/forwarder_spec.rb index 96d553af..08adba63 100644 --- a/spec/statsd/forwarder_spec.rb +++ b/spec/statsd/forwarder_spec.rb @@ -105,7 +105,7 @@ it 'builds the sender' do expect(Datadog::Statsd::Sender) .to receive(:new) - .with(message_buffer) + .with(message_buffer, logger: logger) .exactly(1) subject @@ -283,7 +283,7 @@ it 'builds the sender' do expect(Datadog::Statsd::Sender) .to receive(:new) - .with(message_buffer) + .with(message_buffer, logger: logger) .exactly(1) subject