Skip to content

Commit

Permalink
[sender] reset the message buffer and log warnings.
Browse files Browse the repository at this point in the history
  • Loading branch information
remeh committed Sep 7, 2021
1 parent 83659d7 commit 72105bf
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
2 changes: 1 addition & 1 deletion lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def initialize(
overflowing_stategy: buffer_overflowing_stategy,
)

@sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer)
@sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer, logger: logger)
@sender.start
end

Expand Down
24 changes: 17 additions & 7 deletions lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,20 @@ class Statsd
class Sender
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

def initialize(message_buffer)
def initialize(message_buffer, logger: nil)
@message_buffer = message_buffer
@logger = logger
end

def flush(sync: false)
# don't try to flush if there is no message_queue instantiated
return unless message_queue
# don't try to flush if there is no message_queue instantiated or
# no companion thread running
if !message_queue || !sender_thread.alive?
@logger.warn { "Statsd: can't flush: no message queue or sender_thread alive" } if @logger
return
end

return unless message_queue && sender_thread.alive?

message_queue.push(:flush)

Expand All @@ -39,11 +46,14 @@ def rendez_vous
def add(message)
raise ArgumentError, 'Start sender first' unless message_queue

# if the thread does not exist, we are probably running in a forked process,
# empty the message queue (these messages belong to parent process) and spawn
# a new companion thread.
# if the thread does not exist, we assume we are running in a forked process,
# empty the message queue and message buffers (these messages belong to
# the parent process) and spawn a new companion thread.
if !sender_thread.alive?
@logger.warn { "Statsd: companion thread dead, re-creating one" } if @logger
message_queue.close if CLOSEABLE_QUEUES
@message_queue = nil
message_buffer.reset
start
end

Expand All @@ -53,7 +63,7 @@ def add(message)
def start
raise ArgumentError, 'Sender already started' if message_queue

# initialize message queue for background thread
# initialize a new message queue for the background thread
@message_queue = Queue.new
# start background thread
@sender_thread = Thread.new(&method(:send_loop))
Expand Down
4 changes: 2 additions & 2 deletions spec/statsd/forwarder_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
it 'builds the sender' do
expect(Datadog::Statsd::Sender)
.to receive(:new)
.with(message_buffer)
.with(message_buffer, logger: logger)
.exactly(1)

subject
Expand Down Expand Up @@ -283,7 +283,7 @@
it 'builds the sender' do
expect(Datadog::Statsd::Sender)
.to receive(:new)
.with(message_buffer)
.with(message_buffer, logger: logger)
.exactly(1)

subject
Expand Down

0 comments on commit 72105bf

Please sign in to comment.