Skip to content

Commit

Permalink
pythongh-84570: Send-Wait Fixes for _xxinterpchannels (pythongh-111006)
Browse files Browse the repository at this point in the history
There were a few things I did in pythongh-110565 that need to be fixed. I also forgot to add tests in that PR.

(Note that this PR exposes a refleak introduced by pythongh-110246. I'll take care of that separately.)
  • Loading branch information
ericsnowcurrently authored and Glyphack committed Jan 27, 2024
1 parent e508630 commit 934d648
Show file tree
Hide file tree
Showing 5 changed files with 571 additions and 148 deletions.
15 changes: 15 additions & 0 deletions Include/internal/pycore_pythread.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ extern int _PyThread_at_fork_reinit(PyThread_type_lock *lock);
#endif /* HAVE_FORK */


// unset: -1 seconds, in nanoseconds
#define PyThread_UNSET_TIMEOUT ((_PyTime_t)(-1 * 1000 * 1000 * 1000))

/* Helper to acquire an interruptible lock with a timeout. If the lock acquire
* is interrupted, signal handlers are run, and if they raise an exception,
* PY_LOCK_INTR is returned. Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE
* are returned, depending on whether the lock can be acquired within the
* timeout.
*/
// Exported for the _xxinterpchannels module.
PyAPI_FUNC(PyLockStatus) PyThread_acquire_lock_timed_with_retries(
PyThread_type_lock,
PY_TIMEOUT_T microseconds);


#ifdef __cplusplus
}
#endif
Expand Down
217 changes: 173 additions & 44 deletions Lib/test/test__xxinterpchannels.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,62 @@ def test_channel_list_interpreters_closed_send_end(self):
with self.assertRaises(channels.ChannelClosedError):
channels.list_interpreters(cid, send=False)

####################
def test_allowed_types(self):
cid = channels.create()
objects = [
None,
'spam',
b'spam',
42,
]
for obj in objects:
with self.subTest(obj):
channels.send(cid, obj, blocking=False)
got = channels.recv(cid)

self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
# XXX Check the following?
#self.assertIsNot(got, obj)
# XXX What about between interpreters?

def test_run_string_arg_unresolved(self):
cid = channels.create()
interp = interpreters.create()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(cid.end)
_channels.send(cid, b'spam', blocking=False)
"""),
dict(cid=cid.send))
obj = channels.recv(cid)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')

# XXX For now there is no high-level channel into which the
# sent channel ID can be converted...
# Note: this test caused crashes on some buildbots (bpo-33615).
@unittest.skip('disabled until high-level channels exist')
def test_run_string_arg_resolved(self):
cid = channels.create()
cid = channels._channel_id(cid, _resolve=True)
interp = interpreters.create()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(chan.id.end)
_channels.send(chan.id, b'spam', blocking=False)
"""),
dict(chan=cid.send))
obj = channels.recv(cid)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')

#-------------------
# send/recv

def test_send_recv_main(self):
cid = channels.create()
Expand Down Expand Up @@ -705,6 +760,9 @@ def test_recv_sending_interp_destroyed(self):
channels.recv(cid2)
del cid2

#-------------------
# send_buffer

def test_send_buffer(self):
buf = bytearray(b'spamspamspam')
cid = channels.create()
Expand All @@ -720,60 +778,131 @@ def test_send_buffer(self):
obj[4:8] = b'ham.'
self.assertEqual(obj, buf)

def test_allowed_types(self):
#-------------------
# send with waiting

def build_send_waiter(self, obj, *, buffer=False):
# We want a long enough sleep that send() actually has to wait.

if buffer:
send = channels.send_buffer
else:
send = channels.send

cid = channels.create()
objects = [
None,
'spam',
b'spam',
42,
]
for obj in objects:
with self.subTest(obj):
channels.send(cid, obj, blocking=False)
got = channels.recv(cid)
try:
started = time.monotonic()
send(cid, obj, blocking=False)
stopped = time.monotonic()
channels.recv(cid)
finally:
channels.destroy(cid)
delay = stopped - started # seconds
delay *= 3

self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
# XXX Check the following?
#self.assertIsNot(got, obj)
# XXX What about between interpreters?
def wait():
time.sleep(delay)
return wait

def test_run_string_arg_unresolved(self):
def test_send_blocking_waiting(self):
received = None
obj = b'spam'
wait = self.build_send_waiter(obj)
cid = channels.create()
interp = interpreters.create()
def f():
nonlocal received
wait()
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send(cid, obj, blocking=True)
t.join()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(cid.end)
_channels.send(cid, b'spam', blocking=False)
"""),
dict(cid=cid.send))
obj = channels.recv(cid)
self.assertEqual(received, obj)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
def test_send_buffer_blocking_waiting(self):
received = None
obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True)
cid = channels.create()
def f():
nonlocal received
wait()
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send_buffer(cid, obj, blocking=True)
t.join()

# XXX For now there is no high-level channel into which the
# sent channel ID can be converted...
# Note: this test caused crashes on some buildbots (bpo-33615).
@unittest.skip('disabled until high-level channels exist')
def test_run_string_arg_resolved(self):
self.assertEqual(received, obj)

def test_send_blocking_no_wait(self):
received = None
obj = b'spam'
cid = channels.create()
cid = channels._channel_id(cid, _resolve=True)
interp = interpreters.create()
def f():
nonlocal received
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send(cid, obj, blocking=True)
t.join()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(chan.id.end)
_channels.send(chan.id, b'spam', blocking=False)
"""),
dict(chan=cid.send))
obj = channels.recv(cid)
self.assertEqual(received, obj)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
def test_send_buffer_blocking_no_wait(self):
received = None
obj = bytearray(b'spam')
cid = channels.create()
def f():
nonlocal received
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send_buffer(cid, obj, blocking=True)
t.join()

self.assertEqual(received, obj)

def test_send_closed_while_waiting(self):
obj = b'spam'
wait = self.build_send_waiter(obj)
cid = channels.create()
def f():
wait()
channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
with self.assertRaises(channels.ChannelClosedError):
channels.send(cid, obj, blocking=True)
t.join()

def test_send_buffer_closed_while_waiting(self):
try:
self._has_run_once
except AttributeError:
# At the moment, this test leaks a few references.
# It looks like the leak originates with the addition
# of _channels.send_buffer() (gh-110246), whereas the
# tests were added afterward. We want this test even
# if the refleak isn't fixed yet, so we skip here.
raise unittest.SkipTest('temporarily skipped due to refleaks')
else:
self._has_run_once = True

obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True)
cid = channels.create()
def f():
wait()
channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
with self.assertRaises(channels.ChannelClosedError):
channels.send_buffer(cid, obj, blocking=True)
t.join()

#-------------------
# close

def test_close_single_user(self):
Expand Down
52 changes: 2 additions & 50 deletions Modules/_threadmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
/* Interface to Sjoerd's portable C thread library */

#include "Python.h"
#include "pycore_ceval.h" // _PyEval_MakePendingCalls()
#include "pycore_dict.h" // _PyDict_Pop()
#include "pycore_interp.h" // _PyInterpreterState.threads.count
#include "pycore_moduleobject.h" // _PyModule_GetState()
Expand Down Expand Up @@ -76,57 +75,10 @@ lock_dealloc(lockobject *self)
Py_DECREF(tp);
}

/* Helper to acquire an interruptible lock with a timeout. If the lock acquire
* is interrupted, signal handlers are run, and if they raise an exception,
* PY_LOCK_INTR is returned. Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE
* are returned, depending on whether the lock can be acquired within the
* timeout.
*/
static PyLockStatus
static inline PyLockStatus
acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
{
PyThreadState *tstate = _PyThreadState_GET();
_PyTime_t endtime = 0;
if (timeout > 0) {
endtime = _PyDeadline_Init(timeout);
}

PyLockStatus r;
do {
_PyTime_t microseconds;
microseconds = _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING);

/* first a simple non-blocking try without releasing the GIL */
r = PyThread_acquire_lock_timed(lock, 0, 0);
if (r == PY_LOCK_FAILURE && microseconds != 0) {
Py_BEGIN_ALLOW_THREADS
r = PyThread_acquire_lock_timed(lock, microseconds, 1);
Py_END_ALLOW_THREADS
}

if (r == PY_LOCK_INTR) {
/* Run signal handlers if we were interrupted. Propagate
* exceptions from signal handlers, such as KeyboardInterrupt, by
* passing up PY_LOCK_INTR. */
if (_PyEval_MakePendingCalls(tstate) < 0) {
return PY_LOCK_INTR;
}

/* If we're using a timeout, recompute the timeout after processing
* signals, since those can take time. */
if (timeout > 0) {
timeout = _PyDeadline_Get(endtime);

/* Check for negative values, since those mean block forever.
*/
if (timeout < 0) {
r = PY_LOCK_FAILURE;
}
}
}
} while (r == PY_LOCK_INTR); /* Retry if we were interrupted. */

return r;
return PyThread_acquire_lock_timed_with_retries(lock, timeout);
}

static int
Expand Down
Loading

0 comments on commit 934d648

Please sign in to comment.