Skip to content

Commit

Permalink
[3.13] pythongh-119121: Fix and test asyncio.staggered.staggered_race (
Browse files Browse the repository at this point in the history
…pythonGH-119173) (python#119206)

pythongh-119121: Fix and test `asyncio.staggered.staggered_race` (pythonGH-119173)
(cherry picked from commit 16b46eb)

Co-authored-by: Nikita Sobolev <mail@sobolevn.me>
  • Loading branch information
2 people authored and Kronuz committed May 20, 2024
1 parent 3b90807 commit dcf3f18
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 31 deletions.
6 changes: 1 addition & 5 deletions Include/cpython/dictobject.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,7 @@ static inline Py_ssize_t PyDict_GET_SIZE(PyObject *op) {
PyDictObject *mp;
assert(PyDict_Check(op));
mp = _Py_CAST(PyDictObject*, op);
#ifdef Py_GIL_DISABLED
return _Py_atomic_load_ssize_relaxed(&mp->ma_used);
#else
return mp->ma_used;
#endif
return FT_ATOMIC_LOAD_SSIZE_RELAXED(mp->ma_used);
}
#define PyDict_GET_SIZE(op) PyDict_GET_SIZE(_PyObject_CAST(op))

Expand Down
3 changes: 1 addition & 2 deletions Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ async def staggered_race(coro_fns, delay, *, loop=None):
exceptions = []
running_tasks = []

async def run_one_coro(
previous_failed: typing.Optional[locks.Event]) -> None:
async def run_one_coro(previous_failed) -> None:
# Wait for the previous task to finish, or for delay seconds
if previous_failed is not None:
with contextlib.suppress(exceptions_mod.TimeoutError):
Expand Down
97 changes: 97 additions & 0 deletions Lib/test/test_asyncio/test_staggered.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import asyncio
import unittest
from asyncio.staggered import staggered_race

from test import support

# Skip this whole test module on platforms without working sockets,
# since asyncio event loops depend on them.
support.requires_working_socket(module=True)


def tearDownModule():
    # Reset the global event loop policy so this module leaves no
    # process-wide state behind for test modules that run after it.
    asyncio.set_event_loop_policy(None)


class StaggeredTests(unittest.IsolatedAsyncioTestCase):
async def test_empty(self):
winner, index, excs = await staggered_race(
[],
delay=None,
)

self.assertIs(winner, None)
self.assertIs(index, None)
self.assertEqual(excs, [])

async def test_one_successful(self):
async def coro(index):
return f'Res: {index}'

winner, index, excs = await staggered_race(
[
lambda: coro(0),
lambda: coro(1),
],
delay=None,
)

self.assertEqual(winner, 'Res: 0')
self.assertEqual(index, 0)
self.assertEqual(excs, [None])

async def test_first_error_second_successful(self):
async def coro(index):
if index == 0:
raise ValueError(index)
return f'Res: {index}'

winner, index, excs = await staggered_race(
[
lambda: coro(0),
lambda: coro(1),
],
delay=None,
)

self.assertEqual(winner, 'Res: 1')
self.assertEqual(index, 1)
self.assertEqual(len(excs), 2)
self.assertIsInstance(excs[0], ValueError)
self.assertIs(excs[1], None)

async def test_first_timeout_second_successful(self):
async def coro(index):
if index == 0:
await asyncio.sleep(10) # much bigger than delay
return f'Res: {index}'

winner, index, excs = await staggered_race(
[
lambda: coro(0),
lambda: coro(1),
],
delay=0.1,
)

self.assertEqual(winner, 'Res: 1')
self.assertEqual(index, 1)
self.assertEqual(len(excs), 2)
self.assertIsInstance(excs[0], asyncio.CancelledError)
self.assertIs(excs[1], None)

async def test_none_successful(self):
async def coro(index):
raise ValueError(index)

winner, index, excs = await staggered_race(
[
lambda: coro(0),
lambda: coro(1),
],
delay=None,
)

self.assertIs(winner, None)
self.assertIs(index, None)
self.assertEqual(len(excs), 2)
self.assertIsInstance(excs[0], ValueError)
self.assertIsInstance(excs[1], ValueError)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a NameError happening in ``asyncio.staggered.staggered_race``. This
function is now tested.
61 changes: 37 additions & 24 deletions Objects/dictobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ ASSERT_DICT_LOCKED(PyObject *op)
_Py_CRITICAL_SECTION_ASSERT_OBJECT_LOCKED(op);
}
#define ASSERT_DICT_LOCKED(op) ASSERT_DICT_LOCKED(_Py_CAST(PyObject*, op))
#define ASSERT_WORLD_STOPPED_OR_DICT_LOCKED(op) \
if (!_PyInterpreterState_GET()->stoptheworld.world_stopped) { \
ASSERT_DICT_LOCKED(op); \
}

#define IS_DICT_SHARED(mp) _PyObject_GC_IS_SHARED(mp)
#define SET_DICT_SHARED(mp) _PyObject_GC_SET_SHARED(mp)
#define LOAD_INDEX(keys, size, idx) _Py_atomic_load_int##size##_relaxed(&((const int##size##_t*)keys->dk_indices)[idx]);
Expand Down Expand Up @@ -670,6 +675,8 @@ dump_entries(PyDictKeysObject *dk)
int
_PyDict_CheckConsistency(PyObject *op, int check_content)
{
ASSERT_WORLD_STOPPED_OR_DICT_LOCKED(op);

#define CHECK(expr) \
do { if (!(expr)) { _PyObject_ASSERT_FAILED_MSG(op, Py_STRINGIFY(expr)); } } while (0)

Expand Down Expand Up @@ -1722,13 +1729,14 @@ static void
insert_split_value(PyInterpreterState *interp, PyDictObject *mp, PyObject *key, PyObject *value, Py_ssize_t ix)
{
assert(PyUnicode_CheckExact(key));
ASSERT_DICT_LOCKED(mp);
MAINTAIN_TRACKING(mp, key, value);
PyObject *old_value = mp->ma_values->values[ix];
if (old_value == NULL) {
uint64_t new_version = _PyDict_NotifyEvent(interp, PyDict_EVENT_ADDED, mp, key, value);
STORE_SPLIT_VALUE(mp, ix, Py_NewRef(value));
_PyDictValues_AddToInsertionOrder(mp->ma_values, ix);
mp->ma_used++;
STORE_USED(mp, mp->ma_used + 1);
mp->ma_version_tag = new_version;
}
else {
Expand Down Expand Up @@ -1792,7 +1800,7 @@ insertdict(PyInterpreterState *interp, PyDictObject *mp,
goto Fail;
}
mp->ma_version_tag = new_version;
mp->ma_used++;
STORE_USED(mp, mp->ma_used + 1);
ASSERT_CONSISTENT(mp);
return 0;
}
Expand Down Expand Up @@ -1861,7 +1869,7 @@ insert_to_emptydict(PyInterpreterState *interp, PyDictObject *mp,
ep->me_hash = hash;
STORE_VALUE(ep, value);
}
FT_ATOMIC_STORE_SSIZE_RELAXED(mp->ma_used, FT_ATOMIC_LOAD_SSIZE_RELAXED(mp->ma_used) + 1);
STORE_USED(mp, mp->ma_used + 1);
mp->ma_version_tag = new_version;
newkeys->dk_usable--;
newkeys->dk_nentries++;
Expand All @@ -1870,11 +1878,7 @@ insert_to_emptydict(PyInterpreterState *interp, PyDictObject *mp,
// the case where we're inserting from the non-owner thread. We don't use
// set_keys here because the transition from empty to non-empty is safe
// as the empty keys will never be freed.
#ifdef Py_GIL_DISABLED
_Py_atomic_store_ptr_release(&mp->ma_keys, newkeys);
#else
mp->ma_keys = newkeys;
#endif
FT_ATOMIC_STORE_PTR_RELEASE(mp->ma_keys, newkeys);
return 0;
}

Expand Down Expand Up @@ -2580,7 +2584,7 @@ delitem_common(PyDictObject *mp, Py_hash_t hash, Py_ssize_t ix,
Py_ssize_t hashpos = lookdict_index(mp->ma_keys, hash, ix);
assert(hashpos >= 0);

FT_ATOMIC_STORE_SSIZE_RELAXED(mp->ma_used, FT_ATOMIC_LOAD_SSIZE(mp->ma_used) - 1);
STORE_USED(mp, mp->ma_used - 1);
mp->ma_version_tag = new_version;
if (_PyDict_HasSplitTable(mp)) {
assert(old_value == mp->ma_values->values[ix]);
Expand Down Expand Up @@ -2752,7 +2756,7 @@ clear_lock_held(PyObject *op)
// We don't inc ref empty keys because they're immortal
ensure_shared_on_resize(mp);
mp->ma_version_tag = new_version;
mp->ma_used = 0;
STORE_USED(mp, 0);
if (oldvalues == NULL) {
set_keys(mp, Py_EMPTY_KEYS);
assert(oldkeys->dk_refcnt == 1);
Expand Down Expand Up @@ -3191,6 +3195,8 @@ dict_repr_lock_held(PyObject *self)
_PyUnicodeWriter writer;
int first;

ASSERT_DICT_LOCKED(mp);

i = Py_ReprEnter((PyObject *)mp);
if (i != 0) {
return i > 0 ? PyUnicode_FromString("{...}") : NULL;
Expand Down Expand Up @@ -3279,8 +3285,7 @@ dict_repr(PyObject *self)
static Py_ssize_t
dict_length(PyObject *self)
{
PyDictObject *mp = (PyDictObject *)self;
return _Py_atomic_load_ssize_relaxed(&mp->ma_used);
return FT_ATOMIC_LOAD_SSIZE_RELAXED(((PyDictObject *)self)->ma_used);
}

static PyObject *
Expand Down Expand Up @@ -3672,6 +3677,9 @@ PyDict_MergeFromSeq2(PyObject *d, PyObject *seq2, int override)
static int
dict_dict_merge(PyInterpreterState *interp, PyDictObject *mp, PyDictObject *other, int override)
{
ASSERT_DICT_LOCKED(mp);
ASSERT_DICT_LOCKED(other);

if (other == mp || other->ma_used == 0)
/* a.update(a) or a.update({}); nothing to do */
return 0;
Expand Down Expand Up @@ -3699,7 +3707,7 @@ dict_dict_merge(PyInterpreterState *interp, PyDictObject *mp, PyDictObject *othe
ensure_shared_on_resize(mp);
dictkeys_decref(interp, mp->ma_keys, IS_DICT_SHARED(mp));
mp->ma_keys = keys;
mp->ma_used = other->ma_used;
STORE_USED(mp, other->ma_used);
mp->ma_version_tag = new_version;
ASSERT_CONSISTENT(mp);

Expand Down Expand Up @@ -4034,7 +4042,7 @@ PyDict_Size(PyObject *mp)
PyErr_BadInternalCall();
return -1;
}
return ((PyDictObject *)mp)->ma_used;
return FT_ATOMIC_LOAD_SSIZE_RELAXED(((PyDictObject *)mp)->ma_used);
}

/* Return 1 if dicts equal, 0 if not, -1 if error.
Expand Down Expand Up @@ -4291,7 +4299,7 @@ dict_setdefault_ref_lock_held(PyObject *d, PyObject *key, PyObject *default_valu
}

MAINTAIN_TRACKING(mp, key, value);
mp->ma_used++;
STORE_USED(mp, mp->ma_used + 1);
mp->ma_version_tag = new_version;
assert(mp->ma_keys->dk_usable >= 0);
ASSERT_CONSISTENT(mp);
Expand Down Expand Up @@ -4413,6 +4421,8 @@ dict_popitem_impl(PyDictObject *self)
uint64_t new_version;
PyInterpreterState *interp = _PyInterpreterState_GET();

ASSERT_DICT_LOCKED(self);

/* Allocate the result tuple before checking the size. Believe it
* or not, this allocation could trigger a garbage collection which
* could empty the dict, so if we checked the size first and that
Expand Down Expand Up @@ -4952,19 +4962,21 @@ typedef struct {
static PyObject *
dictiter_new(PyDictObject *dict, PyTypeObject *itertype)
{
Py_ssize_t used;
dictiterobject *di;
di = PyObject_GC_New(dictiterobject, itertype);
if (di == NULL) {
return NULL;
}
di->di_dict = (PyDictObject*)Py_NewRef(dict);
di->di_used = dict->ma_used;
di->len = dict->ma_used;
used = FT_ATOMIC_LOAD_SSIZE(dict->ma_used);
di->di_used = used;
di->len = used;
if (itertype == &PyDictRevIterKey_Type ||
itertype == &PyDictRevIterItem_Type ||
itertype == &PyDictRevIterValue_Type) {
if (_PyDict_HasSplitTable(dict)) {
di->di_pos = dict->ma_used - 1;
di->di_pos = used - 1;
}
else {
di->di_pos = load_keys_nentries(dict) - 1;
Expand Down Expand Up @@ -5013,8 +5025,8 @@ dictiter_len(PyObject *self, PyObject *Py_UNUSED(ignored))
{
dictiterobject *di = (dictiterobject *)self;
Py_ssize_t len = 0;
if (di->di_dict != NULL && di->di_used == di->di_dict->ma_used)
len = di->len;
if (di->di_dict != NULL && di->di_used == FT_ATOMIC_LOAD_SSIZE_RELAXED(di->di_dict->ma_used))
len = FT_ATOMIC_LOAD_SSIZE_RELAXED(di->len);
return PyLong_FromSize_t(len);
}

Expand Down Expand Up @@ -5297,6 +5309,7 @@ dictiter_iternextitem_lock_held(PyDictObject *d, PyObject *self,
Py_ssize_t i;

assert (PyDict_Check(d));
ASSERT_DICT_LOCKED(d);

if (di->di_used != d->ma_used) {
PyErr_SetString(PyExc_RuntimeError,
Expand Down Expand Up @@ -5811,7 +5824,7 @@ dictview_len(PyObject *self)
_PyDictViewObject *dv = (_PyDictViewObject *)self;
Py_ssize_t len = 0;
if (dv->dv_dict != NULL)
len = dv->dv_dict->ma_used;
len = FT_ATOMIC_LOAD_SSIZE_RELAXED(dv->dv_dict->ma_used);
return len;
}

Expand Down Expand Up @@ -6820,15 +6833,15 @@ store_instance_attr_lock_held(PyObject *obj, PyDictValues *values,
_PyDictValues_AddToInsertionOrder(values, ix);
if (dict) {
assert(dict->ma_values == values);
dict->ma_used++;
STORE_USED(dict, dict->ma_used + 1);
}
}
else {
if (value == NULL) {
delete_index_from_values(values, ix);
if (dict) {
assert(dict->ma_values == values);
dict->ma_used--;
STORE_USED(dict, dict->ma_used - 1);
}
}
Py_DECREF(old_value);
Expand Down Expand Up @@ -7039,7 +7052,7 @@ _PyObject_IsInstanceDictEmpty(PyObject *obj)
if (dict == NULL) {
return 1;
}
return ((PyDictObject *)dict)->ma_used == 0;
return FT_ATOMIC_LOAD_SSIZE_RELAXED(((PyDictObject *)dict)->ma_used) == 0;
}

int
Expand Down

0 comments on commit dcf3f18

Please sign in to comment.