Flaky test_worker_who_has_clears_after_failed_connection #6831

Closed · gjoseph92 opened this issue Aug 4, 2022 · 0 comments · Fixed by #6832
Labels: flaky test (Intermittent failures on CI.)

gjoseph92 commented Aug 4, 2022

test_worker_who_has_clears_after_failed_connection failed in #6822, but I don't think it's related to the changes there. (Or if it is, the changes shouldn't have broken it.) I think it's indicating an actual state machine bug.

The test seems to be testing:

  1. Worker N has some data
  2. Worker A fetches all that data
  3. While the fetch is happening, worker N dies
  4. This does not cause a deadlock. Furthermore, worker A no longer thinks that worker N has any data

But the way the test is written, it's possible that the fetch (or some of it) manages to complete before worker N dies. It's just a race condition between an os._exit and the 0.1s delay on a SlowTransmitData.

When I looked at the cluster dump for this test, I noticed

  1. Work-stealing is kicking in, and actually the tasks intended for worker N are running on all workers (loose restricted tasks are allowed to be stolen)
  2. Worker A is successfully receiving data from worker N, even after the os._exit is kicked off

If some of the data is successfully fetched, then _handle_gather_dep_success won't remove those keys from self.has_what and ts.who_has:

def _handle_gather_dep_success(self, ev: GatherDepSuccessEvent) -> RecsInstrs:
    """gather_dep terminated successfully.
    The response may contain less keys than the request.
    """
    recommendations: Recs = {}
    for ts in self._gather_dep_done_common(ev):
        if ts.key in ev.data:
            recommendations[ts] = ("memory", ev.data[ts.key])
        else:
            self.log.append((ts.key, "missing-dep", ev.stimulus_id, time()))
            if self.validate:
                assert ts.state != "fetch"
                assert ts not in self.data_needed[ev.worker]
            ts.who_has.discard(ev.worker)
            self.has_what[ev.worker].discard(ts.key)
            recommendations[ts] = "fetch"

Notice that self.has_what[ev.worker].discard(ts.key) only happens in the else branch, when the key wasn't successfully received.
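
For illustration, this is roughly what the handler would look like if the bookkeeping were cleared unconditionally. This is a hypothetical variant written for this discussion, not the actual code and not a proposed patch; see option 2 below for whether it's even needed:

def _handle_gather_dep_success(self, ev: GatherDepSuccessEvent) -> RecsInstrs:
    recommendations: Recs = {}
    for ts in self._gather_dep_done_common(ev):
        # Hypothetical: drop the bookkeeping for ev.worker no matter what,
        # instead of only when the key was missing from the response.
        ts.who_has.discard(ev.worker)
        self.has_what[ev.worker].discard(ts.key)
        if ts.key in ev.data:
            recommendations[ts] = ("memory", ev.data[ts.key])
        else:
            self.log.append((ts.key, "missing-dep", ev.stimulus_id, time()))
            recommendations[ts] = "fetch"
    return recommendations, []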

@crusaderky @fjetter what's the intent of this test, and how should we update it?

  1. Update the test to guarantee the worker gets killed before it can transfer the data (use BlockedGetData instead of the delay-based SlowTransmitData). Also disable work stealing. (See the sketch after this list.)
  2. Have the state machine remove tasks from self.has_what[ev.worker] (and ts.who_has?) regardless of whether they're successfully fetched. I don't think anything is doing this, and they'll just sit around in has_what forever right now? EDIT: _purge_state does, so this seems okay as-is.
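
Here's a rough sketch of what option 1 could look like, assuming BlockedGetData from distributed.utils_test (a Worker subclass that sets in_get_data and then blocks on block_get_data inside get_data) and the distributed.scheduler.work-stealing config key. The test name and setup are illustrative only, not the actual rewrite in #6832:

import asyncio

from distributed import wait
from distributed.utils_test import BlockedGetData, gen_cluster, inc


@gen_cluster(
    client=True,
    nthreads=[("", 1)],
    config={"distributed.scheduler.work-stealing": False},
)
async def test_sketch(c, s, a):
    async with BlockedGetData(s.address) as b:
        # Put the data on b, then force a to fetch it.
        x = c.submit(inc, 1, key="x", workers=[b.address], allow_other_workers=True)
        await wait(x)
        y = c.submit(inc, x, key="y", workers=[a.address])

        # a's gather_dep is now parked inside b's blocked get_data handler,
        # so the transfer cannot complete before b goes away.
        await b.in_get_data.wait()

        # Kill b while the transfer is guaranteed to be incomplete, then let
        # the parked handler return (the comm is already gone, so a still
        # observes a failed connection).
        await b.close()
        b.block_get_data.set()

    # The failed connection must clear a's bookkeeping about b.
    while a.state.has_what.get(b.address):
        await asyncio.sleep(0.01)
    assert not any(b.address in ts.who_has for ts in a.state.tasks.values())

With the transfer gated on an event instead of a 0.1s delay, there's no window in which the data can slip through before the worker dies, and disabling work stealing keeps the tasks pinned to b so the fetch actually has to go through it.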
______________ test_worker_who_has_clears_after_failed_connection ______________

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35425', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:39467', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:36259', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @pytest.mark.slow
    @gen_cluster(client=True)
    async def test_worker_who_has_clears_after_failed_connection(c, s, a, b):
        """This test is very sensitive to cluster state consistency. Timeouts often
        indicate subtle deadlocks. Be mindful when marking flaky/repeat/etc."""
        async with Nanny(s.address, nthreads=2) as n:
            while len(s.workers) < 3:
                await asyncio.sleep(0.01)
    
            def slow_ser(x, delay):
                return SlowTransmitData(x, delay=delay)
    
            n_worker_address = n.worker_address
            futures = c.map(
                slow_ser,
                range(20),
                delay=0.1,
                key=["f%d" % i for i in range(20)],
                workers=[n_worker_address],
                allow_other_workers=True,
            )
    
            def sink(*args):
                pass
    
            await wait(futures)
            result_fut = c.submit(sink, futures, workers=a.address)
    
            with suppress(CommClosedError):
                await c.run(os._exit, 1, workers=[n_worker_address])
    
            while len(s.workers) > 2:
                await asyncio.sleep(0.01)
    
            await result_fut
    
>           assert not a.state.has_what.get(n_worker_address)
E           AssertionError: assert not {'f0', 'f1', 'f10', 'f12', 'f17', 'f3', ...}
E            +  where {'f0', 'f1', 'f10', 'f12', 'f17', 'f3', ...} = <built-in method get of collections.defaultdict object at 0x7fa76f59a540>('tcp://127.0.0.1:46817')
E            +    where <built-in method get of collections.defaultdict object at 0x7fa76f59a540> = defaultdict(<class 'set'>, {'tcp://127.0.0.1:36259': {'f2', 'f19', 'f14', 'f16', 'f15'}, 'tcp://127.0.0.1:46817': {'f9', 'f10', 'f1', 'f3', 'f12', 'f0', 'f17', 'f8'}}).get
E            +      where defaultdict(<class 'set'>, {'tcp://127.0.0.1:36259': {'f2', 'f19', 'f14', 'f16', 'f15'}, 'tcp://127.0.0.1:46817': {'f9', 'f10', 'f1', 'f3', 'f12', 'f0', 'f17', 'f8'}}) = <distributed.worker_state_machine.WorkerState object at 0x7fa76f6ba040>.has_what
E            +        where <distributed.worker_state_machine.WorkerState object at 0x7fa76f6ba040> = <Worker 'tcp://127.0.0.1:39467', name: 0, status: running, stored: 21, running: 0/1, ready: 0, comm: 0, waiting: 0>.state

distributed/tests/test_failed_workers.py:363: AssertionError
----------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump/test_worker_who_has_clears_after_failed_connection.yaml
----------------------------- Captured stderr call -----------------------------
2022-08-04 07:17:12,858 - distributed.scheduler - INFO - State start
2022-08-04 07:17:12,860 - distributed.scheduler - INFO - Clear task state
2022-08-04 07:17:12,861 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:35425
2022-08-04 07:17:12,861 - distributed.scheduler - INFO -   dashboard at:           127.0.0.1:35609
2022-08-04 07:17:12,867 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:39467
2022-08-04 07:17:12,867 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:39467
2022-08-04 07:17:12,867 - distributed.worker - INFO -          dashboard at:            127.0.0.1:35707
2022-08-04 07:17:12,867 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:35425
2022-08-04 07:17:12,867 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,867 - distributed.worker - INFO -               Threads:                          1
2022-08-04 07:17:12,867 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-08-04 07:17:12,867 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-cx9zhjsd
2022-08-04 07:17:12,867 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,868 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:36259
2022-08-04 07:17:12,868 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:36259
2022-08-04 07:17:12,868 - distributed.worker - INFO -          dashboard at:            127.0.0.1:46347
2022-08-04 07:17:12,868 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:35425
2022-08-04 07:17:12,868 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,869 - distributed.worker - INFO -               Threads:                          2
2022-08-04 07:17:12,869 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-08-04 07:17:12,869 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-eaft19vh
2022-08-04 07:17:12,869 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,874 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39467', name: 0, status: init, memory: 0, processing: 0>
2022-08-04 07:17:12,875 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39467
2022-08-04 07:17:12,875 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:12,876 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:36259', name: 1, status: init, memory: 0, processing: 0>
2022-08-04 07:17:12,876 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:36259
2022-08-04 07:17:12,876 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:12,877 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:35425
2022-08-04 07:17:12,877 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,877 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:35425
2022-08-04 07:17:12,877 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,878 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:12,878 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:12,895 - distributed.scheduler - INFO - Receive client connection: Client-76148fba-13c5-11ed-8a5f-000d3a9fb637
2022-08-04 07:17:12,895 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:12,901 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:39065'
2022-08-04 07:17:13,828 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:46817
2022-08-04 07:17:13,828 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:46817
2022-08-04 07:17:13,828 - distributed.worker - INFO -          dashboard at:            127.0.0.1:46681
2022-08-04 07:17:13,828 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:35425
2022-08-04 07:17:13,828 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:13,828 - distributed.worker - INFO -               Threads:                          2
2022-08-04 07:17:13,828 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-08-04 07:17:13,828 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-18qsypko
2022-08-04 07:17:13,828 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:14,158 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:46817', status: init, memory: 0, processing: 0>
2022-08-04 07:17:14,160 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:46817
2022-08-04 07:17:14,160 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:14,160 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:35425
2022-08-04 07:17:14,160 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:14,162 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:15,294 - distributed.worker - INFO - Run out-of-band function '_exit'
2022-08-04 07:17:15,300 - distributed.scheduler - ERROR - broadcast to tcp://127.0.0.1:46817 failed: CommClosedError: in <TCP (closed) Scheduler Broadcast local=tcp://127.0.0.1:50650 remote=tcp://127.0.0.1:46817>: Stream is closed
2022-08-04 07:17:15,301 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:46817', status: running, memory: 8, processing: 0>
2022-08-04 07:17:15,302 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:46817
2022-08-04 07:17:15,302 - distributed.nanny - INFO - Worker process 8037 exited with status 1
2022-08-04 07:17:15,304 - distributed.nanny - WARNING - Restarting worker
2022-08-04 07:17:15,327 - distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:39065'.
2022-08-04 07:17:15,327 - distributed.nanny - INFO - Nanny asking worker to close
2022-08-04 07:17:16,262 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:33633
2022-08-04 07:17:16,262 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:33633
2022-08-04 07:17:16,262 - distributed.worker - INFO -          dashboard at:            127.0.0.1:45951
2022-08-04 07:17:16,262 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:35425
2022-08-04 07:17:16,262 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:16,262 - distributed.worker - INFO -               Threads:                          2
2022-08-04 07:17:16,262 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-08-04 07:17:16,262 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-f9t5upne
2022-08-04 07:17:16,263 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:16,263 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33633
2022-08-04 07:17:16,263 - distributed.worker - INFO - Closed worker has not yet started: Status.init
2022-08-04 07:17:16,344 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:56300 closed before handshake completed
2022-08-04 07:17:17,072 - distributed.scheduler - INFO - Remove client Client-76148fba-13c5-11ed-8a5f-000d3a9fb637
2022-08-04 07:17:17,074 - distributed.scheduler - INFO - Remove client Client-76148fba-13c5-11ed-8a5f-000d3a9fb637
2022-08-04 07:17:17,075 - distributed.scheduler - INFO - Close client connection: Client-76148fba-13c5-11ed-8a5f-000d3a9fb637
2022-08-04 07:17:17,083 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39467
2022-08-04 07:17:17,083 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:36259
2022-08-04 07:17:17,085 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:39467', name: 0, status: closing, memory: 0, processing: 0>
2022-08-04 07:17:17,085 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:39467
2022-08-04 07:17:17,086 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:36259', name: 1, status: closing, memory: 0, processing: 0>
2022-08-04 07:17:17,086 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:36259
2022-08-04 07:17:17,086 - distributed.scheduler - INFO - Lost all workers
2022-08-04 07:17:17,086 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-ac1856b0-cbc6-4218-bc04-00a8cd84af4f Address tcp://127.0.0.1:39467 Status: Status.closing
2022-08-04 07:17:17,086 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-c6d051b9-5967-4ee7-b55b-ed33b235c74f Address tcp://127.0.0.1:36259 Status: Status.closing
2022-08-04 07:17:17,089 - distributed.scheduler - INFO - Scheduler closing...

https://github.com/dask/distributed/runs/7666946504?check_suite_focus=true#step:11:1316
