Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DEBUG-2334 Probe Notifier Worker component #4028

Merged
merged 13 commits into from
Oct 29, 2024
35 changes: 35 additions & 0 deletions lib/datadog/core/semaphore.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

module Datadog
module Core
# Semaphore pattern implementation, as described in documentation for
# ConditionVariable.
#
# @api private
class Semaphore
def initialize
@wake_lock = Mutex.new
@wake = ConditionVariable.new
end

def signal
wake_lock.synchronize do
wake.signal
end
end

def wait(timeout = nil)
wake_lock.synchronize do
# steep specifies that the second argument to wait is of type
# ::Time::_Timeout which for some reason is not Numeric and is not
# castable from Numeric.
wake.wait(wake_lock, timeout) # steep:ignore
end
end

private

attr_reader :wake_lock, :wake
end
end
end
244 changes: 244 additions & 0 deletions lib/datadog/di/probe_notifier_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
# frozen_string_literal: true

require_relative '../core/semaphore'

module Datadog
module DI
# Background worker thread for sending probe statuses and snapshots
# to the backend (via the agent).
#
# The loop inside the worker rescues all exceptions to prevent termination
# due to unhandled exceptions raised by any downstream code.
# This includes communication and protocol errors when sending the
# payloads to the agent.
#
# The worker groups the data to send into batches. The goal is to perform
# no more than one network operation per event type per second.
# There is also a limit on the length of the sending queue to prevent
# it from growing without bounds if upstream code generates an enormous
# number of events for some reason.
#
# Wake-up events are used (via ConditionVariable) to keep the thread
# asleep if there is no work to be done.
#
# @api private
class ProbeNotifierWorker
# Minimum interval between submissions.
# TODO make this into an internal setting and increase default to 2 or 3.
MIN_SEND_INTERVAL = 1
Comment on lines +26 to +28
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT of adding scale here? Is it seconds, milliseconds or ..? Maybe we can name it MIN_SEND_INTERVAL_SEC?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that #4012 has not yet been looked at, and I have another 2000+ lines of code pending locally, I would like to only make changes in this and other open PRs that address clear problems. I am happy to discuss adding units to times and if there is team consensus on how the units should be indicated, add them in a subsequent PR.


def initialize(settings, transport, logger)
@settings = settings
@status_queue = []
@snapshot_queue = []
@transport = transport
@logger = logger
@lock = Mutex.new
@wake = Core::Semaphore.new
@io_in_progress = false
@sleep_remaining = nil
@wake_scheduled = false
@thread = nil
end

attr_reader :settings
attr_reader :logger

def start
return if @thread
@thread = Thread.new do
loop do
# TODO If stop is requested, we stop immediately without
# flushing the submissions. Should we send pending submissions
# and then quit?
break if @stop_requested

sleep_remaining = @lock.synchronize do
if sleep_remaining && sleep_remaining > 0
# Recalculate how much sleep time is remaining, then sleep that long.
set_sleep_remaining
else
0
end
Strech marked this conversation as resolved.
Show resolved Hide resolved
end

if sleep_remaining > 0
# Do not need to update @wake_scheduled here because
# wake-up is already scheduled for the earliest possible time.
wake.wait(sleep_remaining)
next
end
Strech marked this conversation as resolved.
Show resolved Hide resolved

begin
more = maybe_send
rescue => exc
raise if settings.dynamic_instrumentation.propagate_all_exceptions

logger.warn("Error in probe notifier worker: #{exc.class}: #{exc} (at #{exc.backtrace.first})")
end
@lock.synchronize do
@wake_scheduled = more
end
wake.wait(more ? MIN_SEND_INTERVAL : nil)
end
end
end

# Stops the background thread.
#
# Attempts a graceful stop with the specified timeout, then falls back
# to killing the thread using Thread#kill.
def stop(timeout = 1)
@stop_requested = true
wake.signal
if thread
unless thread.join(timeout)
thread.kill
end
@thread = nil
end
end

# Waits for background thread to send pending notifications.
#
# This method waits for the notification queue to become empty
# rather than for a particular set of notifications to be sent out,
# therefore, it should only be called when there is no parallel
# activity (in another thread) that causes more notifications
# to be generated.
def flush
loop do
if @thread.nil? || !@thread.alive?
return
end
Strech marked this conversation as resolved.
Show resolved Hide resolved

io_in_progress, queues_empty = @lock.synchronize do
[io_in_progress?, status_queue.empty? && snapshot_queue.empty?]
end

if io_in_progress
# If we just call Thread.pass we could be in a busy loop -
# add a sleep.
sleep 0.25
next
elsif queues_empty
break
else
sleep 0.25
next
Strech marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +119 to +128
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible to avoid the sleeping by using a condition variable to flag when the queue is empty

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a note to investigate this.

end
end
end

private

attr_reader :transport
attr_reader :wake
attr_reader :thread

# This method should be called while @lock is held.
def io_in_progress?
@io_in_progress
end

attr_reader :last_sent

[
[:status, 'probe status'],
[:snapshot, 'snapshot'],
].each do |(event_type, event_name)|
Comment on lines +146 to +149
Copy link
Member

@Strech Strech Oct 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[
[:status, 'probe status'],
[:snapshot, 'snapshot'],
].each do |(event_type, event_name)|
{
status: 'probe status',
snapshot: 'snapshot'
}.each do |event_type, event_name|

attr_reader "#{event_type}_queue"

# Adds a status or a snapshot to the queue to be sent to the agent
# at the next opportunity.
#
# If the queue is too large, the event will not be added.
#
# Signals the background thread to wake up (and do the sending)
# if it has been more than 1 second since the last send of the same
# event type.
define_method("add_#{event_type}") do |event|
@lock.synchronize do
queue = send("#{event_type}_queue")
# TODO determine a suitable limit via testing/benchmarking
if queue.length > 100
logger.warn("#{self.class.name}: dropping #{event_type} because queue is full")
else
queue << event
end
end

# Figure out whether to wake up the worker thread.
# If minimum send interval has elapsed since the last send,
# wake up immediately.
@lock.synchronize do
unless @wake_scheduled
@wake_scheduled = true
set_sleep_remaining
wake.signal
end
end
end

# Determine how much longer the worker thread should sleep
# so as not to send in less than MIN_SEND_INTERVAL since the last send.
# Important: this method must be called when @lock is held.
#
# Returns the time remaining to sleep.
def set_sleep_remaining
now = Core::Utils::Time.get_time
@sleep_remaining = if last_sent
[last_sent + MIN_SEND_INTERVAL - now, 0].max
else
0
end
end

public "add_#{event_type}"

# Sends pending probe statuses or snapshots.
#
# This method should ideally only be called when there are actually
# events to send, but it can be called when there is nothing to do.
# Currently we only have one wake-up signaling object and two
# types of events. Therefore on most wake-ups we expect to only
# send one type of events.
define_method("maybe_send_#{event_type}") do
batch = nil
@lock.synchronize do
batch = instance_variable_get("@#{event_type}_queue")
instance_variable_set("@#{event_type}_queue", [])
Comment on lines +209 to +210
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: It may be worth adding an attr_accessor and then use send(...) to access these variables.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reading side is done with an attribute, are you concerned that there maybe a spelling mistake here and the wrong variable will be written to?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mostly thinking that instance_variable_set and instance_variable_get are very sharp weapons so having the attr_accessor seems a bit easier to avoid bugs.

On the other hand, it is true that we can misspell the creation of the attr_accessor as well, so it's not like there's no potential for bugs there either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me leave it this way for now but if the matter comes up again I can redo as attr_accessor.

@io_in_progress = batch.any? # steep:ignore
end
if batch.any? # steep:ignore
begin
transport.public_send("send_#{event_type}", batch)
time = Core::Utils::Time.get_time
@lock.synchronize do
@last_sent = time
end
rescue => exc
raise if settings.dynamic_instrumentation.propagate_all_exceptions
logger.warn("failed to send #{event_name}: #{exc.class}: #{exc} (at #{exc.backtrace.first})")
end
end
batch.any? # steep:ignore
rescue ThreadError
# Normally the queue should only be consumed in this method,
# however if anyone consumes it elsewhere we don't want to block
# while consuming it here. Rescue ThreadError and return.
logger.warn("unexpected #{event_name} queue underflow - consumed elsewhere?")
ensure
@lock.synchronize do
@io_in_progress = false
end
end
end

def maybe_send
rv = maybe_send_status
rv || maybe_send_snapshot
end
end
end
end
21 changes: 21 additions & 0 deletions sig/datadog/core/semaphore.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module Datadog
module Core
class Semaphore
@wake_lock: ::Mutex

@wake: ::ConditionVariable

def initialize: () -> void

def signal: () -> void

def wait: (Numeric|nil timeout) -> void

private

attr_reader wake_lock: ::Mutex

attr_reader wake: ::ConditionVariable
end
end
end
61 changes: 61 additions & 0 deletions sig/datadog/di/probe_notifier_worker.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
module Datadog
module DI
class ProbeNotifierWorker

MIN_SEND_INTERVAL: 1

@settings: untyped

@status_queue: Array[Hash[String, untyped]]

@snapshot_queue: Array[Hash[String, untyped]]

@transport: Transport

@lock: Mutex

@wake: Core::Semaphore

@io_in_progress: bool

@thread: Thread

@stop_requested: bool

@logger: Core::Logger

def initialize: (untyped settings, Transport transport, Core::Logger logger) -> void

attr_reader settings: untyped

attr_reader logger: Core::Logger

def start: () -> void
def stop: (?::Integer timeout) -> void
def flush: () -> void

private

def last_sent: () -> Numeric

def set_sleep_remaining: () -> Numeric

def status_queue: () -> Array[Hash[String, untyped]]

def snapshot_queue: () -> Array[Hash[String, untyped]]

attr_reader transport: Transport

attr_reader wake: Core::Semaphore

attr_reader thread: Thread
def io_in_progress?: () -> bool

def maybe_send: () -> bool

def maybe_send_status: () -> bool

def maybe_send_snapshot: () -> bool
end
end
end
Loading
Loading