Skip to content

Commit fd8141d

Browse files
mpageaisk
authored andcommitted
pythongh-113884: Refactor queue.SimpleQueue to use a ring buffer to store items (python#114259)
Use a ring buffer instead of a Python list in order to simplify the process of making queue.SimpleQueue thread-safe in free-threaded builds. The ring buffer implementation has no places where critical sections may be released.
1 parent a61d920 commit fd8141d

File tree

1 file changed

+169
-40
lines changed

1 file changed

+169
-40
lines changed

Modules/_queuemodule.c

+169-40
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "pycore_moduleobject.h" // _PyModule_GetState()
88
#include "pycore_time.h" // _PyTime_t
99

10+
#include <stdbool.h>
1011
#include <stddef.h> // offsetof()
1112

1213
typedef struct {
@@ -25,12 +26,167 @@ static struct PyModuleDef queuemodule;
2526
#define simplequeue_get_state_by_type(type) \
2627
(simplequeue_get_state(PyType_GetModuleByDef(type, &queuemodule)))
2728

29+
static const Py_ssize_t INITIAL_RING_BUF_CAPACITY = 8;
30+
31+
typedef struct {
32+
// Where to place the next item
33+
Py_ssize_t put_idx;
34+
35+
// Where to get the next item
36+
Py_ssize_t get_idx;
37+
38+
PyObject **items;
39+
40+
// Total number of items that may be stored
41+
Py_ssize_t items_cap;
42+
43+
// Number of items stored
44+
Py_ssize_t num_items;
45+
} RingBuf;
46+
47+
static int
48+
RingBuf_Init(RingBuf *buf)
49+
{
50+
buf->put_idx = 0;
51+
buf->get_idx = 0;
52+
buf->items_cap = INITIAL_RING_BUF_CAPACITY;
53+
buf->num_items = 0;
54+
buf->items = PyMem_Calloc(buf->items_cap, sizeof(PyObject *));
55+
if (buf->items == NULL) {
56+
PyErr_NoMemory();
57+
return -1;
58+
}
59+
return 0;
60+
}
61+
62+
static PyObject *
63+
RingBuf_At(RingBuf *buf, Py_ssize_t idx)
64+
{
65+
assert(idx >= 0 && idx < buf->num_items);
66+
return buf->items[(buf->get_idx + idx) % buf->items_cap];
67+
}
68+
69+
static void
70+
RingBuf_Fini(RingBuf *buf)
71+
{
72+
PyObject **items = buf->items;
73+
Py_ssize_t num_items = buf->num_items;
74+
Py_ssize_t cap = buf->items_cap;
75+
Py_ssize_t idx = buf->get_idx;
76+
buf->items = NULL;
77+
buf->put_idx = 0;
78+
buf->get_idx = 0;
79+
buf->num_items = 0;
80+
buf->items_cap = 0;
81+
for (Py_ssize_t n = num_items; n > 0; idx = (idx + 1) % cap, n--) {
82+
Py_DECREF(items[idx]);
83+
}
84+
PyMem_Free(items);
85+
}
86+
87+
// Resize the underlying items array of buf to the new capacity and arrange
88+
// the items contiguously in the new items array.
89+
//
90+
// Returns -1 on allocation failure or 0 on success.
91+
static int
92+
resize_ringbuf(RingBuf *buf, Py_ssize_t capacity)
93+
{
94+
Py_ssize_t new_capacity = Py_MAX(INITIAL_RING_BUF_CAPACITY, capacity);
95+
if (new_capacity == buf->items_cap) {
96+
return 0;
97+
}
98+
assert(buf->num_items <= new_capacity);
99+
100+
PyObject **new_items = PyMem_Calloc(new_capacity, sizeof(PyObject *));
101+
if (new_items == NULL) {
102+
return -1;
103+
}
104+
105+
// Copy the "tail" of the old items array. This corresponds to "head" of
106+
// the abstract ring buffer.
107+
Py_ssize_t tail_size =
108+
Py_MIN(buf->num_items, buf->items_cap - buf->get_idx);
109+
if (tail_size > 0) {
110+
memcpy(new_items, buf->items + buf->get_idx,
111+
tail_size * sizeof(PyObject *));
112+
}
113+
114+
// Copy the "head" of the old items array, if any. This corresponds to the
115+
// "tail" of the abstract ring buffer.
116+
Py_ssize_t head_size = buf->num_items - tail_size;
117+
if (head_size > 0) {
118+
memcpy(new_items + tail_size, buf->items,
119+
head_size * sizeof(PyObject *));
120+
}
121+
122+
PyMem_Free(buf->items);
123+
buf->items = new_items;
124+
buf->items_cap = new_capacity;
125+
buf->get_idx = 0;
126+
buf->put_idx = buf->num_items;
127+
128+
return 0;
129+
}
130+
131+
// Returns a strong reference from the head of the buffer.
132+
static PyObject *
133+
RingBuf_Get(RingBuf *buf)
134+
{
135+
assert(buf->num_items > 0);
136+
137+
if (buf->num_items < (buf->items_cap / 4)) {
138+
// Items is less than 25% occupied, shrink it by 50%. This allows for
139+
// growth without immediately needing to resize the underlying items
140+
// array.
141+
//
142+
// It's safe it ignore allocation failures here; shrinking is an
143+
// optimization that isn't required for correctness.
144+
(void)resize_ringbuf(buf, buf->items_cap / 2);
145+
}
146+
147+
PyObject *item = buf->items[buf->get_idx];
148+
buf->items[buf->get_idx] = NULL;
149+
buf->get_idx = (buf->get_idx + 1) % buf->items_cap;
150+
buf->num_items--;
151+
return item;
152+
}
153+
154+
// Returns 0 on success or -1 if the buffer failed to grow
155+
static int
156+
RingBuf_Put(RingBuf *buf, PyObject *item)
157+
{
158+
assert(buf->num_items <= buf->items_cap);
159+
160+
if (buf->num_items == buf->items_cap) {
161+
// Buffer is full, grow it.
162+
if (resize_ringbuf(buf, buf->items_cap * 2) < 0) {
163+
PyErr_NoMemory();
164+
return -1;
165+
}
166+
}
167+
buf->items[buf->put_idx] = Py_NewRef(item);
168+
buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
169+
buf->num_items++;
170+
return 0;
171+
}
172+
173+
static Py_ssize_t
174+
RingBuf_Len(RingBuf *buf)
175+
{
176+
return buf->num_items;
177+
}
178+
179+
static bool
180+
RingBuf_IsEmpty(RingBuf *buf)
181+
{
182+
return buf->num_items == 0;
183+
}
184+
28185
typedef struct {
29186
PyObject_HEAD
30187
PyThread_type_lock lock;
31188
int locked;
32-
PyObject *lst;
33-
Py_ssize_t lst_pos;
189+
RingBuf buf;
34190
PyObject *weakreflist;
35191
} simplequeueobject;
36192

@@ -43,7 +199,7 @@ class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(ty
43199
static int
44200
simplequeue_clear(simplequeueobject *self)
45201
{
46-
Py_CLEAR(self->lst);
202+
RingBuf_Fini(&self->buf);
47203
return 0;
48204
}
49205

@@ -69,7 +225,10 @@ simplequeue_dealloc(simplequeueobject *self)
69225
static int
70226
simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg)
71227
{
72-
Py_VISIT(self->lst);
228+
RingBuf *buf = &self->buf;
229+
for (Py_ssize_t i = 0, num_items = buf->num_items; i < num_items; i++) {
230+
Py_VISIT(RingBuf_At(buf, i));
231+
}
73232
Py_VISIT(Py_TYPE(self));
74233
return 0;
75234
}
@@ -90,15 +249,13 @@ simplequeue_new_impl(PyTypeObject *type)
90249
self = (simplequeueobject *) type->tp_alloc(type, 0);
91250
if (self != NULL) {
92251
self->weakreflist = NULL;
93-
self->lst = PyList_New(0);
94252
self->lock = PyThread_allocate_lock();
95-
self->lst_pos = 0;
96253
if (self->lock == NULL) {
97254
Py_DECREF(self);
98255
PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
99256
return NULL;
100257
}
101-
if (self->lst == NULL) {
258+
if (RingBuf_Init(&self->buf) < 0) {
102259
Py_DECREF(self);
103260
return NULL;
104261
}
@@ -126,7 +283,7 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
126283
/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
127284
{
128285
/* BEGIN GIL-protected critical section */
129-
if (PyList_Append(self->lst, item) < 0)
286+
if (RingBuf_Put(&self->buf, item) < 0)
130287
return NULL;
131288
if (self->locked) {
132289
/* A get() may be waiting, wake it up */
@@ -155,33 +312,6 @@ _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
155312
return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
156313
}
157314

158-
static PyObject *
159-
simplequeue_pop_item(simplequeueobject *self)
160-
{
161-
Py_ssize_t count, n;
162-
PyObject *item;
163-
164-
n = PyList_GET_SIZE(self->lst);
165-
assert(self->lst_pos < n);
166-
167-
item = PyList_GET_ITEM(self->lst, self->lst_pos);
168-
Py_INCREF(Py_None);
169-
PyList_SET_ITEM(self->lst, self->lst_pos, Py_None);
170-
self->lst_pos += 1;
171-
count = n - self->lst_pos;
172-
if (self->lst_pos > count) {
173-
/* The list is more than 50% empty, reclaim space at the beginning */
174-
if (PyList_SetSlice(self->lst, 0, self->lst_pos, NULL)) {
175-
/* Undo pop */
176-
self->lst_pos -= 1;
177-
PyList_SET_ITEM(self->lst, self->lst_pos, item);
178-
return NULL;
179-
}
180-
self->lst_pos = 0;
181-
}
182-
return item;
183-
}
184-
185315
/*[clinic input]
186316
_queue.SimpleQueue.get
187317
@@ -249,7 +379,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
249379
* So we simply try to acquire the lock in a loop, until the condition
250380
* (queue non-empty) becomes true.
251381
*/
252-
while (self->lst_pos == PyList_GET_SIZE(self->lst)) {
382+
while (RingBuf_IsEmpty(&self->buf)) {
253383
/* First a simple non-blocking try without releasing the GIL */
254384
r = PyThread_acquire_lock_timed(self->lock, 0, 0);
255385
if (r == PY_LOCK_FAILURE && microseconds != 0) {
@@ -279,8 +409,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
279409
}
280410

281411
/* BEGIN GIL-protected critical section */
282-
assert(self->lst_pos < PyList_GET_SIZE(self->lst));
283-
item = simplequeue_pop_item(self);
412+
item = RingBuf_Get(&self->buf);
284413
if (self->locked) {
285414
PyThread_release_lock(self->lock);
286415
self->locked = 0;
@@ -320,7 +449,7 @@ static int
320449
_queue_SimpleQueue_empty_impl(simplequeueobject *self)
321450
/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
322451
{
323-
return self->lst_pos == PyList_GET_SIZE(self->lst);
452+
return RingBuf_IsEmpty(&self->buf);
324453
}
325454

326455
/*[clinic input]
@@ -333,7 +462,7 @@ static Py_ssize_t
333462
_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
334463
/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
335464
{
336-
return PyList_GET_SIZE(self->lst) - self->lst_pos;
465+
return RingBuf_Len(&self->buf);
337466
}
338467

339468
static int

0 commit comments

Comments
 (0)