Skip to content

Commit

Permalink
ParkingLot: use thread-local waiter instead of PyThreadState
Browse files Browse the repository at this point in the history
Sometimes we need to acquire locks before the current PyThreadState
is set. This moves the waiter data to it's own struct. It's shared
between PyThreadStates on the same thread, such as in multi-interpreter
settings.

Fixes #85
  • Loading branch information
colesbury committed Aug 15, 2020
1 parent b0a0700 commit 083382d
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 227 deletions.
5 changes: 4 additions & 1 deletion Include/cpython/pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef struct mi_heap_s mi_heap_t;
struct PyThreadStateOS;
typedef struct PyThreadStateOS PyThreadStateOS;

struct Waiter;
typedef struct _PyEventRC _PyEventRC;


Expand All @@ -69,7 +70,7 @@ struct _ts {

/* OS-specific state (for locking and parking) */
PyThreadStateOS *os;
uintptr_t handoff_elem;
uintptr_t _unused_handoff_elem; // TODO: delete before release, but gonna require recompiling conda binaries

/* thread status */
int32_t status;
Expand Down Expand Up @@ -152,6 +153,8 @@ struct _ts {
/* Unique thread state id. */
uint64_t id;

struct Waiter *waiter;

/* XXX signal handlers should also be here */
struct method_cache_entry method_cache[(1 << MCACHE_SIZE_EXP)];
};
Expand Down
19 changes: 0 additions & 19 deletions Include/internal/pycore_pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,35 +365,16 @@ _PyThreadState_CheckForShutdown(PyThreadState *tstate)


/* Other */
struct PyThreadStateWaiter {
struct PyThreadStateWaiter *next;
struct PyThreadStateWaiter *prev;
uintptr_t key;
int64_t time_to_be_fair;
};

struct brc_queued_object;

struct PyThreadStateOS {
struct PyThreadStateWaiter waiter;

PyThreadState *tstate;
PyThreadState *next_waiter;
PyMUTEX_T waiter_mutex;
PyCOND_T waiter_cond;
int waiter_counter;

struct _PyBrcState {
struct llist_node node;
uintptr_t thread_id;
struct brc_queued_object *queue;
} brc;

/* DEBUG info */
PyThreadState *last_notifier;
const char *last_notifier_msg;
void *last_notifier_data;
int64_t counter;
};

PyAPI_FUNC(void) _PyThreadState_Init(
Expand Down
13 changes: 4 additions & 9 deletions Include/lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
extern "C" {
#endif

struct _ts;
typedef struct _ts PyThreadState;

typedef struct {
uintptr_t v;
} _PyRawMutex;
Expand Down Expand Up @@ -52,19 +49,18 @@ void _PyRecursiveMutex_lock_slow(_PyRecursiveMutex *m);
void _PyRecursiveMutex_unlock_slow(_PyRecursiveMutex *m);

void _PyRawEvent_Notify(_PyRawEvent *o);
void _PyRawEvent_Wait(_PyRawEvent *o, PyThreadState *tstate);
int _PyRawEvent_TimedWait(_PyRawEvent *o, PyThreadState *tstate, int64_t ns);
void _PyRawEvent_Wait(_PyRawEvent *o);
int _PyRawEvent_TimedWait(_PyRawEvent *o, int64_t ns);
void _PyRawEvent_Reset(_PyRawEvent *o);

void _PyEvent_Notify(_PyEvent *o);
void _PyEvent_Wait(_PyEvent *o, PyThreadState *tstate);
int _PyEvent_TimedWait(_PyEvent *o, PyThreadState *tstate, int64_t ns);
void _PyEvent_Wait(_PyEvent *o);
int _PyEvent_TimedWait(_PyEvent *o, int64_t ns);

int _PyBeginOnce_slow(_PyOnceFlag *o);
void _PyEndOnce(_PyOnceFlag *o);
void _PyEndOnceFailed(_PyOnceFlag *o);


static inline int
_PyMutex_is_locked(_PyMutex *m)
{
Expand All @@ -80,7 +76,6 @@ _PyRawMutex_is_locked(_PyRawMutex *m)
static inline void
_PyRawMutex_lock(_PyRawMutex *m)
{
// lock_count++;
if (_Py_atomic_compare_exchange_uintptr(&m->v, UNLOCKED, LOCKED)) {
return;
}
Expand Down
41 changes: 32 additions & 9 deletions Include/parking_lot.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define Py_PARKING_LOT_H

#include "pyatomic.h"
#include "pycore_llist.h"
#include "pycore_condvar.h"

#ifdef __cplusplus
extern "C" {
Expand All @@ -14,29 +16,50 @@ enum {
PY_PARK_OK = 0,
};

int
_PySemaphore_Wait(PyThreadState *tstate, int64_t ns);
typedef struct Waiter {
struct llist_node node; // wait queue node
Py_ssize_t refcount;
struct Waiter *next_waiter; // for "raw" locks
PyMUTEX_T mutex;
PyCOND_T cond;
int counter;
uintptr_t key;
int64_t time_to_be_fair;
uintptr_t thread_id;
uintptr_t handoff_elem;
} Waiter;

Waiter *
_PyParkingLot_InitThread(void);

void
_PySemaphore_Signal(PyThreadStateOS *os, const char *msg, void *data);
_PyParkingLot_DeinitThread(Waiter *waiter);

Waiter *
_PyParkingLot_ThisWaiter(void);

void
_PySemaphore_Signal(Waiter *waiter, const char *msg, void *data);

int
_PySemaphore_Wait(Waiter *waiter, int64_t ns);

/* Functions for waking and parking threads */
int
_PyParkingLot_ParkInt32(const int32_t *key, int32_t expected);

int
_PyParkingLot_Park(const uintptr_t *key, uintptr_t expected,
_PyTime_t start_time, int64_t timeout_ns);
_PyParkingLot_Park(const void *key, uintptr_t expected,
_PyTime_t start_time, int64_t ns);

void
_PyParkingLot_UnparkAll(const void *key);

void
_PyParkingLot_BeginUnpark(const void *key, PyThreadState **tstate,
int *more_waiters, int *time_to_be_fair);
_PyParkingLot_BeginUnpark(const void *key, struct Waiter **out,
int *more_waiters, int *should_be_fair);

void
_PyParkingLot_FinishUnpark(const void *key, PyThreadState *tstate);
_PyParkingLot_FinishUnpark(const void *key, struct Waiter *waiter);

void
_PyParkingLot_AfterFork(void);
Expand Down
6 changes: 3 additions & 3 deletions Modules/_queuemodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
if (self->waiting) {
int more_waiters;
int should_be_fair;
PyThreadState *waiter;
Waiter *waiter;

/* If there is a waiter, handoff the item directly */
_PyParkingLot_BeginUnpark(&self->waiting, &waiter, &more_waiters, &should_be_fair);
Expand Down Expand Up @@ -279,8 +279,8 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, int block,
}
}

PyThreadState *tstate = PyThreadState_GET();
tstate->handoff_elem = (uintptr_t)&item;
Waiter *this_waiter = _PyParkingLot_ThisWaiter();
this_waiter->handoff_elem = (uintptr_t)&item;
int ret = _PyParkingLot_Park(&self->waiting, 1, 0, timeout_ns);
if (ret == PY_PARK_OK) {
assert(item);
Expand Down
4 changes: 1 addition & 3 deletions Modules/_threadmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,6 @@ event_set(eventobject *self, PyObject *Py_UNUSED(ignored))
static PyObject *
event_wait(eventobject *self, PyObject *args)
{
PyThreadState *tstate = PyThreadState_Get();

PyObject *timeout_obj = NULL;
if (!PyArg_ParseTuple(args, "|O:wait", &timeout_obj)) {
return NULL;
Expand All @@ -721,7 +719,7 @@ event_wait(eventobject *self, PyObject *args)
}
}

int ok = _PyEvent_TimedWait(&self->erc->event, tstate, timeout_ns);
int ok = _PyEvent_TimedWait(&self->erc->event, timeout_ns);
if (ok) {
Py_RETURN_TRUE;
}
Expand Down
Loading

0 comments on commit 083382d

Please sign in to comment.