GH-109978: Allow multiprocessing finalizers to run on a separate thread #110510
base: main
Conversation
pitrou commented on Oct 7, 2023
(edited by bedevere-app bot)
- Issue: Allow multiprocessing finalizers to run on a separate thread #109978
Force-pushed from a7d3581 to 3664a79
    if thread_was_running:
        # HACK: os.fork() queries the system for the number of running threads.
        # However, Thread.join() only ensures that the Python thread state
        # was destroyed, while the system thread could still be running.
        # Give it time to exit.
        time.sleep(0.001)
This is a bit unfortunate. @gpshead
Agreed. This is effectively because our join() has never truly been a join... I filed #110829 to track that; it is probably time to fix it.
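The workaround being discussed can be sketched as a small helper (a hypothetical `join_and_settle`, not part of the PR's API), assuming a brief sleep is enough for the OS thread to finish winding down:

```python
import threading
import time

def join_and_settle(thread, settle=0.001):
    # Hypothetical helper mirroring the PR's workaround: Thread.join()
    # only guarantees the Python-level target has finished; the
    # underlying OS thread may still be winding down, so give it a
    # short grace period before os.fork() counts system threads.
    thread.join()
    time.sleep(settle)

t = threading.Thread(target=lambda: None)
t.start()
join_and_settle(t)
assert not t.is_alive()
```

This is of course only a probabilistic fix, which is exactly why it is flagged as a HACK in the diff above.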
I think I will skip my turn on this one. I have touched too much multiprocessing code recently, and I was bitten. Maybe @serhiy-storchaka wants to have a look; he did something like that recently (I vaguely recall).
I haven't tried to understand the whole thing yet, but I've given it a once-over to look at the issues this is running into.
@@ -7625,6 +7625,9 @@ static void warn_about_fork_with_threads(const char* name) {
                num_python_threads = atoi(field); // 0 on error
            }
        }
    // XXX This counts the number of system threads, but Python code can only
    // call threading.Thread.join(), which can return before the system thread ended.
    // This function could therefore print a spurious warning in unlucky cases.
It's not spurious, though: the existence of a thread doing anything is accurately identified as a potential problem. The annoyance that led to this is that Python didn't provide a concrete way to guarantee a thread has exited.
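For illustration, the kernel's view of the thread count can be read from `/proc/self/status` on Linux (the C code above does similar parsing of `/proc` data); this sketch is an assumption-laden illustration, not the CPython implementation, and falls back to `threading.active_count()` where `/proc` is unavailable:

```python
import threading

def os_thread_count():
    # On Linux, /proc/self/status has a "Threads:" line giving the
    # kernel's count of threads in this process, which can briefly
    # exceed the Python-visible count while a joined thread's OS
    # thread is still exiting.
    try:
        with open("/proc/self/status") as f:
            for line in f:
                if line.startswith("Threads:"):
                    return int(line.split()[1])
    except OSError:
        pass
    # Fallback: count only Python-level threads.
    return threading.active_count()

print(os_thread_count())
```

The gap between the two counts is exactly the window in which the warning discussed here could fire after a seemingly successful `join()`.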
    while self._pool:
        self._pool.pop().join()
What is the difference?
    with self._lock():
        return self._stopped
Why is the lock needed for reading an attribute?
    def wait_until_idle(self):
        with self._queue_drained_cond:
            self._queue_drained_cond.wait_for(lambda: self._queue.empty())
Suggested change:
    - self._queue_drained_cond.wait_for(lambda: self._queue.empty())
    + self._queue_drained_cond.wait_for(self._queue.empty)
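The suggestion works because `Condition.wait_for()` accepts any zero-argument callable, so the bound method `self._queue.empty` behaves exactly like `lambda: self._queue.empty()`. A minimal self-contained sketch with hypothetical names:

```python
import queue
import threading

q = queue.Queue()
cond = threading.Condition()

def drain():
    # Consume everything, then wake up anyone waiting for the
    # queue to become empty (None is the stop sentinel).
    while q.get() is not None:
        pass
    with cond:
        cond.notify_all()

for item in (1, 2, 3, None):
    q.put(item)

t = threading.Thread(target=drain)
t.start()
with cond:
    # The bound method q.empty is an equivalent, shorter predicate
    # than lambda: q.empty().
    cond.wait_for(q.empty)
t.join()
```

`wait_for()` re-evaluates the predicate each time the condition is notified, and also checks it once before blocking, so the bound-method form has no behavioral difference.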
    # from the work loop.
    assert callable(cb)
    with self._lock:
        if not self._stopped and self._thread is not None:
Isn't `self._thread` always None when `self._stopped` is True?
    finally:
        cb = None
Maybe add a reference to `_work_loop()` or copy the comment? I was confused when I saw this.
Hmm, `_work_queue.enqueue_task()` is always called from the global `enqueue_task()`, which keeps a reference to `cb`, so it perhaps does not help.
        sub_debug('finalizer ignored because different process')
        res = None
    else:
        try:
There is already one `try` a level above. The second `try` may not be needed; you can simply add `finally` at the same level, after `else`. The difference is that the finally block will then also be executed in the `except` case, but I do not see what could go wrong with that.
FWIW, I'm prioritizing #110848 now, as it should make this PR slightly more robust.