Skip to content

Commit

Permalink
Fix hang caused by steal command with empty test queue
Browse files Browse the repository at this point in the history
  • Loading branch information
amezin committed Mar 9, 2023
1 parent 58fd7cc commit 6abcdfc
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
1 change: 1 addition & 0 deletions changelog/884.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed hang in ``worksteal`` scheduler.
17 changes: 15 additions & 2 deletions src/xdist/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def worker_title(title):

class WorkerInteractor:
SHUTDOWN_MARK = object()
QUEUE_REPLACED_MARK = object()

def __init__(self, config, channel):
self.config = config
Expand All @@ -72,6 +73,15 @@ def __init__(self, config, channel):
def _make_queue(self):
return self.channel.gateway.execmodel.queue.Queue()

def _get_next_item_index(self):
"""Gets the next item from test queue. Handles the case when the queue
is replaced concurrently in another thread.
"""
result = self.torun.get()
while result is self.QUEUE_REPLACED_MARK:
result = self.torun.get()
return result

def sendevent(self, name, **kwargs):
self.log("sending", name, kwargs)
self.channel.send((name, kwargs))
Expand Down Expand Up @@ -136,19 +146,22 @@ def old_queue_get_nowait_noraise():
self.torun.put(i)

self.sendevent("unscheduled", indices=stolen)
old_queue.put(self.QUEUE_REPLACED_MARK)

@pytest.hookimpl
def pytest_runtestloop(self, session):
self.log("entering main loop")
self.channel.setcallback(self.handle_command, endmarker=self.SHUTDOWN_MARK)
self.nextitem_index = self.torun.get()
self.nextitem_index = self._get_next_item_index()
while self.nextitem_index is not self.SHUTDOWN_MARK:
self.run_one_test()
return True

def run_one_test(self):
self.item_index = self.nextitem_index
self.nextitem_index = self._get_next_item_index()

items = self.session.items
self.item_index, self.nextitem_index = self.nextitem_index, self.torun.get()
item = items[self.item_index]
if self.nextitem_index is self.SHUTDOWN_MARK:
nextitem = None
Expand Down
34 changes: 34 additions & 0 deletions testing/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,40 @@ def test_func4(): pass
ev = worker.popevent("workerfinished")
assert "workeroutput" in ev.kwargs

def test_steal_empty_queue(self, worker: WorkerSetup, unserialize_report) -> None:
worker.pytester.makepyfile(
"""
def test_func(): pass
def test_func2(): pass
"""
)
worker.setup()
ev = worker.popevent("collectionfinish")
ids = ev.kwargs["ids"]
assert len(ids) == 2
worker.sendcommand("runtests_all")

for when in ["setup", "call", "teardown"]:
ev = worker.popevent("testreport")
rep = unserialize_report(ev.kwargs["data"])
assert rep.nodeid.endswith("::test_func")
assert rep.when == when

worker.sendcommand("steal", indices=[0, 1])
ev = worker.popevent("unscheduled")
assert ev.kwargs["indices"] == []

worker.sendcommand("shutdown")

for when in ["setup", "call", "teardown"]:
ev = worker.popevent("testreport")
rep = unserialize_report(ev.kwargs["data"])
assert rep.nodeid.endswith("::test_func2")
assert rep.when == when

ev = worker.popevent("workerfinished")
assert "workeroutput" in ev.kwargs


def test_remote_env_vars(pytester: pytest.Pytester) -> None:
pytester.makepyfile(
Expand Down

0 comments on commit 6abcdfc

Please sign in to comment.