parking_lot: add mutexes and one-time notifications
This adds a recursive lock that will be used for locking I/O streams.
The lock performs a direct handoff if the waiting thread has been paused
for at least 1 ms. Otherwise, the lock supports barging.

The design is inspired by WTF locks (WebKit) and locking in Go.
See https://webkit.org/blog/6161/locking-in-webkit/
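As a rough sketch of the handoff-versus-barging policy (illustrative only: the Waiter type, the helper functions, and the atomic-store helper are assumptions, not the commit's actual code):

    /* Hypothetical sketch of the fairness decision in the unlock slow path. */
    struct Waiter {
        long long time_parked_ns;   /* when this thread parked */
        /* ... */
    };

    static void
    unlock_slow_sketch(_PyMutex *m)
    {
        struct Waiter *w = dequeue_waiter(m);         /* hypothetical helper */
        if (w == NULL) {
            _Py_atomic_store_uint8(&m->v, UNLOCKED);  /* no waiters */
            return;
        }
        if (now_ns() - w->time_parked_ns >= 1000000) {
            /* Parked for at least 1 ms: hand the lock directly to the
               waiter so it cannot be barged past again. */
            wake_with_lock_held(w);
        }
        else {
            /* Unlock first, then wake: a running thread may barge in
               and acquire the lock before the waiter is scheduled. */
            _Py_atomic_store_uint8(&m->v, UNLOCKED);
            wake(w);
        }
    }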

The "parking lot" implementation will likely need to be improved before
release. Currently, it uses a fixed-size hashtable (251 taken from Go)
and the same linked-list for waiters and collisions. Note that Go uses
a treap (random binary serach tree) for collisions, while WebKit resizes
the hashtable to ensure 3x as many buckets as threads.
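For context, a fixed-size parking lot typically hashes the lock's address to pick a bucket; a minimal sketch under those assumptions (only the bucket count comes from the message above):

    #include <stdint.h>

    #define NUM_BUCKETS 251   /* fixed table size mentioned above, taken from Go */

    struct Waiter;            /* parked thread; also chained on hash collisions */

    typedef struct {
        int lock;                /* placeholder: the real per-bucket lock must
                                    not itself park in the parking lot */
        struct Waiter *waiters;  /* one linked list for waiters and collisions */
    } Bucket;

    static Bucket buckets[NUM_BUCKETS];

    /* Map a lock's address to its bucket. */
    static Bucket *
    bucket_for(const void *lock_addr)
    {
        return &buckets[(uintptr_t)lock_addr % NUM_BUCKETS];
    }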
colesbury committed Apr 23, 2023
1 parent a24dc2e commit 4584be5
Showing 18 changed files with 2,079 additions and 0 deletions.
1 change: 1 addition & 0 deletions Include/Python.h
@@ -46,6 +46,7 @@
#include "typeslots.h"
#include "pyhash.h"
#include "cpython/pydebug.h"
#include "cpython/lock.h"
#include "bytearrayobject.h"
#include "bytesobject.h"
#include "unicodeobject.h"
137 changes: 137 additions & 0 deletions Include/cpython/lock.h
@@ -0,0 +1,137 @@
#ifndef Py_LIMITED_API
#ifndef Py_LOCK_H
#define Py_LOCK_H

#include "pyatomic.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef struct {
    uintptr_t v;
} _PyOnceFlag;

typedef struct {
    uint8_t v;
} _PyMutex;

typedef _PyMutex PyMutex;

typedef struct {
    uintptr_t v;
    size_t recursions;
} _PyRecursiveMutex;

typedef enum {
    UNLOCKED = 0,
    LOCKED = 1,
    HAS_PARKED = 2,
    ONCE_INITIALIZED = 4,
    THREAD_ID_MASK = ~(LOCKED | HAS_PARKED)
} _PyMutex_State;
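
/* Illustrative, non-normative bit layout: _PyMutex uses only the LOCKED
   and HAS_PARKED bits. For _PyRecursiveMutex, the owning thread id (as
   returned by _Py_ThreadId()) occupies the remaining bits, so the lock
   word looks roughly like:

       v = (thread_id & THREAD_ID_MASK) | LOCKED [| HAS_PARKED]

   _PyOnceFlag sets ONCE_INITIALIZED once initialization has completed. */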

PyAPI_FUNC(void) _PyMutex_lock_slow(_PyMutex *m);
PyAPI_FUNC(void) _PyMutex_unlock_slow(_PyMutex *m);
PyAPI_FUNC(int) _PyMutex_TryLockSlow(_PyMutex *m);

PyAPI_FUNC(void) _PyRecursiveMutex_lock_slow(_PyRecursiveMutex *m);
PyAPI_FUNC(void) _PyRecursiveMutex_unlock_slow(_PyRecursiveMutex *m);

PyAPI_FUNC(int) _PyBeginOnce_slow(_PyOnceFlag *o);
PyAPI_FUNC(void) _PyEndOnce(_PyOnceFlag *o);
PyAPI_FUNC(void) _PyEndOnceFailed(_PyOnceFlag *o);

static inline int
_PyMutex_is_locked(_PyMutex *m)
{
    return _Py_atomic_load_uint8(&m->v) & LOCKED;
}

static inline int
_PyMutex_lock_fast(_PyMutex *m)
{
    return _Py_atomic_compare_exchange_uint8(&m->v, UNLOCKED, LOCKED);
}

static inline void
_PyMutex_lock(_PyMutex *m)
{
    if (_PyMutex_lock_fast(m)) {
        return;
    }
    _PyMutex_lock_slow(m);
}

static inline int
_PyMutex_TryLock(_PyMutex *m)
{
    if (_PyMutex_lock_fast(m)) {
        return 1;
    }
    return _PyMutex_TryLockSlow(m);
}

static inline int
_PyMutex_unlock_fast(_PyMutex *m)
{
    return _Py_atomic_compare_exchange_uint8(&m->v, LOCKED, UNLOCKED);
}

static inline void
_PyMutex_unlock(_PyMutex *m)
{
    if (_PyMutex_unlock_fast(m)) {
        return;
    }
    _PyMutex_unlock_slow(m);
}

static inline void
_PyRecursiveMutex_lock(_PyRecursiveMutex *m)
{
    if (_Py_atomic_compare_exchange_uintptr(&m->v, UNLOCKED, _Py_ThreadId() | LOCKED)) {
        return;
    }
    _PyRecursiveMutex_lock_slow(m);
}

static inline int
_PyRecursiveMutex_owns_lock(_PyRecursiveMutex *m)
{
    uintptr_t v = _Py_atomic_load_uintptr(&m->v);
    return (v & THREAD_ID_MASK) == _Py_ThreadId();
}

static inline void
_PyRecursiveMutex_unlock(_PyRecursiveMutex *m)
{
    uintptr_t v = _Py_atomic_load_uintptr_relaxed(&m->v);
    if (m->recursions == 0 && (v & 3) == LOCKED) {
        // Fast path: no recursion and no parked waiters (only LOCKED set).
        if (_Py_atomic_compare_exchange_uintptr(&m->v, v, UNLOCKED)) {
            return;
        }
    }
    _PyRecursiveMutex_unlock_slow(m);
}

static inline int
_PyOnce_Initialized(_PyOnceFlag *o)
{
    return (_Py_atomic_load_uintptr(&o->v) & ONCE_INITIALIZED) != 0;
}

static inline int
_PyBeginOnce(_PyOnceFlag *o)
{
    if ((_Py_atomic_load_uintptr(&o->v) & ONCE_INITIALIZED) != 0) {
        return 0;
    }
    return _PyBeginOnce_slow(o);
}

#ifdef __cplusplus
}
#endif
#endif /* !Py_LOCK_H */
#endif /* Py_LIMITED_API */
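
For orientation, a hedged usage sketch of this API (the caller, `init_table`, and the shared table are hypothetical; `_PyBeginOnce` is assumed to return nonzero when the caller must perform the one-time initialization, matching the fast path above):

    static _PyMutex table_mutex;       /* zero-initialized state == UNLOCKED */
    static _PyOnceFlag table_once;

    static void
    use_table(void)
    {
        if (_PyBeginOnce(&table_once)) {
            init_table();              /* hypothetical one-time setup */
            _PyEndOnce(&table_once);   /* publishes ONCE_INITIALIZED */
        }
        _PyMutex_lock(&table_mutex);
        /* ... access the shared table ... */
        _PyMutex_unlock(&table_mutex);
    }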
194 changes: 194 additions & 0 deletions Include/internal/pycore_critical_section.h
@@ -0,0 +1,194 @@
#ifndef Py_LIMITED_API
#ifndef Py_INTERNAL_CRITICAL_SECTION_H
#define Py_INTERNAL_CRITICAL_SECTION_H

#include "pycore_pystate.h"
#include <stdint.h>

#ifdef __cplusplus
extern "C" {
#endif

#ifndef Py_BUILD_CORE
# error "this header requires Py_BUILD_CORE define"
#endif

/*
 * Implementation of Python critical sections.
 *
 * Python critical sections are helpers to replace the global interpreter lock
 * with finer-grained locking. A Python critical section is a region of code
 * that can only be executed by a single thread at a time. The regions begin
 * with a call to _Py_critical_section_begin and end with either an explicit
 * call to _Py_critical_section_end or *implicitly* at any point that might
 * have released the global interpreter lock. This is a substantial difference
 * from the traditional notion of a "critical section", where the end of the
 * section is typically explicitly marked.
 *
 * The critical section can be resumed after a potential implicit ending by
 * the _Py_critical_section_resume function.
 *
 * The purpose of the implicitly ending critical sections is to avoid
 * potential deadlock due to holding locks on multiple objects. Any time a
 * thread would have released the GIL, it releases all locks from critical
 * sections. This includes blocking on a lock acquisition.
 *
 * The following are examples of calls that may implicitly end a critical
 * section:
 *
 *   Py_DECREF, PyObject_GC_New, PyObject_Call, PyObject_RichCompareBool,
 *   Py_BuildValue, _Py_critical_section_begin
 *
 * The following are examples of calls that do NOT implicitly end a critical
 * section:
 *
 *   Py_INCREF, PyMem_RawMalloc, PyMem_RawFree, memset, and other C functions
 *   that do not call into the Python API.
 */

enum {
    // Tag used with the `prev` pointer to differentiate _Py_critical_section
    // vs. _Py_critical_section2.
    _Py_CRITICAL_SECTION_INACTIVE = 1,

    _Py_CRITICAL_SECTION_TWO_MUTEXES = 2,

    _Py_CRITICAL_SECTION_MASK = 3
};

#define Py_BEGIN_CRITICAL_SECTION(m) { \
    struct _Py_critical_section _cs; \
    _Py_critical_section_begin(&_cs, m)

#define Py_END_CRITICAL_SECTION \
    _Py_critical_section_end(&_cs); \
}

#define Py_BEGIN_CRITICAL_SECTION2(m1, m2) { \
    struct _Py_critical_section2 _cs2; \
    _Py_critical_section2_begin(&_cs2, m1, m2)

#define Py_END_CRITICAL_SECTION2 \
    _Py_critical_section2_end(&_cs2); \
}

struct _Py_critical_section {
    // Pointer to an outer active critical section (or
    // _Py_critical_section_sentinel). The two least-significant bits indicate
    // whether the pointed-to critical section is inactive and whether it is
    // a _Py_critical_section2 object.
    uintptr_t prev;

    // Mutex used to protect the critical section.
    _PyMutex *mutex;
};

// A critical section protected by two mutexes. Use
// _Py_critical_section2_begin and _Py_critical_section2_end.
struct _Py_critical_section2 {
    struct _Py_critical_section base;

    _PyMutex *mutex2;
};

static inline int
_Py_critical_section_is_active(uintptr_t tag)
{
    return tag != 0 && (tag & _Py_CRITICAL_SECTION_INACTIVE) == 0;
}

PyAPI_FUNC(void)
_Py_critical_section_resume(PyThreadState *tstate);

PyAPI_FUNC(void)
_Py_critical_section_begin_slow(struct _Py_critical_section *c, _PyMutex *m);

PyAPI_FUNC(void)
_Py_critical_section2_begin_slow(struct _Py_critical_section2 *c,
                                 _PyMutex *m1, _PyMutex *m2, int flag);

static inline void
_Py_critical_section_begin(struct _Py_critical_section *c, _PyMutex *m)
{
    if (_PyMutex_lock_fast(m)) {
        PyThreadState *tstate = PyThreadState_GET();
        c->mutex = m;
        c->prev = tstate->critical_section;
        tstate->critical_section = (uintptr_t)c;
    }
    else {
        _Py_critical_section_begin_slow(c, m);
    }
}

static inline void
_Py_critical_section_pop(struct _Py_critical_section *c)
{
    PyThreadState *tstate = PyThreadState_GET();
    uintptr_t prev = c->prev;
    tstate->critical_section = prev;

    if (_PY_UNLIKELY((prev & _Py_CRITICAL_SECTION_INACTIVE))) {
        _Py_critical_section_resume(tstate);
    }
}

static inline void
_Py_critical_section_end(struct _Py_critical_section *c)
{
    _PyMutex_unlock(c->mutex);
    _Py_critical_section_pop(c);
}

static inline void
_Py_critical_section2_begin(struct _Py_critical_section2 *c,
                            _PyMutex *m1, _PyMutex *m2)
{
    // Lock the two mutexes in a consistent (address) order to avoid deadlock.
    if ((uintptr_t)m2 < (uintptr_t)m1) {
        _PyMutex *m1_ = m1;
        m1 = m2;
        m2 = m1_;
    }
    else if (m1 == m2) {
        c->mutex2 = NULL;
        _Py_critical_section_begin(&c->base, m1);
        return;
    }
    if (_PyMutex_lock_fast(m1)) {
        if (_PyMutex_lock_fast(m2)) {
            PyThreadState *tstate = PyThreadState_GET();
            c->base.mutex = m1;
            c->mutex2 = m2;
            c->base.prev = tstate->critical_section;

            uintptr_t p = (uintptr_t)c | _Py_CRITICAL_SECTION_TWO_MUTEXES;
            tstate->critical_section = p;
        }
        else {
            _Py_critical_section2_begin_slow(c, m1, m2, 1);
        }
    }
    else {
        _Py_critical_section2_begin_slow(c, m1, m2, 0);
    }
}

static inline void
_Py_critical_section2_end(struct _Py_critical_section2 *c)
{
    if (c->mutex2) {
        _PyMutex_unlock(c->mutex2);
    }
    _PyMutex_unlock(c->base.mutex);
    _Py_critical_section_pop(&c->base);
}

PyAPI_FUNC(void)
_Py_critical_section_end_all(PyThreadState *tstate);


#ifdef __cplusplus
}
#endif
#endif /* !Py_INTERNAL_CRITICAL_SECTION_H */
#endif /* !Py_LIMITED_API */
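
And a hedged usage sketch of the critical-section macros above (the object type and its fields are hypothetical, assumed only for illustration):

    typedef struct {
        PyObject_HEAD
        _PyMutex mutex;        /* hypothetical per-object lock */
        Py_ssize_t length;
    } MyObject;

    static Py_ssize_t
    myobject_length(MyObject *self)
    {
        Py_ssize_t len;
        Py_BEGIN_CRITICAL_SECTION(&self->mutex);
        len = self->length;    /* protected access; the section would be
                                  suspended implicitly if this region
                                  called back into the Python API */
        Py_END_CRITICAL_SECTION;
        return len;
    }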
2 changes: 2 additions & 0 deletions Include/internal/pycore_gc.h
@@ -8,6 +8,8 @@ extern "C" {
# error "this header requires Py_BUILD_CORE define"
#endif

#include "pycore_lock.h"

/* GC information is stored BEFORE the object structure. */
typedef struct {
// Pointer to next object in the list.