DEBUG-2334 Probe Notifier Worker component #4028
Conversation
This is a background thread that notification payloads (probe status and probe snapshots) can be submitted to. The payloads will be batched into groups if possible, and sent to the local agent asynchronously.
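The batching pattern described above can be sketched roughly as follows. This is an illustrative sketch only; the class and method names are hypothetical and not the actual dd-trace-rb implementation.

```ruby
require 'thread'

# Hypothetical sketch: payloads are queued from any thread and flushed
# in batches by a single background thread.
class BatchingWorker
  def initialize(&sender)
    @queue = []
    @lock = Mutex.new
    @wake = Queue.new # used purely as a wake-up / stop signal
    @sender = sender
    @thread = Thread.new { run }
  end

  def add(payload)
    @lock.synchronize { @queue << payload }
    @wake << :work # wake the background thread
  end

  def stop
    @wake << :stop
    @thread.join
  end

  private

  def run
    loop do
      signal = @wake.pop # block until work arrives or stop is requested
      batch = @lock.synchronize do
        drained = @queue
        @queue = []
        drained
      end
      @sender.call(batch) unless batch.empty?
      break if signal == :stop
    end
  end
end

sent = []
worker = BatchingWorker.new { |batch| sent.concat(batch) }
worker.add(:status_payload)
worker.add(:snapshot_payload)
worker.stop # flushes anything still queued before returning
```

Depending on thread timing, the two payloads may arrive in one batch or two, which mirrors the "batched into groups if possible" behavior described above.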
I would like to suggest some small code adjustments to reduce the size of the methods. Great job 👏🏼
```ruby
# Minimum interval between submissions.
# TODO make this into an internal setting and increase default to 2 or 3.
MIN_SEND_INTERVAL = 1
```
WDYT of adding the scale here? Is it seconds, milliseconds, or ...? Maybe we can name it `MIN_SEND_INTERVAL_SEC`?
Given that #4012 has not yet been looked at, and I have another 2000+ lines of code pending locally, I would like to only make changes in this and other open PRs that address clear problems. I am happy to discuss adding units to times and if there is team consensus on how the units should be indicated, add them in a subsequent PR.
```ruby
def initialize(settings, agent_settings, transport)
  @settings = settings
  @status_queue = []
  @snapshot_queue = []
  @transport = transport
  @lock = Mutex.new
  @wake = Core::Semaphore.new
  @io_in_progress = false
  @sleep_remaining = nil
  @wake_scheduled = false
end
```
I do not see `agent_settings` being used; is that correct?
They are now consumed by the transport, I removed agent settings from probe notifier worker.
```ruby
  begin
    more = maybe_send
  rescue => exc
    raise if settings.dynamic_instrumentation.propagate_all_exceptions

    warn "Error in probe notifier worker: #{exc.class}: #{exc} (at #{exc.backtrace.first})"
  end
  @lock.synchronize do
    @wake_scheduled = more
  end
  wake.wait(more ? MIN_SEND_INTERVAL : nil)
end
```
Suggested change:

```ruby
begin
  more = maybe_send
rescue => exc
  raise if settings.dynamic_instrumentation.propagate_all_exceptions
  warn "Error in probe notifier worker: #{exc.class}: #{exc} (at #{exc.backtrace.first})"
end
@lock.synchronize { @wake_scheduled = more }
wake.wait(more ? MIN_SEND_INTERVAL : nil)
end
```
```ruby
unless thread&.join(timeout)
  thread.kill
end
```
Suggested change:

```ruby
thread.kill unless thread&.join(timeout)
```
```ruby
[
  [:status, 'probe status'],
  [:snapshot, 'snapshot'],
].each do |(event_type, event_name)|
```
Suggested change:

```ruby
{
  status: 'probe status',
  snapshot: 'snapshot'
}.each do |event_type, event_name|
```
```ruby
unless thread&.join(timeout)
  thread.kill
end
```
👀 This will fail if `thread` is `nil`:

```
[3] pry(main)> thread = nil
=> nil
[4] pry(main)> unless thread&.join(123)
[4] pry(main)*   thread.kill
[4] pry(main)* end
NoMethodError: undefined method `kill' for nil:NilClass
from (pry):7:in `__pry__
```
Thank you, repaired.
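For reference, a nil-safe version of the shutdown logic can be written with `&.` on both calls. This is a sketch of the fix being discussed, not necessarily the exact code that landed: `Thread#join` returns the thread on success and `nil` on timeout, and when `thread` itself is `nil` both calls become safe no-ops.

```ruby
# Hypothetical helper illustrating the nil-safe pattern.
def stop_thread(thread, timeout)
  thread&.kill unless thread&.join(timeout)
end

stop_thread(nil, 0.1)    # no NoMethodError when the thread was never started

t = Thread.new { sleep } # a thread that never finishes on its own
stop_thread(t, 0.1)      # join times out, so the thread is killed
t.join                   # returns promptly once the kill takes effect
```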
```ruby
if io_in_progress
  # If we just call Thread.pass we could be in a busy loop -
  # add a sleep.
  sleep 0.25
  next
elsif queues_empty
  break
else
  sleep 0.25
  next
```
It's possible to avoid the sleeping by using a condition variable to flag when the queue is empty
I added a note to investigate this.
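For illustration, the condition-variable approach suggested above could look roughly like this. All names are hypothetical; this is a sketch of the idea, not the PR's code: the flushing thread signals a `ConditionVariable` when the queue drains, so the waiter blocks on the condition instead of polling with `sleep`.

```ruby
lock = Mutex.new
queue_empty = ConditionVariable.new
queue = [1, 2, 3]

worker = Thread.new do
  lock.synchronize do
    queue.clear # stand-in for the real "send everything" loop
    queue_empty.signal
  end
end

# Wait for the queue to drain; the timeout guards against a hung worker,
# and the until loop guards against spurious wakeups.
lock.synchronize do
  queue_empty.wait(lock, 5) until queue.empty?
end
worker.join
```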
```ruby
context 'when three snapshots are added in quick succession' do
  it 'sends two batches' do
    expect(worker.send(:snapshot_queue)).to be_empty

    expect(transport).to receive(:send_snapshot).once.with([snapshot])

    worker.add_snapshot(snapshot)
    sleep 0.1
    worker.add_snapshot(snapshot)
    sleep 0.1
    worker.add_snapshot(snapshot)

    # Since sending is asynchronous, we need to relinquish execution
    # for the sending thread to run.
    sleep(0.1)

    # At this point the first snapshot should have been sent,
    # with the remaining two in the queue
    expect(worker.send(:snapshot_queue)).to eq([snapshot, snapshot])

    sleep 0.4
    # Still within the cooldown period
    expect(worker.send(:snapshot_queue)).to eq([snapshot, snapshot])

    expect(transport).to receive(:send_snapshot).once.with([snapshot, snapshot])

    sleep 0.5
    expect(worker.send(:snapshot_queue)).to eq([])
  end
```
If possible, avoid using `sleep`s in tests -- they make the test suite both slower and flakier >_>
I made a note to investigate this. The tests are currently not flaky and speeding them up is a lower priority than shipping DI to customers, but if they start being flaky I will revisit this sooner.
Note that slow specs kinda affect all CI runs so yeah, try not to add too much to that fire >_>
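One common way to remove fixed sleeps, sketched here with generic names rather than anything from this PR: have the asynchronous code push to a `Queue` when it finishes, and have the test block on that push (with a timeout) instead of sleeping for a guessed duration.

```ruby
require 'timeout'

done = Queue.new

worker = Thread.new do
  result = [1, 2].sum # stand-in for the asynchronous work under test
  done << result      # signal completion, carrying the result
end

# Blocks exactly as long as the work takes, and no longer; the timeout
# turns a hang into a quick test failure instead of a stuck CI job.
result = Timeout.timeout(5) { done.pop }
worker.join
```

This makes the test both faster (no padding delay) and less flaky (no guessed delay that can be too short under CI load).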
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##           master    #4028      +/-   ##
==========================================
- Coverage   97.86%   97.84%   -0.03%
==========================================
  Files        1321     1324       +3
  Lines       79326    79509     +183
  Branches     3934     3959      +25
==========================================
+ Hits        77631    77794     +163
- Misses       1695     1715      +20
```

☔ View full report in Codecov by Sentry.
I put in a few cents here, but I think it's OK.
```ruby
batch = instance_variable_get("@#{event_type}_queue")
instance_variable_set("@#{event_type}_queue", [])
```
Minor: it may be worth adding an `attr_accessor` and then using `send(...)` to access these variables.
The reading side is done with an attribute. Are you concerned that there may be a spelling mistake here and the wrong variable will be written to?
I was mostly thinking that `instance_variable_set` and `instance_variable_get` are very sharp weapons, so having the `attr_accessor` seems a bit easier for avoiding bugs. On the other hand, it is true that we can misspell the creation of the `attr_accessor` as well, so it's not like there's no potential for bugs there either.
Let me leave it this way for now but if the matter comes up again I can redo as attr_accessor.
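For reference, the `attr_accessor` alternative discussed above could look roughly like this (hypothetical class, not the PR's code). The benefit is that a typo in the attribute name raises `NoMethodError` immediately instead of silently reading or creating a misspelled instance variable.

```ruby
class Worker
  attr_accessor :status_queue, :snapshot_queue

  def initialize
    @status_queue = []
    @snapshot_queue = []
  end

  # Drain one queue: a misspelled event_type now fails loudly in send,
  # rather than instance_variable_set quietly creating a new ivar.
  def drain(event_type)
    batch = send("#{event_type}_queue")
    send("#{event_type}_queue=", [])
    batch
  end
end

worker = Worker.new
worker.status_queue << :a << :b
batch = worker.drain(:status) # => [:a, :b]; status_queue is now empty
```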
* master:
  * DEBUG-2334 Probe Notifier Worker component (DataDog#4028)
  * DEBUG-2334 dynamic instrumentation probe notification builder (DataDog#4011)
  * Handle low-level libddwaf exception in Context
  * [NO-TICKET] Minor: Fix typos in safe_dup_spec.rb
  * Remove libdatadog musl
  * Remove ffi's after installation
  * Remove cached gems
What does this PR do?
This is a background thread that notification payloads (probe
status and probe snapshots) can be submitted to.
The payloads will be batched into groups if possible, and
sent to the local agent asynchronously.
Motivation:
Initial DI implementation.
Change log entry
None
Additional Notes:
How to test the change?
Unit tests in this PR