Fix metrics reporting in applications using forks #205

Merged: 8 commits, Sep 28, 2021
12 changes: 6 additions & 6 deletions README.md
@@ -128,13 +128,13 @@ In order to use DogStatsD metrics, events, and Service Checks the Agent must be

After the client is created, you can start sending custom metrics to Datadog. See the dedicated [Metric Submission: DogStatsD documentation](https://docs.datadoghq.com/developers/metrics/dogstatsd_metrics_submission/?tab=ruby) to see how to submit all supported metric types to Datadog with working code examples:

* [Submit a COUNT metric](https://docs.datadoghq.com/developers/metrics/dogstatsd_metrics_submission/?tab=ruby#count).
* [Submit a GAUGE metric](https://docs.datadoghq.com/developers/metrics/dogstatsd_metrics_submission/?tab=ruby#gauge).
* [Submit a SET metric](https://docs.datadoghq.com/developers/metrics/dogstatsd_metrics_submission/?tab=ruby#set)
* [Submit a HISTOGRAM metric](https://docs.datadoghq.com/developers/metrics/dogstatsd_metrics_submission/?tab=ruby#histogram)
* [Submit a DISTRIBUTION metric](https://docs.datadoghq.com/developers/metrics/dogstatsd_metrics_submission/?tab=ruby#distribution)
* [Submit a COUNT metric](https://docs.datadoghq.com/metrics/dogstatsd_metrics_submission/?tab=ruby#count).
* [Submit a GAUGE metric](https://docs.datadoghq.com/metrics/dogstatsd_metrics_submission/?tab=ruby#gauge).
* [Submit a SET metric](https://docs.datadoghq.com/metrics/dogstatsd_metrics_submission/?tab=ruby#set)
* [Submit a HISTOGRAM metric](https://docs.datadoghq.com/metrics/dogstatsd_metrics_submission/?tab=ruby#histogram)
* [Submit a DISTRIBUTION metric](https://docs.datadoghq.com/metrics/dogstatsd_metrics_submission/?tab=ruby#distribution)

Some options are suppported when submitting metrics, like [applying a Sample Rate to your metrics](https://docs.datadoghq.com/developers/metrics/dogstatsd_metrics_submission/?tab=ruby#metric-submission-options) or [tagging your metrics with your custom tags](https://docs.datadoghq.com/developers/metrics/dogstatsd_metrics_submission/?tab=ruby#metric-tagging). Find all the available functions to report metrics in the [DogStatsD-ruby rubydoc](https://www.rubydoc.info/github/DataDog/dogstatsd-ruby/master/Datadog/Statsd).
Some options are suppported when submitting metrics, like [applying a Sample Rate to your metrics](https://docs.datadoghq.com/metrics/dogstatsd_metrics_submission/?tab=ruby#metric-submission-options) or [tagging your metrics with your custom tags](https://docs.datadoghq.com/developers/metrics/dogstatsd_metrics_submission/?tab=ruby#metric-tagging). Find all the available functions to report metrics in the [DogStatsD-ruby rubydoc](https://www.rubydoc.info/github/DataDog/dogstatsd-ruby/master/Datadog/Statsd).

### Events

7 changes: 7 additions & 0 deletions lib/datadog/statsd/connection.rb
@@ -8,8 +8,15 @@ def initialize(telemetry: nil, logger: nil)
@logger = logger
end

def reset_telemetry
telemetry.reset
end

# Close the underlying socket
def close
# NOTE(remy): we do not want to automatically reset the telemetry object
# here because the retry mechanism may automatically re-create the connection;
# in that case, we want to keep the telemetry data
begin
@socket && @socket.close if instance_variable_defined?(:@socket)
rescue StandardError => boom
6 changes: 2 additions & 4 deletions lib/datadog/statsd/forwarder.rb
@@ -49,13 +49,12 @@ def initialize(
raise ArgumentError, "buffer_max_payload_size is not high enough to use telemetry (tags=(#{global_tags.inspect}))"
end

@buffer = MessageBuffer.new(@connection,
buffer = MessageBuffer.new(@connection,
max_payload_size: buffer_max_payload_size,
max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE,
overflowing_stategy: buffer_overflowing_stategy,
)

@sender = single_thread ? SingleThreadSender.new(buffer) : Sender.new(buffer)
@sender = (single_thread ? SingleThreadSender : Sender).new(buffer, logger: logger)
@sender.start
end

@@ -99,7 +98,6 @@ def close
end

private
attr_reader :buffer
attr_reader :sender
attr_reader :connection

16 changes: 12 additions & 4 deletions lib/datadog/statsd/message_buffer.rb
@@ -19,7 +19,7 @@ def initialize(connection,
@overflowing_stategy = overflowing_stategy

@buffer = String.new
@message_count = 0
reset
end

def add(message)
@@ -42,16 +42,24 @@ def add(message)
true
end

def reset
buffer.clear
@message_count = 0
end

def reset_telemetry
connection.reset_telemetry
end

def flush
return if buffer.empty?

connection.write(buffer)

buffer.clear
@message_count = 0
reset
end

private

attr :max_payload_size
attr :max_pool_size

53 changes: 46 additions & 7 deletions lib/datadog/statsd/sender.rb
@@ -2,23 +2,45 @@

module Datadog
class Statsd
# Sender uses a companion thread to flush and pack messages
# in a `MessageBuffer`.
# Communication with this thread goes through a `Queue`.
# If the thread is dead, Sender starts a new one to avoid being blocked
# with no companion thread to communicate with (most of the time, a
# dead companion thread means that a fork just happened and that we are
# running in the child process).
class Sender
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

def initialize(message_buffer)
def initialize(message_buffer, logger: nil)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
end

def flush(sync: false)
# don't try to flush if there is no message_queue instantiated
return unless message_queue

message_queue.push(:flush)
# keep a copy around in case another thread is calling #stop while this method is running
current_message_queue = message_queue

# don't try to flush if there is no message_queue instantiated or
# no companion thread running
if !current_message_queue
@logger.debug { "Statsd: can't flush: no message queue ready" } if @logger
return
end
if !sender_thread.alive?
@logger.debug { "Statsd: can't flush: no sender_thread alive" } if @logger
return
end

current_message_queue.push(:flush)
rendez_vous if sync
end

def rendez_vous
# could happen if #start hasn't been called
return unless message_queue

# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
# tell sender-thread to notify us in the current
@@ -32,13 +54,31 @@ def rendez_vous
def add(message)
raise ArgumentError, 'Start sender first' unless message_queue

# if the thread does not exist, we assume we are running in a forked process,
# empty the message queue and message buffers (these messages belong to
# the parent process) and spawn a new companion thread.
if !sender_thread.alive?
@mx.synchronize {
# a call from another thread has already re-created
# the companion thread before this one acquired the lock
break if sender_thread.alive?
@logger.debug { "Statsd: companion thread is dead, re-creating one" } if @logger

message_queue.close if CLOSEABLE_QUEUES
@message_queue = nil
message_buffer.reset
message_buffer.reset_telemetry
start
}
end

message_queue << message
end
Review comment (Member):

🤔 Hmm I still see a potential issue here, similar to the ones in #flush and #rendez_vous: with some poor timing of stop, by the time we get to the message_queue << message line, message_queue may be nil.

(And its cousin issue, @sender_thread being nil)

I think part of the issue is that we have the background thread setting these two things to nil without ever synchronizing with any other threads, which can get surprised by this at many points in their execution. While we could expand the synchronization even more, that seems to me to be a bit heavy-handed, especially since we have a thread-safe construct (Queue) that we're building around in this class.

Here's my suggestion:

  1. Construct the Queue in #initialize
  2. Never set it to nil or close it (but we may #clear it when restarting the background thread or after a stop). This enables us to always know that we can safely use it and call methods on it.
  3. Only synchronize when mutating @sender_thread -> starting it, changing it (when it dies), or setting it to nil (when it finishes due to #stop). Reading @sender_thread for checks is OK to do without locks.

This is just a suggestion, so feel free to ignore it and do something else. But I think this class, as it is, hides a lot of complexity introduced by the shared mutable state; truly, the more I look, the more potential issues I see.

Let me know if you'd like to pair on this; perhaps that way we can get this across the finish line without so much async back-and-forth and rework.
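
The three-step suggestion above could look roughly like the following. This is a minimal sketch under stated assumptions, not the dogstatsd-ruby `Sender`: the class name `SketchSender`, the `sink` argument (a stand-in for `MessageBuffer`, anything responding to `#<<`) and the `:close` sentinel are hypothetical names chosen for illustration.

```ruby
# Hypothetical sketch of the suggestion; not the dogstatsd-ruby Sender.
class SketchSender
  # `sink` is a stand-in for MessageBuffer: any object responding to #<<.
  def initialize(sink)
    @sink = sink
    @queue = Queue.new   # step 1: created once, never nil, never closed
    @mx = Mutex.new
    @sender_thread = nil
  end

  def add(message)
    ensure_thread
    @queue << message    # always safe: the queue always exists
  end

  def stop
    # step 3: @sender_thread is only mutated while holding the lock
    @mx.synchronize do
      thread = @sender_thread
      next unless thread
      @queue << :close if thread.alive?
      thread.join
      @sender_thread = nil
    end
  end

  private

  def ensure_thread
    thread = @sender_thread              # unlocked read is fine for a check
    return if thread && thread.alive?

    @mx.synchronize do
      thread = @sender_thread
      next if thread && thread.alive?    # another caller restarted it first
      @queue.clear                       # step 2: clear, never close or nil
      @sender_thread = Thread.new { send_loop }
    end
  end

  def send_loop
    until (message = @queue.pop) == :close
      @sink << message
    end
  end
end
```

Because the queue is never replaced, `#add` has no window in which it can observe `nil`. The trade-off in this sketch is that a message pushed while `#stop` is running may sit in the queue until the next restart clears it, so it can be dropped.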

Reply (Contributor, Author):

That is why I initially used the mutex to synchronize the close as well: it avoids timing issues with the #stop call, since #stop can only run while it holds the lock. I'll see what your suggestion would change, and I agree that this class carries a lot of complexity now...


def start
raise ArgumentError, 'Sender already started' if message_queue

# initialize message queue for background thread
# initialize a new message queue for the background thread
@message_queue = Queue.new
# start background thread
@sender_thread = Thread.new(&method(:send_loop))
@@ -65,7 +105,6 @@ def stop(join_worker: true)
private

attr_reader :message_buffer

attr_reader :message_queue
attr_reader :sender_thread
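
The `Sender` class comment treats a dead companion thread as a sign that the process has just forked. The sketch below illustrates why that heuristic holds in CRuby: only the thread that calls `fork` survives in the child, so a thread started in the parent reports `alive? == false` there. The helper name `companion_alive_in_child?` is hypothetical, and the snippet assumes a platform where `Process.fork` is available (not Windows or JRuby).

```ruby
# Returns whether a thread started in the parent is still alive
# when observed from a forked child process.
def companion_alive_in_child?
  companion = Thread.new { sleep }   # parked forever, like an idle sender thread
  reader, writer = IO.pipe

  child = fork do
    reader.close
    # In the child, only the forking thread exists; report what we see.
    writer.write(companion.alive? ? "1" : "0")
    writer.close
    exit!(0)                         # skip at_exit handlers in the child
  end

  writer.close
  answer = reader.read
  reader.close
  Process.wait(child)
  companion.kill
  answer == "1"
end
```

In the parent the same thread stays alive, which is why checking `sender_thread.alive?` can tell the two processes apart without tracking PIDs.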

38 changes: 35 additions & 3 deletions lib/datadog/statsd/single_thread_sender.rb
@@ -2,17 +2,36 @@

module Datadog
class Statsd
# The SingleThreadSender is a sender that synchronously buffers messages
# in a `MessageBuffer`.
# It uses the current Process.pid to check whether it is running in a
# freshly forked process, and resets the MessageBuffer if that's the case.
class SingleThreadSender
def initialize(message_buffer)
def initialize(message_buffer, logger: nil)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
# store the pid for which this sender has been created
update_fork_pid
end

def add(message)
@message_buffer.add(message)
@mx.synchronize {
# we have just forked, meaning we have messages in the buffer that we should
# not send, they belong to the parent process, let's clear the buffer.
if forked?
@message_buffer.reset
@message_buffer.reset_telemetry
update_fork_pid
end
@message_buffer.add(message)
}
end

def flush(*)
@message_buffer.flush()
@mx.synchronize {
@message_buffer.flush()
}
end

# Compatibility with `Sender`
@@ -26,6 +45,19 @@ def stop()
# Compatibility with `Sender`
def rendez_vous()
end

private

# below are "fork management" methods used to clean the MessageBuffer
# when the sender detects that it is running under an unknown PID.

def forked?
Process.pid != @fork_pid
end

def update_fork_pid
@fork_pid = Process.pid
end
end
end
end
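
The PID comparison used by `SingleThreadSender` can be exercised in isolation. The following is a minimal sketch of the same idea, not the library's class: `ForkAwareBuffer` and its `pid_source:` keyword are hypothetical, and the PID source is injectable only so that a fork can be simulated without actually forking.

```ruby
# Hypothetical sketch of PID-based fork detection; not the library's class.
class ForkAwareBuffer
  # `pid_source` defaults to the real PID; it is injectable for illustration.
  def initialize(pid_source: -> { Process.pid })
    @pid_source = pid_source
    @mx = Mutex.new
    @messages = []
    @owner_pid = @pid_source.call    # PID for which this buffer was created
  end

  def add(message)
    @mx.synchronize do
      current = @pid_source.call
      if current != @owner_pid
        # Buffered messages belong to the parent process: drop them.
        @messages.clear
        @owner_pid = current
      end
      @messages << message
    end
  end

  # Returns the buffered messages and empties the buffer.
  def flush
    @mx.synchronize do
      out = @messages.dup
      @messages.clear
      out
    end
  end
end
```

Comparing PIDs on every `add` costs one cheap call per message, and it needs no companion thread, which fits the single-threaded mode.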
4 changes: 2 additions & 2 deletions spec/statsd/forwarder_spec.rb
@@ -105,7 +105,7 @@
it 'builds the sender' do
expect(Datadog::Statsd::Sender)
.to receive(:new)
.with(message_buffer)
.with(message_buffer, logger: logger)
.exactly(1)

subject
@@ -283,7 +283,7 @@
it 'builds the sender' do
expect(Datadog::Statsd::Sender)
.to receive(:new)
.with(message_buffer)
.with(message_buffer, logger: logger)
.exactly(1)

subject