Commit
merge 3.3-slp (Stackless python#105)
Anselm Kruis committed Dec 11, 2016
2 parents 126fa99 + baa885e commit b0789ae
Showing 3 changed files with 179 additions and 32 deletions.
3 changes: 3 additions & 0 deletions Stackless/changelog.txt
@@ -9,6 +9,9 @@ What's New in Stackless 3.X.X?

*Release date: 20XX-XX-XX*

- https://bitbucket.org/stackless-dev/stackless/issues/105
Fix an occasional NULL-pointer access during interpreter shutdown.

- https://bitbucket.org/stackless-dev/stackless/issues/104
Initialise the global variable slp_initial_tstate early before it is used.

133 changes: 101 additions & 32 deletions Stackless/core/stacklesseval.c
@@ -377,7 +377,7 @@ kill_pending_current_main_and_watchdogs(PyThreadState *ts)
}

static void
run_other_threads(PyObject **sleep, int count)
run_other_threads(PyObject **sleep, Py_ssize_t count)
{
if (count == 0) {
/* shortcut */
@@ -545,10 +545,10 @@ void slp_kill_tasks_with_stacks(PyThreadState *target_ts)
* states. That will hopefully happen when their threads exit.
*/
{
PyCStackObject *csfirst, *cs;
PyCStackObject *cs;
PyTaskletObject *t;
PyObject *sleepfunc = NULL;
int count;
Py_ssize_t count;

/* other threads, first pass: kill (pending) current, main and watchdog tasklets */
if (target_ts == NULL) {
@@ -580,50 +580,120 @@ void slp_kill_tasks_with_stacks(PyThreadState *target_ts)

/* other threads, second pass: kill tasklets with nesting-level > 0 and
* clear tstate if target_ts != NULL && target_ts != cts. */
csfirst = slp_cstack_chain;
if (csfirst == NULL) {
if (slp_cstack_chain == NULL) {
Py_XDECREF(sleepfunc);
goto current_main;
}

count = 0;
in_loop = 0;
for (cs = csfirst; !(in_loop && cs == csfirst); cs = cs->next) {
/* build a tuple of all tasklets to be killed:
* 1. count the tasklets
* 2. alloc a tuple and record them
* 3. kill them
* Steps 1 and 2 must not run Python code (release the GIL), because another thread could
* modify slp_cstack_chain.
*/
for(cs = slp_cstack_chain; cs != slp_cstack_chain || in_loop == 0; cs = cs->next) {
/* Count tasklets to be killed.
* This loop body must not release the GIL
*/
assert(cs);
assert(cs->next);
assert(cs->next->prev == cs);
in_loop = 1;
t = cs->task;
if (t == NULL)
continue;
Py_INCREF(t); /* cs->task is a borrowed ref */
if (t->cstate != cs) {
Py_DECREF(t);
continue; /* not the current cstate of the tasklet */
}
if (cs->tstate == NULL || cs->tstate == cts) {
Py_DECREF(t);
continue; /* already handled */
}
if (target_ts != NULL && cs->tstate != target_ts) {
Py_DECREF(t);
continue; /* we are not interested in this thread */
}
if (((cs->tstate && cs->tstate->st.current == t) ? cs->tstate->st.nesting_level : cs->nesting_level) > 0) {
if (((cs->tstate && cs->tstate->st.current == t) ?
cs->tstate->st.nesting_level : cs->nesting_level) > 0) {
/* Kill only tasklets with nesting level > 0 */
count++;
PyTasklet_Kill(t);
PyErr_Clear();
}
Py_DECREF(t);
if (target_ts != NULL) {
cs->tstate = NULL;
}
}
if (target_ts == NULL) {
/* We must not release the GIL while we might hold the HEAD-lock.
* Otherwise another thread (usually the thread of the killed tasklet)
* could try to get the HEAD lock. The result would be a wonderful deadlock.
* If target_ts is NULL, we know for sure that we don't hold the HEAD-lock.
*/
run_other_threads(&sleepfunc, count);
assert(cs == slp_cstack_chain);
if (count > 0) {
PyObject *tasklets = PyTuple_New(count);
if (NULL == tasklets) {
PyErr_Print();
return;
}
assert(cs == slp_cstack_chain);
for(in_loop = 0, count=0; cs != slp_cstack_chain || in_loop == 0; cs = cs->next) {
/* Record tasklets to be killed.
* This loop body must not release the GIL.
*/
assert(cs);
assert(cs->next);
assert(cs->next->prev == cs);
in_loop = 1;
t = cs->task;
if (t == NULL)
continue;
if (t->cstate != cs) {
continue; /* not the current cstate of the tasklet */
}
if (cs->tstate == NULL || cs->tstate == cts) {
continue; /* already handled */
}
if (target_ts != NULL && cs->tstate != target_ts) {
continue; /* we are not interested in this thread */
}
if (((cs->tstate && cs->tstate->st.current == t) ?
cs->tstate->st.nesting_level : cs->nesting_level) > 0) {
/* Kill only tasklets with nesting level > 0 */
Py_INCREF(t);
assert(count < PyTuple_GET_SIZE(tasklets));
PyTuple_SET_ITEM(tasklets, count, (PyObject *)t); /* steals a reference to t */
count++;
}
}
assert(count == PyTuple_GET_SIZE(tasklets));
for (count = 0; count < PyTuple_GET_SIZE(tasklets); count++) {
/* Kill the tasklets.
*/
t = (PyTaskletObject *)PyTuple_GET_ITEM(tasklets, count);
cs = t->cstate;
assert(cs);
if (cs->tstate == NULL || cs->tstate == cts) {
continue; /* already handled */
}
if (target_ts != NULL && cs->tstate != target_ts) {
continue; /* we are not interested in this thread */
}
Py_INCREF(cs);
if (((cs->tstate && cs->tstate->st.current == t) ?
cs->tstate->st.nesting_level : cs->nesting_level) > 0) {
/* Kill only tasklets with nesting level > 0
* We must check again, because killing one tasklet
* can change the state of other tasklets too.
*/
PyTasklet_Kill(t);
PyErr_Clear();
}
if (target_ts != NULL) {
cs->tstate = NULL;
}
Py_DECREF(cs);
}
Py_DECREF(tasklets);
if (target_ts == NULL) {
/* We must not release the GIL while we might hold the HEAD-lock.
* Otherwise another thread (usually the thread of the killed tasklet)
* could try to get the HEAD lock. The result would be a wonderful deadlock.
* If target_ts is NULL, we know for sure that we don't hold the HEAD-lock.
*/
run_other_threads(&sleepfunc, count);
}
}
Py_XDECREF(sleepfunc);
}
@@ -636,17 +706,16 @@ void slp_kill_tasks_with_stacks(PyThreadState *target_ts)
* should be left.
*/
if (target_ts == NULL || target_ts == cts) {
/* a loop to kill tasklets on the local thread */
PyCStackObject *csfirst = slp_cstack_chain, *cs;
PyCStackObject *cs;

if (csfirst == NULL)
if (slp_cstack_chain == NULL)
return;
in_loop = 0;
for (cs = csfirst; ; cs = cs->next) {
if (in_loop && cs == csfirst) {
/* nothing found */
break;
}
for (cs = slp_cstack_chain; cs != slp_cstack_chain || in_loop == 0; cs = cs->next) {
/* This loop body must not release the GIL. */
assert(cs);
assert(cs->next);
assert(cs->next->prev == cs);
in_loop = 1;
/* has tstate already been cleared or is it a foreign thread? */
if (target_ts == NULL || cs->tstate == cts) {
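The core of the change above: slp_kill_tasks_with_stacks() no longer calls PyTasklet_Kill() while walking slp_cstack_chain. Killing a tasklet can run Python code, release the GIL, and let another thread modify the chain mid-iteration — the source of the occasional NULL-pointer access from issue #105. The new code therefore counts the candidates first, records strong references to them in a tuple, and only then kills them from that private snapshot, re-checking each tasklet's state before the kill. The following is a minimal Python sketch of that snapshot-then-act pattern; every name in it (FakeTasklet, kill, shutdown, registry) is hypothetical and not part of Stackless.

# Hypothetical sketch (not Stackless code): the snapshot-then-act pattern.

class FakeTasklet:
    """Stand-in for a tasklet with a nesting level and an alive flag."""
    def __init__(self, name, nesting_level):
        self.name = name
        self.nesting_level = nesting_level
        self.alive = True

def kill(tasklet, registry):
    # Killing may mutate the registry while it runs, just as PyTasklet_Kill()
    # may run Python code and allow other threads to modify slp_cstack_chain.
    tasklet.alive = False
    registry[:] = [t for t in registry if t.alive]

def shutdown(registry):
    # Steps 1 and 2: count and record the candidates while nothing can mutate
    # the registry; no potentially re-entrant calls are made here.
    to_kill = tuple(t for t in registry if t.nesting_level > 0)
    # Step 3: operate on the private snapshot, so concurrent modification of
    # the registry can no longer skip entries or follow dangling links.
    for t in to_kill:
        if t.alive:  # re-check: killing one tasklet can affect the others
            kill(t, registry)

registry = [FakeTasklet("a", 1), FakeTasklet("b", 0), FakeTasklet("c", 2)]
shutdown(registry)
print([t.name for t in registry if t.alive])  # prints ['b']

In the C code the snapshot is a PyTuple of strong references, which both keeps the tasklets alive across the kill phase and keeps the counting and recording loops free of calls that could release the GIL.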
75 changes: 75 additions & 0 deletions Stackless/unittests/test_shutdown.py
@@ -373,6 +373,81 @@ def exit():
"""] + args)
self.assertEqual(rc, 42)

@unittest.skipUnless(withThreads, "requires thread support")
def test_kill_modifies_slp_cstack_chain(self):
# test for issue #105 https://bitbucket.org/stackless-dev/stackless/issues/105/

args = []
if not stackless.enable_softswitch(None):
args.append("--hard")

rc = subprocess.call([sys.executable, "-s", "-S", "-E", "-c", """from __future__ import print_function, absolute_import, division\nif 1:
import threading
import stackless
import time
import sys
from stackless import _test_nostacklesscall as apply
DEBUG = False
event = threading.Event()
if not DEBUG:
def print(*args, **kw):
pass
def mytask():
stackless.schedule_remove()
class TaskHolder(object):
def __init__(self, task):
self.task = task
def __del__(self):
while self.task.alive:
time.sleep(0.1)
print("TaskHolder.__del__, task1 is still alive")
print("TaskHolder.__del__: task1 is now dead")
self.task = None
def other_thread():
# create a paused tasklet
task1 = stackless.tasklet(apply)(mytask)
stackless.run()
assert(task1.alive)
assert(task1.paused)
assert(task1.nesting_level > 0)
assert(task1 is task1.cstate.task)
assert(len(str(task1.cstate)) > 0)
assert(sys.getrefcount(task1.cstate) - sys.getrefcount(object()) == 1)
assert(task1.tempval is task1)
assert(sys.getrefcount(task1) - sys.getrefcount(object()) == 2)
task1.tempval = TaskHolder(task1)
assert(sys.getrefcount(task1) - sys.getrefcount(object()) == 2)
print("task1", task1)
print("task1.cstate", repr(task1.cstate))
print("ending main tasklet of other_thread, going run the scheduler")
task1 = None
event.set()
# sleep for a long time
t = 10000
while t > 0:
try:
t -= 1
stackless.run()
time.sleep(0.01)
except:
pass
t = threading.Thread(target=other_thread, name="other_thread")
t.daemon = True
t.start()
event.wait(5.0)
print("end")
sys.exit(42)
"""] + args)
self.assertEqual(rc, 42)


class TestInterpreterShutdown(unittest.TestCase):
# intentionally not a subclass of StacklessTestCase.
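Like the other shutdown regression tests in this file, the new test runs its scenario in a freshly spawned interpreter (sys.executable with -s -S -E -c) and asserts on a sentinel exit status of 42, so a crash during interpreter finalization shows up as a failed assertion in the parent process instead of taking the test runner down. A minimal, hypothetical sketch of that pattern — not part of the Stackless test suite — looks like this:

import subprocess
import sys
import unittest

# The child script sets up the scenario under test and then exits with a
# sentinel status; a crash during finalization would change the exit status.
CHILD_SCRIPT = """\
import sys
# ... set up the shutdown scenario here ...
sys.exit(42)
"""

class ShutdownSubprocessSketch(unittest.TestCase):
    def test_child_interpreter_exits_cleanly(self):
        # -s/-S/-E keep the child free of site and environment customization,
        # mirroring the flags used by test_shutdown.py.
        rc = subprocess.call([sys.executable, "-s", "-S", "-E", "-c", CHILD_SCRIPT])
        self.assertEqual(rc, 42)

if __name__ == "__main__":
    unittest.main()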
