transition flight to missing if no who_has #5653

Merged · 7 commits · Feb 1, 2022
163 changes: 113 additions & 50 deletions distributed/tests/test_worker.py
@@ -57,7 +57,14 @@
slowinc,
slowsum,
)
from distributed.worker import Worker, error_message, logger, parse_memory_limit
from distributed.worker import (
TaskState,
UniqueTaskHeap,
Worker,
error_message,
logger,
parse_memory_limit,
)

pytestmark = pytest.mark.ci1

@@ -1390,7 +1397,6 @@ async def test_prefer_gather_from_local_address(c, s, w1, w2, w3):
@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)] * 20,
timeout=30,
config={"distributed.worker.connections.incoming": 1},
)
async def test_avoid_oversubscription(c, s, *workers):
@@ -2186,12 +2192,26 @@ async def test_gpu_executor(c, s, w):
assert "gpu" not in w.executors


def assert_task_states_on_worker(expected, worker):
for dep_key, expected_state in expected.items():
assert dep_key in worker.tasks, (worker.name, dep_key, worker.tasks)
dep_ts = worker.tasks[dep_key]
assert dep_ts.state == expected_state, (worker.name, dep_ts, expected_state)
assert set(expected) == set(worker.tasks)
async def assert_task_states_on_worker(expected, worker):
Member Author:
I noticed these tests were mildly flaky, so I allowed for a retry. Eventually the workers must reach an equilibrium or otherwise fail.

active_exc = None
for _ in range(10):
try:
for dep_key, expected_state in expected.items():
assert dep_key in worker.tasks, (worker.name, dep_key, worker.tasks)
dep_ts = worker.tasks[dep_key]
assert dep_ts.state == expected_state, (
worker.name,
dep_ts,
expected_state,
)
assert set(expected) == set(worker.tasks)
return
except AssertionError as exc:
active_exc = exc
await asyncio.sleep(0.1)
# If after a second the workers are not in equilibrium, they are broken
assert active_exc
raise active_exc


@gen_cluster(client=True)
@@ -2235,7 +2255,7 @@ def raise_exc(*args):
g.key: "memory",
res.key: "error",
}
assert_task_states_on_worker(expected_states, a)
await assert_task_states_on_worker(expected_states, a)
# Expected states after we release references to the futures
f.release()
g.release()
@@ -2251,7 +2271,7 @@ def raise_exc(*args):
res.key: "error",
}

assert_task_states_on_worker(expected_states, a)
await assert_task_states_on_worker(expected_states, a)

res.release()

@@ -2304,7 +2324,7 @@ def raise_exc(*args):
g.key: "memory",
res.key: "error",
}
assert_task_states_on_worker(expected_states, a)
await assert_task_states_on_worker(expected_states, a)
# Expected states after we release references to the futures

res.release()
@@ -2318,7 +2338,7 @@ def raise_exc(*args):
g.key: "memory",
}

assert_task_states_on_worker(expected_states, a)
await assert_task_states_on_worker(expected_states, a)

f.release()
g.release()
@@ -2369,7 +2389,7 @@ def raise_exc(*args):
g.key: "memory",
res.key: "error",
}
assert_task_states_on_worker(expected_states, a)
await assert_task_states_on_worker(expected_states, a)
# Expected states after we release references to the futures

f.release()
@@ -2383,8 +2403,8 @@ def raise_exc(*args):
g.key: "memory",
}

assert_task_states_on_worker(expected_states, a)
assert_task_states_on_worker(expected_states, b)
await assert_task_states_on_worker(expected_states, a)
await assert_task_states_on_worker(expected_states, b)

g.release()

@@ -2418,64 +2438,58 @@ def raise_exc(*args):
g.key: "memory",
h.key: "memory",
}
await asyncio.sleep(0.05)
assert_task_states_on_worker(expected_states_A, a)
await assert_task_states_on_worker(expected_states_A, a)

expected_states_B = {
f.key: "memory",
g.key: "memory",
h.key: "memory",
res.key: "error",
}
await asyncio.sleep(0.05)
assert_task_states_on_worker(expected_states_B, b)
await assert_task_states_on_worker(expected_states_B, b)

f.release()

expected_states_A = {
g.key: "memory",
h.key: "memory",
}
await asyncio.sleep(0.05)
assert_task_states_on_worker(expected_states_A, a)
await assert_task_states_on_worker(expected_states_A, a)

expected_states_B = {
f.key: "released",
g.key: "memory",
h.key: "memory",
res.key: "error",
}
await asyncio.sleep(0.05)
assert_task_states_on_worker(expected_states_B, b)
await assert_task_states_on_worker(expected_states_B, b)

g.release()

expected_states_A = {
g.key: "released",
h.key: "memory",
}
await asyncio.sleep(0.05)
assert_task_states_on_worker(expected_states_A, a)
await assert_task_states_on_worker(expected_states_A, a)

# B must not forget a task since all still have a valid dependent
expected_states_B = {
f.key: "released",
h.key: "memory",
res.key: "error",
}
assert_task_states_on_worker(expected_states_B, b)
await assert_task_states_on_worker(expected_states_B, b)
h.release()
await asyncio.sleep(0.05)

expected_states_A = {}
assert_task_states_on_worker(expected_states_A, a)
await assert_task_states_on_worker(expected_states_A, a)
expected_states_B = {
f.key: "released",
h.key: "released",
res.key: "error",
}

assert_task_states_on_worker(expected_states_B, b)
await assert_task_states_on_worker(expected_states_B, b)
res.release()

# We no longer hold any refs. Cluster should reset completely
@@ -3130,33 +3144,38 @@ async def test_missing_released_zombie_tasks(c, s, a, b):

@gen_cluster(client=True)
async def test_missing_released_zombie_tasks_2(c, s, a, b):
a.total_in_connections = 0
f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[b.address])
# If get_data_from_worker raises, this will suggest a dead worker to B and it
# will transition the task to missing. We want to make sure that a missing
# task is properly released and not left as a zombie
with mock.patch.object(
distributed.worker,
"get_data_from_worker",
side_effect=CommClosedError,
):
f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[b.address])

while f1.key not in b.tasks:
await asyncio.sleep(0)
while f1.key not in b.tasks:
await asyncio.sleep(0)

ts = b.tasks[f1.key]
assert ts.state == "fetch"
ts = b.tasks[f1.key]
assert ts.state == "fetch"

# A few things can happen to clear who_has. The dominant process is upon
# connection failure to a worker. Regardless of how the set was cleared, the
# task will be transitioned to missing where the worker is trying to
# reacquire this information from the scheduler. While this is happening on
# the worker side, the tasks are released and we want to ensure that no dangling
# zombie tasks are left on the worker
ts.who_has.clear()
while ts.state != "missing":
# If we sleep for a longer time, the worker will spin into an
# endless loop of asking the scheduler who_has and trying to connect
# to A
await asyncio.sleep(0)

del f1, f2
del f1, f2

while b.tasks:
await asyncio.sleep(0.01)
while b.tasks:
await asyncio.sleep(0.01)

assert_worker_story(
b.story(ts),
[("f1", "missing", "released", "released", {"f1": "forgotten"})],
)
assert_worker_story(
b.story(ts),
[("f1", "missing", "released", "released", {"f1": "forgotten"})],
)
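
The behaviour this test pins down — a task being fetched whose who_has set empties out should move to "missing" and the worker should re-ask the scheduler for replicas rather than spin on a dead peer — can be pictured with a small hypothetical sketch. The FakeTask class and refresh_replicas helper below are illustrative only and are not the actual Worker transition code:

# Hypothetical sketch of the idea, not the actual Worker transition code:
# a task being fetched that has no known replicas left (empty who_has)
# moves to "missing", and the worker asks the scheduler for fresh replica
# information instead of retrying a dead peer forever.
from dataclasses import dataclass, field


@dataclass
class FakeTask:
    key: str
    state: str = "fetch"
    who_has: set = field(default_factory=set)


def refresh_replicas(ts: FakeTask, scheduler_who_has: dict) -> str:
    if ts.who_has:
        return ts.state
    # No replicas known: the task cannot stay in fetch/flight
    ts.state = "missing"
    # Ask the scheduler again; if replicas reappear, go back to fetch
    replicas = set(scheduler_who_has.get(ts.key, ()))
    if replicas:
        ts.who_has = replicas
        ts.state = "fetch"
    return ts.state


ts = FakeTask("f1", state="flight")
assert refresh_replicas(ts, {}) == "missing"
assert refresh_replicas(ts, {"f1": {"tcp://worker-a:1234"}}) == "fetch"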


@pytest.mark.slow
@@ -3441,6 +3460,8 @@ async def test_Worker__to_dict(c, s, a):
"config",
"incoming_transfer_log",
"outgoing_transfer_log",
"data_needed",
"pending_data_per_worker",
}
assert d["tasks"]["x"]["key"] == "x"

@@ -3462,3 +3483,45 @@ async def test_TaskState__to_dict(c, s, a):
assert isinstance(tasks["z"], dict)
assert tasks["x"]["dependents"] == ["<TaskState 'y' memory>"]
assert tasks["y"]["dependencies"] == ["<TaskState 'x' memory>"]


def test_unique_task_heap():
heap = UniqueTaskHeap()

for x in range(10):
ts = TaskState(f"f{x}")
ts.priority = (0, 0, 1, x % 3)
heap.push(ts)

heap_list = list(heap)
# iteration does not empty heap
assert len(heap) == 10
assert heap_list == sorted(heap_list, key=lambda ts: ts.priority)

seen = set()
last_prio = (0, 0, 0, 0)
while heap:
peeked = heap.peek()
ts = heap.pop()
assert peeked == ts
seen.add(ts.key)
assert ts.priority
assert last_prio <= ts.priority
last_prio = ts.priority

ts = TaskState("foo")
heap.push(ts)
heap.push(ts)
assert len(heap) == 1

assert repr(heap) == "<UniqueTaskHeap: 1 items>"

assert heap.pop() == ts
assert not heap

# Test that we're cleaning the seen set on pop
heap.push(ts)
assert len(heap) == 1
assert heap.pop() == ts

assert repr(heap) == "<UniqueTaskHeap: 0 items>"