Add back Worker.transition_fetch_missing #6112
```diff
@@ -3389,3 +3389,24 @@ async def test_tick_interval(c, s, a, b):
     while s.workers[a.address].metrics["event_loop_interval"] < 0.100:
         await asyncio.sleep(0.01)
     time.sleep(0.200)
+
+
+class BreakingWorker(Worker):
+    broke_once = False
+
+    def get_data(self, comm, **kwargs):
+        if not self.broke_once:
+            self.broke_once = True
+            raise OSError("fake error")
+        return super().get_data(comm, **kwargs)
+
+
+@pytest.mark.slow
+@gen_cluster(client=True, Worker=BreakingWorker)
+async def test_broken_comm(c, s, a, b):
+    df = dask.datasets.timeseries(
+        start="2000-01-01",
+        end="2000-01-10",
+    )
+    s = df.shuffle("id", shuffle="tasks")
+    await c.compute(s.size)
```
Kudos to @gjoseph92 and @nils-braun for the test.
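The test drives many inter-worker transfers through a task-based shuffle; each worker's first `get_data` call raises `OSError`, so at least one peer fetch fails and the requesting worker has to recover. A minimal sketch of reproducing the same failure injection outside the test suite, assuming `LocalCluster` accepts a `worker_class` argument (it does in distributed releases of this era):

```python
# Untested sketch: run the same failure injection on a local cluster.
import asyncio

import dask
from distributed import Client, LocalCluster, Worker


class BreakingWorker(Worker):
    # Same injection as the test above: fail the first get_data call,
    # then delegate to the normal implementation.
    broke_once = False

    def get_data(self, comm, **kwargs):
        if not self.broke_once:
            self.broke_once = True
            raise OSError("fake error")
        return super().get_data(comm, **kwargs)


async def main():
    async with LocalCluster(
        n_workers=2, worker_class=BreakingWorker, asynchronous=True
    ) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            df = dask.datasets.timeseries(start="2000-01-01", end="2000-01-10")
            shuffled = df.shuffle("id", shuffle="tasks")
            # The first inter-worker fetch fails; the computation should
            # still complete once the worker recovers.
            print(await client.compute(shuffled.size))


asyncio.run(main())
```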
```diff
@@ -2671,6 +2671,9 @@ def ensure_communicating(self) -> None:
             if ts.state != "fetch":
                 continue
 
+            if self.validate:
+                assert ts.who_has
+
             workers = [w for w in ts.who_has if w not in self.in_flight_workers]
             if not workers:
                 assert ts.priority is not None
```
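For context on the assertion: a task in `fetch` state is expected to always have at least one known replica, and the `fetch -> missing` transition that this PR adds back (per its title) is what maintains that invariant. A rough sketch of what such a transition plausibly looks like; the attribute name `_missing_dep_flight` and the return convention are assumptions, not the PR's exact code:

```python
# Hypothetical sketch of the restored transition; actual names and the
# recommendations/messages return convention in distributed.worker.Worker
# may differ.
def transition_fetch_missing(self, ts, *, stimulus_id):
    # A fetch task whose who_has just became empty cannot be gathered;
    # park it until new replicas are discovered, preserving the invariant
    # that every task still in "fetch" has a non-empty who_has.
    ts.state = "missing"
    self._missing_dep_flight.add(ts)
    return {}, []
```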
```diff
@@ -2999,7 +3002,13 @@ async def gather_dep(
             for d in has_what:
                 ts = self.tasks[d]
                 ts.who_has.remove(worker)
+
+                if not ts.who_has:
+                    recommendations[ts] = "missing"
```
Logging here might be helpful for future debugging. Probably shouldn't call it …

It also might be worth a comment on why we don't send …

I've added logging. I haven't added the comment. I felt that I didn't understand the reasoning here sufficiently well to argue for one way or the other.

There are two things where this code path becomes relevant:

1. The remote worker actually died.
2. The connection failed intermittently but the remote worker is still alive.

The scheduler will detect the dead worker eventually and reschedule the task. By not sending the …

We currently cannot distinguish 1. and 2., so we need to find a middle ground. Purging data is safe because we can reacquire this information. Sending …

Now that the …

This explanation is basically what I was looking for in the comment.
```diff
+                    logger.info(
+                        "Lost worker connection to %s caused task %s to go missing",
+                        worker,
+                        ts.key,
+                    )
         except Exception as e:
             logger.exception(e)
             if self.batched_stream and LOG_PDB:
```

Ah, please not …
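A sketch of why purging `who_has` is the safe half of that middle ground: replica locations can always be re-acquired from the scheduler, so a wrongly purged entry only costs an extra round trip. This is a hypothetical helper, assuming the scheduler exposes a `who_has` RPC keyed by task keys (the client API suggests it does, but the exact worker-side call is an assumption):

```python
# Untested sketch: re-acquire replica locations for tasks that went
# "missing" after a comm failure. The RPC shape is an assumption.
async def refresh_who_has(worker, keys):
    who_has = await worker.scheduler.who_has(keys=list(keys))
    for key, addresses in who_has.items():
        ts = worker.tasks.get(key)
        if ts is not None and addresses:
            # New replicas found: the task can leave "missing" and be
            # fetched again on the next ensure_communicating pass.
            ts.who_has.update(addresses)
```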
I am somewhat ok with this test, since it does reliably trigger the behavior. But I think @fjetter was hoping to see a more minimized case.

I agree with that desire. I encourage folks to work on that. I think that this suffices.

This test does not reliably trigger the condition for me. I do hit it, but it is not deterministic.

I can increase the data volume and it will become more and more likely. I don't have a deterministic test. I think it would be good to have one. I think that this suffices, though.
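One hypothetical direction for a more deterministic reproduction: pin a single producer task to one worker and its consumer to the other, so exactly one inter-worker fetch happens and the single injected `OSError` is guaranteed to hit it. This is an untested sketch; `inc` is the trivial helper from `distributed.utils_test`, `BreakingWorker` is the class from the diff above, and whether this reliably drives the task through the `missing` path still depends on retry timing:

```python
from distributed.utils_test import gen_cluster, inc


# Hypothetical minimized test (untested): with only one replica of x,
# the failed fetch empties who_has on worker b, which is exactly the
# recommendations[ts] = "missing" path added in this PR.
@gen_cluster(client=True, Worker=BreakingWorker)
async def test_broken_comm_minimized(c, s, a, b):
    x = c.submit(inc, 1, workers=[a.address])  # data lives only on a
    y = c.submit(inc, x, workers=[b.address])  # b must fetch x from a
    assert await y == 3
```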