AMM ReduceReplicas will cause increased failure rate of worker data fetching #6038

Closed
gjoseph92 opened this issue Mar 31, 2022 · 4 comments
Labels: discussion, enhancement, memory, performance


gjoseph92 (Collaborator) commented Mar 31, 2022

When the scheduler assigns a task to a worker, it sends it a list of other workers to fetch the dependencies of that task from. At a later point, the worker will actually pick one of those peers at random and ask it for the data.

Before AMM, so long as the peer worker wasn't dead, it would be extremely likely (guaranteed?) that the particular key was still there.

With the AMM ReduceReplicas policy active, it's quite possible—likely even—that the peers which originally had the data have since deleted their copy by the time the worker gets around to calling gather_dep. It's even possible that none of the peers on the original list have the data anymore (if the key ended up on a new worker in the interim, and ReduceReplicas happened to delete copies from all the old workers and leave the new worker with the only replica).

This is handled by sending a missing-data message to the scheduler (which doesn't do anything to get a more up-to-date list to the worker that needs it), then trying to fetch again from a different worker from the list the scheduler originally sent.

Only once every worker in the original list has been tried (and failed) will the Worker.find_missing callback kick in and ask the scheduler who actually has the data needed.

So as long as find_missing works correctly, things should eventually sort themselves out. But it may make the data fetch process a lot slower than it needs to be.
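To make the flow concrete, here's a rough sketch of the sequence described above (this is not the actual Worker code; names like fetch_from and report_missing_data are made up for illustration):

```python
# Illustrative sketch only -- not the real Worker implementation.
import random


def fetch_dependency(key, who_has, scheduler, fetch_from):
    """Try each peer from the (possibly stale) who_has list before asking the scheduler."""
    untried = list(who_has)
    while untried:
        peer = untried.pop(random.randrange(len(untried)))
        data = fetch_from(peer, key)  # roughly what gather_dep does
        if data is not None:
            return data
        # The peer no longer holds the key (e.g. ReduceReplicas dropped it):
        # tell the scheduler, but keep working through the original, stale list.
        scheduler.report_missing_data(key, peer)
    # Only once every peer in the original list has failed do we ask the
    # scheduler who actually has the data (the find_missing fallback).
    fresh_who_has = scheduler.who_has(key)
    return fetch_from(random.choice(list(fresh_who_has)), key)
```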

For example, in one cluster dump I happened to look at, a worker's who_has list for a particular key had 82 entries. But on the scheduler, who_has for that key had only 1 worker, which was not even among the 82 (because ReduceReplicas had gotten rid of all of the copies in the time since). That means gather_dep would have to fail 82 times before round-tripping to the scheduler and finally getting the data.

I don't think this is a correctness issue right now (again, assuming Worker.find_missing works reliably), but I wonder how much it degrades performance, especially for widely-used keys like this (where the original who_has list can be long).

cc @fjetter

gjoseph92 added the enhancement, performance, and discussion labels on Mar 31, 2022
fjetter (Member) commented Apr 1, 2022

We might want to adjust the logic in

```python
    def update_who_has(self, who_has: dict[str, Collection[str]]) -> None:
        try:
            for dep, workers in who_has.items():
                if not workers:
                    continue

                if dep in self.tasks:
                    dep_ts = self.tasks[dep]
                    if self.address in workers and self.tasks[dep].state != "memory":
                        logger.debug(
                            "Scheduler claims worker %s holds data for task %s which is not true.",
                            self.name,
                            dep,
                        )
                        # Do not mutate the input dict. That's rude
                        workers = set(workers) - {self.address}
                    dep_ts.who_has.update(workers)

                    for worker in workers:
                        self.has_what[worker].add(dep)
                        self.pending_data_per_worker[worker].push(dep_ts)
        except Exception as e:  # pragma: no cover
            logger.exception(e)
            if LOG_PDB:
                import pdb

                pdb.set_trace()
            raise
```
such that it removes tasks as well instead of just extending.
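For concreteness, removal could look something like this (an untested sketch of the idea, reusing the attribute names from the snippet above; not a proposed patch):

```python
def update_who_has(self, who_has):
    # Untested sketch: treat the scheduler's who_has as authoritative,
    # dropping stale entries instead of only ever adding new ones.
    for dep, workers in who_has.items():
        ts = self.tasks.get(dep)
        if ts is None:
            continue
        workers = set(workers) - {self.address}

        for worker in ts.who_has - workers:
            # The scheduler no longer lists this peer as holding a replica
            ts.who_has.discard(worker)
            self.has_what[worker].discard(dep)

        for worker in workers - ts.who_has:
            ts.who_has.add(worker)
            self.has_what[worker].add(dep)
            self.pending_data_per_worker[worker].push(ts)
```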

gjoseph92 (Collaborator, Author) commented

Yes, and we'd also need to make the scheduler proactively call update_who_has on workers (maybe as part of remove_replica?).
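Purely as an illustration of the idea (not the actual Scheduler code; drop_replica, send_to_worker, and the "update-who-has" message op are all hypothetical names):

```python
# Hypothetical sketch: when the scheduler drops a replica, push a fresh
# who_has to any worker that may still be planning to fetch the key.
def drop_replica(self, ts, ws):                     # hypothetical, simplified bookkeeping
    ts.who_has.discard(ws)
    ws.has_what.discard(ts)
    remaining = [w.address for w in ts.who_has]
    for waiter in ts.waiters:                       # tasks that still need this key
        if waiter.processing_on is not None:
            self.send_to_worker(                    # hypothetical message path
                waiter.processing_on.address,
                {"op": "update-who-has", "who_has": {ts.key: remaining}},
            )
```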

I've also opened #6056 to discuss the other side of this problem.

crusaderky (Collaborator) commented

I just spotted this ticket 1 month late - please @ me next time!!

This is an issue that ReduceReplicas currently doesn't prevent, but mitigates:

```python
# If a dependent task has not been assigned to a worker yet, err on the side
# of caution and preserve an additional replica for it.
# However, if two dependent tasks have been already assigned to the same
# worker, don't double count them.
nwaiters = len({waiter.processing_on or waiter for waiter in ts.waiters})
ndrop_key = len(ts.who_has) - max(desired_replicas, nwaiters)
```
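A quick worked example of that snippet, with made-up numbers: say the key has 3 replicas, desired_replicas is 1, and there are three waiters, two of which are already processing on the same worker:

```python
# Made-up numbers to illustrate the snippet above.
desired_replicas = 1
nreplicas = 3                               # len(ts.who_has)
waiters_processing_on = ["w1", "w1", None]  # None = waiter not assigned to a worker yet
# "w1" is counted once; the unassigned waiter is counted as itself.
nwaiters = len({w if w is not None else object() for w in waiters_processing_on})  # == 2
ndrop_key = nreplicas - max(desired_replicas, nwaiters)  # == 1: drop one replica, keep two
```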

Given the above, the AMM may cause an occasional failure in gather_dep, but you will never reach the point where you need to rely on find_missing - short of losing workers.

Enabling the AMM while you compute (a @ a.T).sum() - which falls neatly into the use case of many data transfers from many replicas - carries zero performance penalty in real life (tested on Coiled). Could you come up with a use case where the AMM can be observed slowing the computation down?
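For reference, a rough local reproduction of that test would look something like the following (the config keys, cluster size, and array sizes here are from memory and illustrative, not the exact Coiled setup):

```python
# Rough local reproduction of the (a @ a.T).sum() benchmark with the AMM on.
import dask
import dask.array as da
from distributed import Client

dask.config.set({
    # Start the AMM on the scheduler; ReduceReplicas is one of its default policies.
    "distributed.scheduler.active-memory-manager.start": True,
    "distributed.scheduler.active-memory-manager.interval": "2s",
})

if __name__ == "__main__":
    client = Client(n_workers=4)
    a = da.random.random((20_000, 20_000), chunks=(2_000, 2_000))
    # a @ a.T sends each chunk to many downstream tasks, so chunks get replicated
    # widely -- exactly the situation ReduceReplicas then trims back.
    (a @ a.T).sum().compute()
```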

crusaderky (Collaborator) commented

> We might want to adjust the logic in update_who_has such that it removes tasks as well instead of just extending.

Done in #6342

As explained above, I don't believe the AMM actually increases failure rate by a significant amount.
I'm closing this issue for now. Feel free to reopen if you find evidence to the contrary.
