Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sender] reset buffers on forks and reset the companion thread if dead or nil #203

Closed
wants to merge 10 commits into from
16 changes: 2 additions & 14 deletions lib/datadog/statsd/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,6 @@ def initialize(telemetry: nil, logger: nil)
@logger = logger
end

# Close the underlying socket
def close
begin
@socket && @socket.close if instance_variable_defined?(:@socket)
rescue StandardError => boom
logger.error { "Statsd: #{boom.class} #{boom}" } if logger
end
@socket = nil
end

def write(payload)
logger.debug { "Statsd: #{payload}" } if logger

Expand All @@ -36,6 +26,7 @@ def write(payload)
retries += 1
begin
close
connect
remeh marked this conversation as resolved.
Show resolved Hide resolved
retry
rescue StandardError => e
boom = e
Expand All @@ -48,12 +39,9 @@ def write(payload)
end

private

attr_reader :telemetry
attr_reader :logger

def socket
@socket ||= connect
end
end
end
end
2 changes: 1 addition & 1 deletion lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def initialize(
overflowing_stategy: buffer_overflowing_stategy,
)

@sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer)
@sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer, logger: logger)
remeh marked this conversation as resolved.
Show resolved Hide resolved
@sender.start
end

Expand Down
9 changes: 6 additions & 3 deletions lib/datadog/statsd/message_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ def add(message)
true
end

def reset
buffer.clear
@message_count = 0
end

def flush
return if buffer.empty?

connection.write(buffer)

buffer.clear
@message_count = 0
reset
end

private
Expand Down
85 changes: 63 additions & 22 deletions lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,111 @@

module Datadog
class Statsd
# Sender is using a companion thread to flush and pack messages
# in a `MessageBuffer`.
# The communication with this thread is done using a `Queue`.
# If the thread is dead, it is starting a new one to avoid having a blocked
# Sender with no companion thread to communicate with (most of the time, having
# a dead companion thread means that a fork just happened and that we are
# running in the child process).
class Sender
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

def initialize(message_buffer)
def initialize(message_buffer, logger: nil)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
end

def flush(sync: false)
# don't try to flush if there is no message_queue instantiated
return unless message_queue
# don't try to flush if there is no message_queue instantiated or
# no companion thread running
if !message_queue
@logger.debug { "Statsd: can't flush: no message queue ready" } if @logger
return
end
if !sender_thread.alive?
@logger.debug { "Statsd: can't flush: no sender_thread alive" } if @logger
return
end

message_queue.push(:flush)
@mx.synchronize {
message_queue.push(:flush)
}

rendez_vous if sync
end

def rendez_vous
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since #rendez_vous is public, should it also perform similar checks to #flush and use the lock as well?

# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
# tell sender-thread to notify us in the current
# thread's queue
message_queue.push(queue)
# wait for the sender thread to send a message
# once the flush is done
queue.pop
@mx.synchronize {
# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
# tell sender-thread to notify us in the current
# thread's queue
message_queue.push(queue)
# wait for the sender thread to send a message
# once the flush is done
queue.pop
}
end

def add(message)
raise ArgumentError, 'Start sender first' unless message_queue

# if the thread does not exist, we assume we are running in a forked process,
# empty the message queue and message buffers (these messages belong to
# the parent process) and spawn a new companion thread.
if !sender_thread.alive?
@logger.debug { "Statsd: companion thread is dead, re-creating one" } if @logger
@mx.synchronize {
# a call from another thread has already re-created
# the companion thread before this one acquired the lock
break if sender_thread.alive?

message_queue.close if CLOSEABLE_QUEUES
@message_queue = nil
message_buffer.reset
start
}
end

message_queue << message
end

def start
raise ArgumentError, 'Sender already started' if message_queue

# initialize message queue for background thread
# initialize a new message queue for the background thread
@message_queue = Queue.new
# start background thread
@sender_thread = Thread.new(&method(:send_loop))
end

if CLOSEABLE_QUEUES
def stop(join_worker: true)
message_queue = @message_queue
message_queue.close if message_queue
@mx.synchronize {
message_queue = @message_queue
message_queue.close if message_queue

sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
}
end
else
def stop(join_worker: true)
message_queue = @message_queue
message_queue << :close if message_queue
@mx.synchronize {
message_queue = @message_queue
message_queue << :close if message_queue

sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
sender_thread = @sender_thread
sender_thread.join if sender_thread && join_worker
}
end
end
Comment on lines 85 to 105
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: These two methods are very similar -- would it be worth unifying them and only doing the if CLOSEABLE_QUEUES in the one line that changes between them?


private

attr_reader :message_buffer

attr_reader :message_queue
attr_reader :sender_thread

Expand Down
37 changes: 34 additions & 3 deletions lib/datadog/statsd/single_thread_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,35 @@

module Datadog
class Statsd
# The SingleThreadSender is a sender synchronously buffering messages
# in a `MessageBuffer`.
# It is using current Process.PID to check it is the result of a recent fork
# and it is reseting the MessageBuffer if that's the case.
class SingleThreadSender
def initialize(message_buffer)
def initialize(message_buffer, logger: nil)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
# store the pid for which this sender has been created
update_fork_pid
end

def add(message)
@message_buffer.add(message)
@mx.synchronize {
# we have just forked, meaning we have messages in the buffer that we should
# not send, they belong to the parent process, let's clear the buffer.
if forked?
@message_buffer.reset
update_fork_pid
end
@message_buffer.add(message)
}
end

def flush(*)
@message_buffer.flush()
@mx.synchronize {
@message_buffer.flush()
}
end

# Compatibility with `Sender`
Expand All @@ -26,6 +44,19 @@ def stop()
# Compatibility with `Sender`
def rendez_vous()
end

private

# below are "fork management" methods to be able to clean the MessageBuffer
# if it detects that it is running in a unknown PID.

def forked?
Process.pid != @fork_pid
end

def update_fork_pid
@fork_pid = Process.pid
end
remeh marked this conversation as resolved.
Show resolved Hide resolved
end
end
end
16 changes: 12 additions & 4 deletions lib/datadog/statsd/udp_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@ def initialize(host, port, **kwargs)

@host = host || ENV.fetch('DD_AGENT_HOST', DEFAULT_HOST)
@port = port || ENV.fetch('DD_DOGSTATSD_PORT', DEFAULT_PORT).to_i
@socket = nil
connect
end

def close
@socket.close if @socket
@socket = nil
end

private

def connect
UDPSocket.new.tap do |socket|
socket.connect(host, port)
end
close if @socket

@socket = UDPSocket.new
@socket.connect(host, port)
end

def send_message(message)
socket.send(message, 0)
@socket.send(message, 0)
end
end
end
Expand Down
16 changes: 12 additions & 4 deletions lib/datadog/statsd/uds_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,26 @@ def initialize(socket_path, **kwargs)
super(**kwargs)

@socket_path = socket_path
@socket = nil
connect
end

def close
@socket.close if @socket
@socket = nil
end

private

def connect
socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
socket.connect(Socket.pack_sockaddr_un(@socket_path))
socket
close unless @socket == nil

@socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
@socket.connect(Socket.pack_sockaddr_un(@socket_path))
end

def send_message(message)
socket.sendmsg_nonblock(message)
@socket.sendmsg_nonblock(message)
rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::ENOENT => e
@socket = nil
# TODO: FIXME: This error should be considered as a retryable error in the
Expand Down
4 changes: 2 additions & 2 deletions spec/statsd/forwarder_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
it 'builds the sender' do
expect(Datadog::Statsd::Sender)
.to receive(:new)
.with(message_buffer)
.with(message_buffer, logger: logger)
.exactly(1)

subject
Expand Down Expand Up @@ -283,7 +283,7 @@
it 'builds the sender' do
expect(Datadog::Statsd::Sender)
.to receive(:new)
.with(message_buffer)
.with(message_buffer, logger: logger)
.exactly(1)

subject
Expand Down
1 change: 1 addition & 0 deletions spec/statsd_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@

before do
allow(Socket).to receive(:new).and_call_original
.with(Socket.pack_sockaddr_un('/tmp/socket')) # fake UDS socket
end

it 'uses an UDS socket' do
Expand Down