Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-113884: Make queue.SimpleQueue thread-safe in --disable-gil builds #114161

Merged
merged 10 commits into from
Jan 23, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make :class:`queue.SimpleQueue` thread safe when the GIL is disabled.
202 changes: 115 additions & 87 deletions Modules/_queuemodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
#endif

#include "Python.h"
#include "pycore_ceval.h" // _PyEval_MakePendingCalls()
#include "pycore_ceval.h" // Py_MakePendingCalls()
#include "pycore_moduleobject.h" // _PyModule_GetState()
#include "pycore_parking_lot.h"
#include "pycore_time.h" // _PyTime_t

#include <stdbool.h>
Expand Down Expand Up @@ -151,7 +152,9 @@ RingBuf_Get(RingBuf *buf)
return item;
}

// Returns 0 on success or -1 if the buffer failed to grow
// Returns 0 on success or -1 if the buffer failed to grow.
//
// Steals a reference to item.
static int
RingBuf_Put(RingBuf *buf, PyObject *item)
{
Expand All @@ -164,7 +167,7 @@ RingBuf_Put(RingBuf *buf, PyObject *item)
return -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to decref item here isn't it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so

}
}
buf->items[buf->put_idx] = Py_NewRef(item);
buf->items[buf->put_idx] = item;
buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
buf->num_items++;
return 0;
Expand All @@ -184,9 +187,13 @@ RingBuf_IsEmpty(RingBuf *buf)

typedef struct {
PyObject_HEAD
PyThread_type_lock lock;
int locked;

// Are there threads waiting for items
bool has_threads_waiting;

// Items in the queue
RingBuf buf;

PyObject *weakreflist;
} simplequeueobject;

Expand All @@ -209,12 +216,6 @@ simplequeue_dealloc(simplequeueobject *self)
PyTypeObject *tp = Py_TYPE(self);

PyObject_GC_UnTrack(self);
if (self->lock != NULL) {
/* Unlock the lock so it's safe to free it */
if (self->locked > 0)
PyThread_release_lock(self->lock);
PyThread_free_lock(self->lock);
}
(void)simplequeue_clear(self);
if (self->weakreflist != NULL)
PyObject_ClearWeakRefs((PyObject *) self);
Expand Down Expand Up @@ -249,12 +250,6 @@ simplequeue_new_impl(PyTypeObject *type)
self = (simplequeueobject *) type->tp_alloc(type, 0);
if (self != NULL) {
self->weakreflist = NULL;
self->lock = PyThread_allocate_lock();
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
return NULL;
}
if (RingBuf_Init(&self->buf) < 0) {
Py_DECREF(self);
return NULL;
Expand All @@ -264,7 +259,29 @@ simplequeue_new_impl(PyTypeObject *type)
return (PyObject *) self;
}

typedef struct {
bool handed_off;
simplequeueobject *queue;
PyObject *item;
} HandoffData;

static void
maybe_handoff_item(HandoffData *data, PyObject **item, int has_more_waiters)
{
if (item == NULL) {
// No threads were waiting
data->handed_off = false;
}
else {
// There was at least one waiting thread, hand off the item
*item = data->item;
data->handed_off = true;
}
data->queue->has_threads_waiting = has_more_waiters;
}

/*[clinic input]
@critical_section
_queue.SimpleQueue.put
item: object
block: bool = True
Expand All @@ -280,21 +297,28 @@ never blocks. They are provided for compatibility with the Queue class.
static PyObject *
_queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
int block, PyObject *timeout)
/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
/*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/
{
/* BEGIN GIL-protected critical section */
if (RingBuf_Put(&self->buf, item) < 0)
return NULL;
if (self->locked) {
/* A get() may be waiting, wake it up */
self->locked = 0;
PyThread_release_lock(self->lock);
HandoffData data = {
.handed_off = 0,
.item = Py_NewRef(item),
.queue = self,
};
if (self->has_threads_waiting) {
// Try to hand the item off directly if there are threads waiting
_PyParkingLot_Unpark(&self->has_threads_waiting,
(_Py_unpark_fn_t *)maybe_handoff_item, &data);
}
if (!data.handed_off) {
if (RingBuf_Put(&self->buf, item) < 0) {
return NULL;
}
}
/* END GIL-protected critical section */
Py_RETURN_NONE;
}

/*[clinic input]
@critical_section
_queue.SimpleQueue.put_nowait
item: object

Expand All @@ -307,12 +331,23 @@ for compatibility with the Queue class.

static PyObject *
_queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
/*[clinic end generated code: output=0990536715efb1f1 input=36b1ea96756b2ece]*/
/*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/
{
return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
}

static PyObject *
empty_error(PyTypeObject *cls)
{
PyObject *module = PyType_GetModule(cls);
assert(module != NULL);
simplequeue_state *state = simplequeue_get_state(module);
PyErr_SetNone(state->EmptyError);
return NULL;
}

/*[clinic input]
@critical_section
_queue.SimpleQueue.get

cls: defining_class
Expand All @@ -335,23 +370,15 @@ in that case).
static PyObject *
_queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
int block, PyObject *timeout_obj)
/*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/
/*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/
{
_PyTime_t endtime = 0;
_PyTime_t timeout;
PyObject *item;
PyLockStatus r;
PY_TIMEOUT_T microseconds;
PyThreadState *tstate = PyThreadState_Get();

// XXX Use PyThread_ParseTimeoutArg().

if (block == 0) {
/* Non-blocking */
microseconds = 0;
}
else if (timeout_obj != Py_None) {
if (block != 0 && !Py_IsNone(timeout_obj)) {
/* With timeout */
_PyTime_t timeout;
if (_PyTime_FromSecondsObject(&timeout,
timeout_obj, _PyTime_ROUND_CEILING) < 0) {
return NULL;
Expand All @@ -361,65 +388,64 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
"'timeout' must be a non-negative number");
return NULL;
}
microseconds = _PyTime_AsMicroseconds(timeout,
_PyTime_ROUND_CEILING);
if (microseconds > PY_TIMEOUT_MAX) {
PyErr_SetString(PyExc_OverflowError,
"timeout value is too large");
return NULL;
}
endtime = _PyDeadline_Init(timeout);
}
else {
/* Infinitely blocking */
microseconds = -1;
}

/* put() signals the queue to be non-empty by releasing the lock.
* So we simply try to acquire the lock in a loop, until the condition
* (queue non-empty) becomes true.
*/
while (RingBuf_IsEmpty(&self->buf)) {
/* First a simple non-blocking try without releasing the GIL */
r = PyThread_acquire_lock_timed(self->lock, 0, 0);
if (r == PY_LOCK_FAILURE && microseconds != 0) {
Py_BEGIN_ALLOW_THREADS
r = PyThread_acquire_lock_timed(self->lock, microseconds, 1);
Py_END_ALLOW_THREADS
for (;;) {
if (!RingBuf_IsEmpty(&self->buf)) {
return RingBuf_Get(&self->buf);
}

if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) {
return NULL;
}
if (r == PY_LOCK_FAILURE) {
PyObject *module = PyType_GetModule(cls);
simplequeue_state *state = simplequeue_get_state(module);
/* Timed out */
PyErr_SetNone(state->EmptyError);
return NULL;
if (!block) {
return empty_error(cls);
}
self->locked = 1;

/* Adjust timeout for next iteration (if any) */
if (microseconds > 0) {
timeout = _PyDeadline_Get(endtime);
microseconds = _PyTime_AsMicroseconds(timeout,
_PyTime_ROUND_CEILING);
int64_t timeout_ns = -1;
if (endtime != 0) {
timeout_ns = _PyDeadline_Get(endtime);
if (timeout_ns < 0) {
return empty_error(cls);
}
}
}

/* BEGIN GIL-protected critical section */
item = RingBuf_Get(&self->buf);
if (self->locked) {
PyThread_release_lock(self->lock);
self->locked = 0;
bool waiting = 1;
self->has_threads_waiting = waiting;

PyObject *item = NULL;
int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting,
sizeof(bool), timeout_ns, &item,
/* detach */ 1);
switch (st) {
case Py_PARK_OK: {
assert(item != NULL);
return item;
}
case Py_PARK_TIMEOUT: {
return empty_error(cls);
}
case Py_PARK_INTR: {
// Interrupted
if (Py_MakePendingCalls() < 0) {
return NULL;
}
break;
}
case Py_PARK_AGAIN: {
// This should be impossible with the current implementation of
// PyParkingLot, but would be possible if critical sections /
// the GIL were released before the thread was added to the
// internal thread queue in the parking lot.
break;
}
default: {
Py_UNREACHABLE();
}
}
}
/* END GIL-protected critical section */

return item;
}

/*[clinic input]
@critical_section
_queue.SimpleQueue.get_nowait

cls: defining_class
Expand All @@ -434,33 +460,35 @@ raise the Empty exception.
static PyObject *
_queue_SimpleQueue_get_nowait_impl(simplequeueobject *self,
PyTypeObject *cls)
/*[clinic end generated code: output=620c58e2750f8b8a input=842f732bf04216d3]*/
/*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/
{
return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None);
}

/*[clinic input]
@critical_section
_queue.SimpleQueue.empty -> bool

Return True if the queue is empty, False otherwise (not reliable!).
[clinic start generated code]*/

static int
_queue_SimpleQueue_empty_impl(simplequeueobject *self)
/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
/*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/
{
return RingBuf_IsEmpty(&self->buf);
}

/*[clinic input]
@critical_section
_queue.SimpleQueue.qsize -> Py_ssize_t

Return the approximate size of the queue (not reliable!).
[clinic start generated code]*/

static Py_ssize_t
_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
/*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/
{
return RingBuf_Len(&self->buf);
}
Expand Down
Loading