transition flight to missing if no who_has (#5653)
Co-authored-by: crusaderky <crusaderky@gmail.com>
fjetter and crusaderky authored Feb 1, 2022
1 parent 5386d76 commit b581bb6
Showing 2 changed files with 231 additions and 122 deletions.
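In brief, the behavior this commit tests: a task in the flight state whose who_has set becomes empty can never complete its fetch, so the worker should transition it to missing and re-query the scheduler instead of leaving the task stuck. A minimal sketch of that idea, using illustrative names rather than distributed's actual Worker state machine:

# Minimal sketch, assuming a hypothetical task record with `state` and
# `who_has` attributes; distributed's real transition machinery is richer.
class TaskStub:
    def __init__(self, key, who_has):
        self.key = key
        self.state = "flight"        # data is being fetched from a peer
        self.who_has = set(who_has)  # peers believed to hold the data


def ensure_fetchable(ts):
    # The guard under test: a flight task with an empty who_has can never
    # complete its fetch, so move it to "missing" and let the worker ask
    # the scheduler who holds the data now.
    if ts.state == "flight" and not ts.who_has:
        ts.state = "missing"
    return ts.state


ts = TaskStub("f1", who_has={"tcp://a:1234"})
ts.who_has.clear()  # e.g. the only holder became unreachable
assert ensure_fetchable(ts) == "missing"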
163 changes: 113 additions & 50 deletions distributed/tests/test_worker.py
@@ -57,7 +57,14 @@
slowinc,
slowsum,
)
-from distributed.worker import Worker, error_message, logger, parse_memory_limit
+from distributed.worker import (
+    TaskState,
+    UniqueTaskHeap,
+    Worker,
+    error_message,
+    logger,
+    parse_memory_limit,
+)

pytestmark = pytest.mark.ci1

@@ -1390,7 +1397,6 @@ async def test_prefer_gather_from_local_address(c, s, w1, w2, w3):
@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)] * 20,
-    timeout=30,
config={"distributed.worker.connections.incoming": 1},
)
async def test_avoid_oversubscription(c, s, *workers):
@@ -2186,12 +2192,26 @@ async def test_gpu_executor(c, s, w):
assert "gpu" not in w.executors


-def assert_task_states_on_worker(expected, worker):
-    for dep_key, expected_state in expected.items():
-        assert dep_key in worker.tasks, (worker.name, dep_key, worker.tasks)
-        dep_ts = worker.tasks[dep_key]
-        assert dep_ts.state == expected_state, (worker.name, dep_ts, expected_state)
-    assert set(expected) == set(worker.tasks)
+async def assert_task_states_on_worker(expected, worker):
+    active_exc = None
+    for _ in range(10):
+        try:
+            for dep_key, expected_state in expected.items():
+                assert dep_key in worker.tasks, (worker.name, dep_key, worker.tasks)
+                dep_ts = worker.tasks[dep_key]
+                assert dep_ts.state == expected_state, (
+                    worker.name,
+                    dep_ts,
+                    expected_state,
+                )
+            assert set(expected) == set(worker.tasks)
+            return
+        except AssertionError as exc:
+            active_exc = exc
+            await asyncio.sleep(0.1)
+    # If after a second the workers are not in equilibrium, they are broken
+    assert active_exc
+    raise active_exc
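Note the design change here: the helper is now a coroutine that retries the assertions up to ten times at 0.1 s intervals and re-raises the last AssertionError, so a worker that has not yet reached equilibrium no longer fails the test on its first check. Every call site below accordingly gains an await, and the explicit `await asyncio.sleep(0.05)` warm-up calls sprinkled through the tests can be deleted.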


@gen_cluster(client=True)
@@ -2235,7 +2255,7 @@ def raise_exc(*args):
g.key: "memory",
res.key: "error",
}
-    assert_task_states_on_worker(expected_states, a)
+    await assert_task_states_on_worker(expected_states, a)
# Expected states after we release references to the futures
f.release()
g.release()
@@ -2251,7 +2271,7 @@ def raise_exc(*args):
res.key: "error",
}

-    assert_task_states_on_worker(expected_states, a)
+    await assert_task_states_on_worker(expected_states, a)

res.release()

@@ -2304,7 +2324,7 @@ def raise_exc(*args):
g.key: "memory",
res.key: "error",
}
-    assert_task_states_on_worker(expected_states, a)
+    await assert_task_states_on_worker(expected_states, a)
# Expected states after we release references to the futures

res.release()
@@ -2318,7 +2338,7 @@ def raise_exc(*args):
g.key: "memory",
}

-    assert_task_states_on_worker(expected_states, a)
+    await assert_task_states_on_worker(expected_states, a)

f.release()
g.release()
@@ -2369,7 +2389,7 @@ def raise_exc(*args):
g.key: "memory",
res.key: "error",
}
-    assert_task_states_on_worker(expected_states, a)
+    await assert_task_states_on_worker(expected_states, a)
# Expected states after we release references to the futures

f.release()
@@ -2383,8 +2403,8 @@ def raise_exc(*args):
g.key: "memory",
}

-    assert_task_states_on_worker(expected_states, a)
-    assert_task_states_on_worker(expected_states, b)
+    await assert_task_states_on_worker(expected_states, a)
+    await assert_task_states_on_worker(expected_states, b)

g.release()

@@ -2418,64 +2438,58 @@ def raise_exc(*args):
g.key: "memory",
h.key: "memory",
}
-    await asyncio.sleep(0.05)
-    assert_task_states_on_worker(expected_states_A, a)
+    await assert_task_states_on_worker(expected_states_A, a)

expected_states_B = {
f.key: "memory",
g.key: "memory",
h.key: "memory",
res.key: "error",
}
-    await asyncio.sleep(0.05)
-    assert_task_states_on_worker(expected_states_B, b)
+    await assert_task_states_on_worker(expected_states_B, b)

f.release()

expected_states_A = {
g.key: "memory",
h.key: "memory",
}
-    await asyncio.sleep(0.05)
-    assert_task_states_on_worker(expected_states_A, a)
+    await assert_task_states_on_worker(expected_states_A, a)

expected_states_B = {
f.key: "released",
g.key: "memory",
h.key: "memory",
res.key: "error",
}
-    await asyncio.sleep(0.05)
-    assert_task_states_on_worker(expected_states_B, b)
+    await assert_task_states_on_worker(expected_states_B, b)

g.release()

expected_states_A = {
g.key: "released",
h.key: "memory",
}
-    await asyncio.sleep(0.05)
-    assert_task_states_on_worker(expected_states_A, a)
+    await assert_task_states_on_worker(expected_states_A, a)

# B must not forget any task, since each one still has a valid dependent
expected_states_B = {
f.key: "released",
h.key: "memory",
res.key: "error",
}
-    assert_task_states_on_worker(expected_states_B, b)
+    await assert_task_states_on_worker(expected_states_B, b)
h.release()
-    await asyncio.sleep(0.05)

expected_states_A = {}
-    assert_task_states_on_worker(expected_states_A, a)
+    await assert_task_states_on_worker(expected_states_A, a)
expected_states_B = {
f.key: "released",
h.key: "released",
res.key: "error",
}

-    assert_task_states_on_worker(expected_states_B, b)
+    await assert_task_states_on_worker(expected_states_B, b)
res.release()

# We no longer hold any refs. Cluster should reset completely
@@ -3130,33 +3144,38 @@ async def test_missing_released_zombie_tasks(c, s, a, b):

@gen_cluster(client=True)
async def test_missing_released_zombie_tasks_2(c, s, a, b):
-    a.total_in_connections = 0
-    f1 = c.submit(inc, 1, key="f1", workers=[a.address])
-    f2 = c.submit(inc, f1, key="f2", workers=[b.address])
+    # If get_data_from_worker raises, the failure suggests a dead worker to B,
+    # which will transition the task to missing. We want to make sure that a
+    # missing task is properly released and not left as a zombie.
+    with mock.patch.object(
+        distributed.worker,
+        "get_data_from_worker",
+        side_effect=CommClosedError,
+    ):
+        f1 = c.submit(inc, 1, key="f1", workers=[a.address])
+        f2 = c.submit(inc, f1, key="f2", workers=[b.address])

-    while f1.key not in b.tasks:
-        await asyncio.sleep(0)
+        while f1.key not in b.tasks:
+            await asyncio.sleep(0)

-    ts = b.tasks[f1.key]
-    assert ts.state == "fetch"
+        ts = b.tasks[f1.key]
+        assert ts.state == "fetch"

-    # A few things can happen to clear who_has. The dominant cause is a
-    # connection failure to a worker. Regardless of how the set was cleared, the
-    # task will be transitioned to missing, where the worker tries to
-    # reacquire this information from the scheduler. While this is happening on
-    # the worker side, the tasks are released, and we want to ensure that no
-    # dangling zombie tasks are left on the worker
-    ts.who_has.clear()
while ts.state != "missing":
# If we sleep for a longer time, the worker will spin into an
# endless loop of asking the scheduler who_has and trying to connect
# to A
await asyncio.sleep(0)

-    del f1, f2
+        del f1, f2

-    while b.tasks:
-        await asyncio.sleep(0.01)
+        while b.tasks:
+            await asyncio.sleep(0.01)

-    assert_worker_story(
-        b.story(ts),
-        [("f1", "missing", "released", "released", {"f1": "forgotten"})],
-    )
+        assert_worker_story(
+            b.story(ts),
+            [("f1", "missing", "released", "released", {"f1": "forgotten"})],
+        )
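For readers unfamiliar with the patching trick above, here is a standalone sketch of it; `worker_module`, `get_data_from_worker`, and `CommClosedError` below are local stand-ins, not distributed's real objects. `mock.patch.object` replaces the coroutine with an AsyncMock whose side_effect raises on every call, which is how the test simulates an unreachable peer:

import asyncio
import types
from unittest import mock


class CommClosedError(Exception):
    """Local stand-in for distributed.comm.CommClosedError."""


async def get_data_from_worker(*args, **kwargs):
    return {"f1": 2}  # what a healthy fetch would return


worker_module = types.SimpleNamespace(get_data_from_worker=get_data_from_worker)


async def main():
    with mock.patch.object(
        worker_module, "get_data_from_worker", side_effect=CommClosedError
    ):
        try:
            await worker_module.get_data_from_worker("f1")
        except CommClosedError:
            print("fetch failed; the worker would transition the task to missing")


asyncio.run(main())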


@pytest.mark.slow
@@ -3441,6 +3460,8 @@ async def test_Worker__to_dict(c, s, a):
"config",
"incoming_transfer_log",
"outgoing_transfer_log",
"data_needed",
"pending_data_per_worker",
}
assert d["tasks"]["x"]["key"] == "x"

@@ -3462,3 +3483,45 @@ async def test_TaskState__to_dict(c, s, a):
assert isinstance(tasks["z"], dict)
assert tasks["x"]["dependents"] == ["<TaskState 'y' memory>"]
assert tasks["y"]["dependencies"] == ["<TaskState 'x' memory>"]


+def test_unique_task_heap():
+    heap = UniqueTaskHeap()
+
+    for x in range(10):
+        ts = TaskState(f"f{x}")
+        ts.priority = (0, 0, 1, x % 3)
+        heap.push(ts)
+
+    heap_list = list(heap)
+    # iteration does not empty heap
+    assert len(heap) == 10
+    assert heap_list == sorted(heap_list, key=lambda ts: ts.priority)
+
+    seen = set()
+    last_prio = (0, 0, 0, 0)
+    while heap:
+        peeked = heap.peek()
+        ts = heap.pop()
+        assert peeked == ts
+        seen.add(ts.key)
+        assert ts.priority
+        assert last_prio <= ts.priority
+        last_prio = ts.priority

ts = TaskState("foo")
heap.push(ts)
heap.push(ts)
assert len(heap) == 1

assert repr(heap) == "<UniqueTaskHeap: 1 items>"

assert heap.pop() == ts
assert not heap

# Test that we're cleaning the seen set on pop
heap.push(ts)
assert len(heap) == 1
assert heap.pop() == ts

assert repr(heap) == "<UniqueTaskHeap: 0 items>"
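To make the semantics this test exercises concrete, here is a minimal sketch of a unique task heap. It is an assumption-laden reimplementation (TaskStub and UniqueTaskHeapSketch are invented names), not distributed's actual UniqueTaskHeap:

import heapq


class TaskStub:
    """Hypothetical stand-in for distributed.worker.TaskState."""

    def __init__(self, key, priority=(0, 0, 0, 0)):
        self.key = key
        self.priority = priority


class UniqueTaskHeapSketch:
    """Priority heap that ignores pushes of keys it already contains and
    forgets a key once it is popped, so the key can be re-added later."""

    def __init__(self):
        self._heap = []  # entries are (priority, key, task)
        self._seen = set()  # keys currently in the heap

    def push(self, ts):
        # Pushing a key that is already present is a silent no-op.
        if ts.key not in self._seen:
            self._seen.add(ts.key)
            heapq.heappush(self._heap, (ts.priority, ts.key, ts))

    def peek(self):
        return self._heap[0][2]

    def pop(self):
        _, key, ts = heapq.heappop(self._heap)
        self._seen.discard(key)  # clean the seen set on pop
        return ts

    def __len__(self):
        return len(self._heap)

    def __iter__(self):
        # Iterate in priority order without consuming the heap.
        return (ts for _, _, ts in sorted(self._heap))

    def __repr__(self):
        return f"<UniqueTaskHeap: {len(self)} items>"


heap = UniqueTaskHeapSketch()
for x in range(10):
    heap.push(TaskStub(f"f{x}", priority=(0, 0, 1, x % 3)))
heap.push(TaskStub("f0"))  # duplicate key: ignored
assert len(heap) == 10

Keeping a side set of keys is what makes push idempotent per key while still allowing a key to be re-pushed after it has been popped.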
