Skip to content

Commit

Permalink
Set timeout for closing endpoints in Python (#136)
Browse files Browse the repository at this point in the history
#123 introduced timeouts to the generic callbacks, preventing failures to acquire the lock due to GIL competition. However, those timeouts were not exposed to Python, which is at least one of the reasons it still times out. Notice how the default `period=0` (never unblock) is used in the backtrace below:

```cpp
Thread 1 (Thread 0x7f36d675f740 (LWP 155586) "pytest"):
#0  futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x7fff45058e58) at ../sysdeps/nptl/futex-internal.h:183
#1  __pthread_cond_wait_common (abstime=0x0, clockid=0, mutex=0x7fff45058e08, cond=0x7fff45058e30) at pthread_cond_wait.c:508
#2  __pthread_cond_wait (cond=0x7fff45058e30, mutex=0x7fff45058e08) at pthread_cond_wait.c:647
#3  0x00007f36d43634d4 in std::condition_variable::wait<ucxx::utils::CallbackNotifier::wait(uint64_t)::<lambda()> > (__p=..., __lock=..., this=0x7fff45058e30) at /opt/conda/envs/test/x86_64-conda-linux-gnu/include/c++/11.4.0/condition_variable:103
#4  ucxx::utils::CallbackNotifier::wait (this=this@entry=0x7fff45058e00, period=period@entry=0) at /datasets/pentschev/src/ucxx-deadlock/cpp/src/utils/callback_notifier.cpp:66
#5  0x00007f36d43470e1 in ucxx::Endpoint::close (this=0x7f369c701a90, period=0, maxAttempts=1) at /datasets/pentschev/src/ucxx-deadlock/cpp/src/endpoint.cpp:171
#6  0x00007f36d4753381 in __pyx_pw_4ucxx_4_lib_7libucxx_11UCXEndpoint_9close(_object*, _object* const*, long, _object*) () from /opt/conda/envs/test/lib/python3.10/site-packages/ucxx/_lib/libucxx.cpython-310-x86_64-linux-gnu.so
```

This PR exposes those arguments to Python and specifies a default for the Python async API `Endpoint.abort()` to prevent such deadlocks from occurring.

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #136
  • Loading branch information
pentschev authored Nov 28, 2023
1 parent cca903f commit 21abf71
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 12 deletions.
15 changes: 11 additions & 4 deletions python/ucxx/_lib/libucxx.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,15 @@ cdef class UCXWorker():
with nogil:
self._worker.get().stopProgressThread()

def cancel_inflight_requests(self):
def cancel_inflight_requests(self, period=0, max_attempts=1):
cdef uint64_t c_period = period
cdef uint64_t c_max_attempts = max_attempts
cdef size_t num_canceled

with nogil:
num_canceled = self._worker.get().cancelInflightRequests()
num_canceled = self._worker.get().cancelInflightRequests(
c_period, c_max_attempts
)

return num_canceled

Expand Down Expand Up @@ -1044,9 +1048,12 @@ cdef class UCXEndpoint():

return int(<uintptr_t>worker)

def close(self):
def close(self, period=0, max_attempts=1):
cdef uint64_t c_period = period
cdef uint64_t c_max_attempts = max_attempts

with nogil:
self._endpoint.get().close()
self._endpoint.get().close(c_period, c_max_attempts)

def am_probe(self):
cdef ucp_ep_h handle
Expand Down
6 changes: 4 additions & 2 deletions python/ucxx/_lib/ucxx_api.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ cdef extern from "<ucxx/api.h>" namespace "ucxx" nogil:
bint pollingMode, int epoll_timeout
) except +raise_py_error
void stopProgressThread() except +raise_py_error
size_t cancelInflightRequests() except +raise_py_error
size_t cancelInflightRequests(
uint64_t period, uint64_t maxAttempts
) except +raise_py_error
bint tagProbe(const ucp_tag_t) const
void setProgressThreadStartCallback(
function[void(void*)] callback, void* callbackArg
Expand All @@ -256,7 +258,7 @@ cdef extern from "<ucxx/api.h>" namespace "ucxx" nogil:

cdef cppclass Endpoint(Component):
ucp_ep_h getHandle()
void close()
void close(uint64_t period, uint64_t maxAttempts)
shared_ptr[Request] amSend(
void* buffer, size_t length, ucs_memory_type_t memory_type, bint enable_python_future
) except +raise_py_error
Expand Down
44 changes: 38 additions & 6 deletions python/ucxx/_lib_async/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,58 @@ def closed(self):
"""Is this endpoint closed?"""
return self._ep is None or not self._ep.is_alive()

def abort(self):
def abort(self, period=10**10, max_attempts=1):
"""Close the communication immediately and abruptly.
Useful in destructors or generators' ``finally`` blocks.
Despite the attempt to close communication immediately, in some
circumstances, notably when the parent worker is running a progress
thread, a maximum timeout may be specified for which the close operation
will wait. This can be particularly important for cases where the progress
thread might be attempting to acquire the GIL while the current
thread owns that resource.
Notice, this functions doesn't signal the connected peer to close.
To do that, use `Endpoint.close()`
To do that, use `Endpoint.close()`.
Parameters
----------
period: int
maximum period to wait (in ns) for internal endpoint operations
to complete, usually two operations (pre and post) are involved
thus the maximum perceived timeout should be multiplied by two.
max_attempts: int
maximum number of attempts to close endpoint, only applicable
if worker is running a progress thread and `period > 0`.
"""
if self._ep is not None:
logger.debug("Endpoint.abort(): 0x%x" % self.uid)
self._ep.close()
# Wait for a maximum of `period` ns
self._ep.close(period=period, max_attempts=max_attempts)
self._ep = None
self._ctx = None

async def close(self):
async def close(self, period=10**10, max_attempts=1):
"""Close the endpoint cleanly.
This will attempt to flush outgoing buffers before actually
closing the underlying UCX endpoint.
A maximum timeout and number of attempts may be specified to prevent the
underlying `Endpoint` object from failing to acquire the GIL, see `abort()`
for details.
Parameters
----------
period: int
maximum period to wait (in ns) for internal endpoint operations
to complete, usually two operations (pre and post) are involved
thus the maximum perceived timeout should be multiplied by two.
max_attempts: int
maximum number of attempts to close endpoint, only applicable
if worker is running a progress thread and `period > 0`.
"""
if self.closed():
self.abort()
self.abort(period=period, max_attempts=max_attempts)
return
try:
# Making sure we only tell peer to shutdown once
Expand All @@ -84,7 +116,7 @@ async def close(self):
if not self._ctx.progress_mode.startswith("thread"):
self._ctx.worker.progress()
await asyncio.sleep(0)
self.abort()
self.abort(period=period, max_attempts=max_attempts)

async def am_send(self, buffer):
"""Send `buffer` to connected peer via active messages.
Expand Down

0 comments on commit 21abf71

Please sign in to comment.