[sender] reset buffers on forks and reset the companion thread if dead or nil #203
Conversation
Left a few notes! One general note is, since our customers do read through these PRs, I suggest renaming the PR to something clearer, such as "Fix metrics reporting in applications using fork" or something similar.
I also recommend clarifying in what situation `UDPSocket.new.tap` was causing the VM to crash, since this seems quite relevant to users on older versions that do use that code (should they upgrade ASAP to avoid this crash? Does it only get triggered along with other changes in this PR?).
One extra thing that occurred to me is that some of the changes to the […] Since the intention seems to be that one […]
Yes, entirely true, that's exactly what I'm doing right now by adding a mutex, mainly around the message queue. 👍
force-pushed from 6f92771 to 9614c8c
force-pushed from 9614c8c to 72105bf
…es simultaneously
force-pushed from 4c08e46 to 7460c87
Sorry for the extra "oh btw concurrency and lots of changes" notes. Feel free to separate changes out to a separate PR if you'd like to get the fork checking in first and separately tackle the concurrency.
lib/datadog/statsd/sender.rb (Outdated)
```ruby
@mx.synchronize {
  message_queue.close if CLOSEABLE_QUEUES
  @message_queue = nil
  message_buffer.reset
  start
```
There's a race hiding here. If a process just forked and started two new threads, and both threads try to report data, the following timeline can occur:
- Thread A checks line 52, sees that no sender thread is alive. Grabs the lock. (Scheduler switches threads.)
- Thread B checks line 52, sees that no sender thread is alive. Tries to grab the lock, but it is already taken, so it blocks waiting for the lock. (Scheduler switches threads.)
- Thread A closes the previous queue and resets it. Creates a new thread. Releases the lock. Adds its message to the queue. (Scheduler switches threads.)
- Thread B wakes up, grabs the lock, closes the queue that Thread A created and resets it, so the data from Thread A gets lost. Creates another sender thread. Etc.

TL;DR: after grabbing the lock, we need to check that the world state is as we expected it to be when we went to grab the lock, as it might have moved in the meantime :)
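The re-check-under-the-lock pattern described here is classic double-checked locking. A minimal sketch of it follows; `Sender`, `restart`, and the `restarts` counter are simplified stand-ins for illustration, not the library's actual code:

```ruby
require "thread"

# Simplified stand-in for the sender: the worker thread may die (e.g. in a
# fork child), and callers noticing that must restart it exactly once.
class Sender
  attr_reader :restarts # counter for illustration only

  def initialize
    @mx = Mutex.new
    @message_queue = Queue.new
    @sender_thread = Thread.new {} # worker body elided; exits immediately
    @restarts = 0
  end

  def add(message)
    unless sender_thread_alive?
      @mx.synchronize do
        # Double-check after acquiring the lock: another thread may have
        # already restarted the worker while we were blocked on the mutex.
        restart unless sender_thread_alive?
      end
    end
    @message_queue << message
  end

  private

  def sender_thread_alive?
    @sender_thread && @sender_thread.alive?
  end

  def restart
    @restarts += 1
    @message_queue = Queue.new            # drop messages from before the restart
    @sender_thread = Thread.new { sleep } # worker body elided; stays alive
  end
end
```

Without the second `sender_thread_alive?` check inside `synchronize`, every thread that observed a dead worker would reset the queue in turn, losing whatever earlier threads had enqueued.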
> If a process just forked, and started two new threads
I don't expect the first process to have to create a thread, but I agree it could happen if, for some unknown reason, the first process sees its thread dying at the same moment. I'll look into this.
```ruby
if CLOSEABLE_QUEUES
  def stop(join_worker: true)
    @mx.synchronize {
      message_queue = @message_queue
      message_queue.close if message_queue

      sender_thread = @sender_thread
      sender_thread.join if sender_thread && join_worker
    }
  end
else
  def stop(join_worker: true)
    @mx.synchronize {
      message_queue = @message_queue
      message_queue << :close if message_queue

      sender_thread = @sender_thread
      sender_thread.join if sender_thread && join_worker
    }
  end
end
```
Minor: These two methods are very similar -- would it be worth unifying them and only doing the `if CLOSEABLE_QUEUES` in the one line that changes between them?
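One way to realize that suggestion, as a sketch using the names from the diff (the surrounding class setup here is invented for self-containment, and `CLOSEABLE_QUEUES` is assumed to be a boolean constant), is to keep a single `#stop` and branch only on how the queue is told to shut down:

```ruby
require "thread"

# Queue#close exists since Ruby 2.3; older versions get a :close sentinel.
CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)

class Sender
  def initialize(message_queue)
    @mx = Mutex.new
    @message_queue = message_queue
    @sender_thread = nil # worker creation elided in this sketch
  end

  def stop(join_worker: true)
    @mx.synchronize {
      message_queue = @message_queue
      if message_queue
        # The only line that differed between the two original methods:
        CLOSEABLE_QUEUES ? message_queue.close : message_queue << :close
      end

      sender_thread = @sender_thread
      sender_thread.join if sender_thread && join_worker
    }
  end
end
```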
```ruby
  @mx.synchronize {
    message_queue.push(:flush)
    rendez_vous if sync
  }
end

def rendez_vous
```
Since `#rendez_vous` is public, should it also perform similar checks to `#flush` and use the lock as well?
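For context, a rendez-vous of this kind is typically a ticket enqueued behind all pending work, which the caller waits on until the worker thread signals it. The following is a generic sketch under that assumption, not the library's actual `#rendez_vous` implementation:

```ruby
require "thread"

# Generic worker with a rendez-vous: the caller blocks until the worker
# thread has drained every item enqueued before the rendez-vous ticket.
class Worker
  def initialize
    @queue = Queue.new
    @thread = Thread.new do
      while (item = @queue.pop)
        break if item == :stop
        item.call # every queued item is a callable in this sketch
      end
    end
  end

  def enqueue(&work)
    @queue << work
  end

  def rendez_vous
    mx = Mutex.new
    cv = ConditionVariable.new
    done = false
    mx.synchronize do
      # The ticket sits behind all previously enqueued work (FIFO queue).
      @queue << proc { mx.synchronize { done = true; cv.signal } }
      cv.wait(mx) until done
    end
  end

  def stop
    @queue << :stop
    @thread.join
  end
end
```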
```ruby
def add(message)
  # we have just forked, meaning we have messages in the buffer that we should
  # not send, they belong to the parent process, let's clear the buffer.
  if forked?
    @message_buffer.reset
    update_fork_pid
  end
```
A similar race to the one I described for the `Sender` can happen here. Two threads call this method at the same time, both of them get `forked?` => `true`, and the `@message_buffer` gets reset twice.

Also, the interaction with the buffer itself may have issues with concurrency.
Addressed by #205
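The pid-caching scheme that `forked?` / `update_fork_pid` imply, combined with a re-check under a lock so the reset happens once per fork, can be sketched like this (class name and the `resets` counter are illustrative, not the library's code):

```ruby
require "thread"

# Fork detection by caching the pid: after a fork, Process.pid changes in
# the child, so forked? flips to true until the cached pid is updated.
class ForkAwareBuffer
  attr_reader :resets # counter for illustration only

  def initialize
    @mx = Mutex.new
    @fork_pid = Process.pid
    @resets = 0
  end

  def forked?
    Process.pid != @fork_pid
  end

  def add(message)
    if forked?
      @mx.synchronize do
        # Re-check under the lock so concurrent callers in a fresh fork
        # child reset the buffer once, not once per thread.
        if forked?
          @resets += 1            # stands in for @message_buffer.reset
          @fork_pid = Process.pid # stands in for update_fork_pid
        end
      end
    end
    # ... buffer the message ...
  end
end
```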
We want to reset the `MessageBuffer` if we appear to be in a recent fork, since the contained messages belong to the parent process. On top of that, when using the original multi-threaded `Sender`, this PR re-spawns the companion thread if it seems to be dead for whatever reason. It should automatically help when someone is using the multi-threaded mode in an app or framework using forks.
For some reason, I had to remove the use of `UDPSocket.new.tap`, since it was causing the interpreter to crash as soon as a thread was re-created in fork children.

This is a follow-up to, and replaces, #199.