
transition flight to missing if no who_has #5653

Merged: fjetter merged 7 commits into dask:main from transition_flight_missing on Feb 1, 2022

Conversation

@fjetter (Member) commented Jan 12, 2022

There is an edge case connected to our select_keys_for_gather optimization for tasks which no longer have a who_has. Letting the fetch->flight transition redirect this is the most straightforward solution to the problem.

A different approach would be to transition tasks to missing as soon as they no longer have a who_has. It is similar in complexity and I might change this again; I first want to write a test reproducing the mentioned edge case.

#5381 (comment)
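For illustration only, here is a minimal, self-contained sketch of the idea described above (redirect a task to missing when it is about to be fetched but its who_has is empty). The TaskState field names mirror distributed's; everything else is made up for this sketch and is not the actual patch.

from dataclasses import dataclass, field


@dataclass
class TaskState:
    key: str
    state: str = "fetch"
    who_has: set = field(default_factory=set)


def transition_fetch_flight(ts: TaskState) -> str:
    """Return the state the task ends up in (illustrative sketch only)."""
    if not ts.who_has:
        # Nobody holds a replica anymore, e.g. because the set was cleared
        # after select_keys_for_gather already picked the key: go to missing
        # and re-query the scheduler instead of launching a doomed gather.
        ts.state = "missing"
    else:
        ts.state = "flight"
    return ts.state


assert transition_fetch_flight(TaskState("x")) == "missing"
assert transition_fetch_flight(TaskState("y", who_has={"worker-a"})) == "flight"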

@crusaderky you may cherry-pick this commit in the meantime to test your branch. The changes will be rather minimal either way.

crusaderky added commits to crusaderky/distributed that referenced this pull request on Jan 13 and Jan 21, 2022
@fjetter (Member Author) commented Jan 25, 2022

This transition only happens because there are duplicates in Worker.pending_data_per_worker. The pretty horrific test below can uncover this condition, but I am a bit concerned about adding it to our test suite; at least for the time I am developing a fix it will be useful. The ultimate cause is a bug in Worker.update_who_has, and we should be able to test this in isolation.

The entire mocking is there because we cannot naturally hook into any worker internals or intercept any stimuli, etc. I think if we were able to structure the internals such that we can intercept/stop the worker at a few key points, this would go a long way (e.g. the mocking is only there to disable the every_cycle callback, allowing me better control over, and the ability to assert, the state right after a handler).

import asyncio
from unittest import mock

import pytest

from distributed import Worker
from distributed.utils_test import gen_cluster, inc


@gen_cluster(client=True)
async def test_dups_in_pending_data_per_worker(c, s, a, b):

    # We need to fetch data which is reliably picked by select_keys_for_gather;
    # if it is somehow prioritized it will immediately be flagged as missing

    # pending_data_per_worker is outdated because removal is not easy
    futs = c.map(inc, range(100), workers=[a.address])
    missing_fut = c.submit(inc, -1, workers=[a.address], key="culprit")
    await c.gather(futs)
    await missing_fut
    with mock.patch.object(
        Worker, "ensure_communicating", return_value=None
    ) as comm_mock:
        with mock.patch.object(
            Worker, "ensure_computing", return_value=None
        ) as comp_mock:
            x = await Worker(s.address, name=2, validate=True)
            f1 = c.submit(sum, [*futs[:50]], workers=[x.address], key="f1", priority=1)
            f2 = c.submit(inc, missing_fut, workers=[x.address], key="f2", priority=2)
            f3 = c.submit(sum, [*futs[50:]], workers=[x.address], key="f3", priority=3)

            while not len(x.data_needed) == len(futs) + 1:
                await asyncio.sleep(0.01)
            assert missing_fut.key in x.tasks
            assert missing_fut.key in x.pending_data_per_worker[a.address]

    for _ in range(3):
        await x.query_who_has(missing_fut.key, stimulus_id="foo")

    assert (
        sum(1 for key in x.pending_data_per_worker[a.address] if key == "culprit") > 1
    )
    with mock.patch.object(Worker, "query_who_has", return_value=None) as who_has_mock:
        # Now create a copy on B of culprit such that it isn't rescheduled once we
        # release it
        f_copy = c.submit(
            inc, missing_fut, key="copy-intention-culprit", workers=[b.address]
        )
        await f_copy
        a.handle_remove_replicas([missing_fut.key], stimulus_id="test")

        x.total_out_connections = 1
        x.target_message_size = sum(x.tasks[f.key].get_nbytes() for f in futs[:52])
        x.ensure_communicating()

        with mock.patch.object(
            Worker, "ensure_communicating", return_value=None
        ) as comm_mock:
            with mock.patch.object(
                Worker, "ensure_computing", return_value=None
            ) as comp_mock:
                while not x.data:
                    await asyncio.sleep(0.01)

        assert not x.tasks["culprit"].who_has
        assert x.tasks["culprit"].state == "fetch"
        x.target_message_size = 1000000000

        # Of course we do not want the AssertionError but it does get swallowed
        # if executed as part of a tornado coroutine and we will only see the
        # timeout below
        with pytest.raises(AssertionError):
            x.ensure_communicating()
        # await f1
        # await f2
        # await f3

@crusaderky (Collaborator)

@fjetter I'm not sure I understand:

  • in real life, the 3 calls to query_who_has are performed by gather_dep. Couldn't you invoke gather_dep instead?
  • Why is ensure_communicating invoking gather_dep on the same key multiple times?
  • In your test, what's the purpose of the 100 tasks that you seem to use as padding?
  • mock.patch alters all existing instances of a class. Couldn't you just init 3 workers from gen_cluster?

@fjetter fjetter force-pushed the transition_flight_missing branch from 7bed501 to d380604 on January 27, 2022 14:15
@fjetter (Member Author) commented Jan 27, 2022

This thing escalated a bit out of control. There were multiple issues and my fixes are a little more involved than I was hoping, but I do believe the code is in a much better state now.
Notable changes:

  • I introduced the class _UniqueTaskHeap, primarily for pending_data_per_worker. The uniqueness is not absolutely necessary and could be handled by introducing proper guards instead; however, uniqueness is a much safer guarantee and allows us to avoid a bunch of edge cases. The total runtime of this custom class is mostly the same as for an ordinary heap (apart from a constant offset). It requires a bit more memory, but I consider the additional set used to track uniqueness negligible (see the sketch after this list).
  • There is now one place, and one place only, which removes items from who_has: after an unsuccessful gather_dep. I believe this was effectively the case before, but now it is much more explicit. In particular, this means there is only one -> missing transition and we do not need to add more ts.who_has guards all over the place.
  • The transition flight->fetch used to lose information about who_has since it transitioned the task back to released. That is not good, since the transition may have been triggered merely by a "busy" reply. We are now more explicit in this transition, which should overall reduce requests to the scheduler.
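Here is the sketch referred to in the first bullet: a minimal heap-plus-set structure along the lines of _UniqueTaskHeap. Names and details are illustrative only; the real class stores TaskState objects and orders them by task priority.

import heapq


class UniqueHeapSketch:
    """Minimal sketch of a priority heap that silently ignores duplicate keys."""

    def __init__(self):
        self._heap = []     # (priority, key) pairs maintained by heapq
        self._keys = set()  # the extra set that guarantees uniqueness

    def push(self, priority, key):
        # Duplicates are dropped here, which removes the need for guards at
        # every call site that feeds the heap (e.g. pending_data_per_worker).
        if key in self._keys:
            return
        self._keys.add(key)
        heapq.heappush(self._heap, (priority, key))

    def pop(self):
        priority, key = heapq.heappop(self._heap)
        self._keys.discard(key)
        return key

    def __contains__(self, key):
        return key in self._keys

    def __len__(self):
        return len(self._heap)

Push and pop stay O(log n); the extra cost is one set lookup per operation plus the memory for the set, matching the "constant offset" mentioned above.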

async def assert_task_states_on_worker(expected, worker):
    for dep_key, expected_state in expected.items():
        dep_ts = worker.tasks[dep_key]
        assert dep_ts.state == expected_state, (worker.name, dep_ts, expected_state)
    assert set(expected) == set(worker.tasks)
@fjetter (Member Author)

I noticed these tests were mildly flaky and allowed for a retry. Eventually the workers must reach an equilibrium or otherwise fail.

Comment on lines 3150 to 3182
 async def test_missing_released_zombie_tasks_2(c, s, a, b):
-    a.total_in_connections = 0
-    f1 = c.submit(inc, 1, key="f1", workers=[a.address])
-    f2 = c.submit(inc, f1, key="f2", workers=[b.address])
+    # If get_data_from_worker raises this will suggest a dead worker to B and it
+    # will transition the task to missing. We want to make sure that a missing
+    # task is properly released and not left as a zombie
+    with mock.patch.object(
+        distributed.worker,
+        "get_data_from_worker",
+        side_effect=CommClosedError,
+    ):
+        f1 = c.submit(inc, 1, key="f1", workers=[a.address])
+        f2 = c.submit(inc, f1, key="f2", workers=[b.address])

-    while f1.key not in b.tasks:
-        await asyncio.sleep(0)
+        while f1.key not in b.tasks:
+            await asyncio.sleep(0)

-    ts = b.tasks[f1.key]
-    assert ts.state == "fetch"
+        ts = b.tasks[f1.key]
+        assert ts.state == "fetch"

-    # A few things can happen to clear who_has. The dominant process is upon
-    # connection failure to a worker. Regardless of how the set was cleared, the
-    # task will be transitioned to missing where the worker is trying to
-    # reacquire this information from the scheduler. While this is happening on
-    # worker side, the tasks are released and we want to ensure that no dangling
-    # zombie tasks are left on the worker
-    ts.who_has.clear()
+        while not ts.state == "missing":
+            # If we sleep for a longer time, the worker will spin into an
+            # endless loop of asking the scheduler who_has and trying to connect
+            # to A
+            await asyncio.sleep(0)

-    del f1, f2
+        del f1, f2

-    while b.tasks:
-        await asyncio.sleep(0.01)
+        while b.tasks:
+            await asyncio.sleep(0.01)

-    assert_worker_story(
-        b.story(ts),
-        [("f1", "missing", "released", "released", {"f1": "forgotten"})],
-    )
+        assert_worker_story(
+            b.story(ts),
+            [("f1", "missing", "released", "released", {"f1": "forgotten"})],
+        )
@fjetter (Member Author)

This test asserted an impossible transition: there is no way for a task in state fetch to empty who_has, so the asserted story was false. I introduced the CommClosedError above to simulate a more realistic example.

@fjetter fjetter self-assigned this Jan 27, 2022
@crusaderky (Collaborator)

I think there is a minor problem: a fetched TaskState's priority is inherited from its dependent. So if you schedule a low-priority task with a dependency and, before the dependencies have been fetched, a high-priority task with the same dependency, the fetch priority won't be bumped up.

e.g.

f1 = c.submit(inc, 0, workers=[a.address])
f2 = c.submit(numpy.zeros, 2**28, workers=[a.address])  # 2 GiB
await wait([f1, f2])
f3 = c.submit(inc, f1, priority=-1, key="low", workers=[b.address])
f4 = c.submit(inc, f2, priority=0, key="mid", workers=[b.address])
f5 = c.submit(inc, f1, priority=1, key="high", workers=[b.address])

If I understand everything correctly, in the above example f5 may not run until after f2 has been transferred.

@fjetter (Member Author) commented Jan 28, 2022

> I think there is a minor problem: a fetched TaskState's priority is inherited from its dependent. So if you schedule a low-priority task with a dependency and, before the dependencies have been fetched, a high-priority task with the same dependency, the fetch priority won't be bumped up.

Indeed, this is an "unsolved" problem. I am aware of this and this has been a problem ever since I introduced the data_needed heap. My gut feeling was that this will only impact workflows rarely. I'm not entirely sure how valuable the reprioritization is and how costly the heap replace would be. I have a few ideas how to do this "efficiently" but I'm inclined to defer this to a later PR. I'm open to documenting this shortcoming somewhere, how about in handle_task_compute?
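For background only: one generic way to bump the priority of an entry in a plain binary heap without rebuilding it is lazy re-insertion, i.e. push a second entry with the better priority and skip stale entries when they surface. This sketch is purely illustrative and not necessarily the approach hinted at above.

import heapq


class BumpableHeapSketch:
    """Generic min-heap with lazy priority updates (illustrative only)."""

    def __init__(self):
        self._heap = []  # (priority, key) entries, possibly stale
        self._best = {}  # key -> best (smallest) priority seen so far

    def push(self, key, priority):
        # Re-pushing with a better priority shadows the old entry instead of
        # replacing it in place.
        if priority < self._best.get(key, float("inf")):
            self._best[key] = priority
            heapq.heappush(self._heap, (priority, key))

    def pop(self):
        while self._heap:
            priority, key = heapq.heappop(self._heap)
            if self._best.get(key) == priority:
                # Live entry; any stale duplicates are skipped silently.
                del self._best[key]
                return key
        raise IndexError("pop from empty heap")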

crusaderky added a commit to crusaderky/distributed that referenced this pull request Jan 28, 2022
@crusaderky (Collaborator)

> Indeed, this is an "unsolved" problem. I am aware of this and this has been a problem ever since I introduced the data_needed heap. My gut feeling was that this will only impact workflows rarely. I'm not entirely sure how valuable the reprioritization is and how costly the heap replace would be. I have a few ideas how to do this "efficiently" but I'm inclined to defer this to a later PR. I'm open to documenting this shortcoming somewhere, how about in handle_task_compute?

Yes I agree it's minor - it only impacts users who have a bottleneck in network comms. Happy to leave it to a later PR.

@fjetter fjetter merged commit b581bb6 into dask:main Feb 1, 2022
@fjetter fjetter deleted the transition_flight_missing branch February 1, 2022 13:50
gjoseph92 pushed a commit to gjoseph92/distributed that referenced this pull request Feb 1, 2022
Co-authored-by: crusaderky <crusaderky@gmail.com>
mrocklin added a commit to mrocklin/distributed that referenced this pull request Apr 12, 2022
Fixes dask#5951

In dask#5653 we removed the fetch -> missing transition. This caused deadlocks. Now we add it back in.
mrocklin added a commit that referenced this pull request Apr 13, 2022
Fixes #5951

In #5653 we removed the fetch -> missing transition. This caused deadlocks. Now we add it back in.