
Improve recovery time in worker failure scenarios #3184

Open
fjetter opened this issue Oct 30, 2019 · 5 comments

@fjetter
Member

fjetter commented Oct 30, 2019

We are operating our distributed clusters in a cloud environment where we need to deal with frequently failing nodes. We usually dispatch jobs automatically and are bound to certain SLAs, and therefore expect our jobs to finish in a more or less well-defined time.
While distributed offers resilience in terms of graph recalculation, we're facing the issue that the recalculation introduces severe performance problems for us.

We are looking for something which would allow us to recover faster in scenarios where individual workers die such that we do not need to recalculate large, expensive chunks of the graph, e.g. by persisting or replicating valuable, small intermediate results.

Ideally the solution would be handled by the scheduler itself, so that many different applications can benefit from it (e.g. via a scheduler plugin/extension). We were thinking about milestones/snapshotting, where the user can label certain results as worth replicating (and later forgetting once another milestone passes/completes). We also discussed some kind of automatic replication based on heuristics (e.g. bytes_result < X and runtime of task > Y -> replicate result) to soften the blow in case of failures.
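
To illustrate the heuristic idea, here is a rough sketch of what we have in mind (the thresholds are placeholders, and for simplicity it uses the existing client-side Client.replicate API rather than the scheduler-side mechanism we would ultimately want):

    # Sketch only: replicate results that were expensive to compute but are
    # small enough to copy cheaply. Thresholds are placeholders.
    MAX_NBYTES = 50e6    # only replicate results smaller than ~50 MB
    MIN_RUNTIME = 60.0   # ...that took more than a minute to compute

    def maybe_replicate(client, futures, runtimes, sizes, n=2):
        """Replicate 'valuable' intermediate results to n workers."""
        valuable = [
            fut
            for fut, runtime, nbytes in zip(futures, runtimes, sizes)
            if runtime > MIN_RUNTIME and nbytes < MAX_NBYTES
        ]
        if valuable:
            client.replicate(valuable, n=n)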

My questions would be:

  1. Does anybody have additional ideas we should take into account?
  2. Could anybody already gather some experience with similar situations which might help us?
  3. If we would start to implement something like this, what would be the best approach / where should we start?
  4. Are we doing something fundamentally wrong? :)

Researching existing GitHub issues, I only found #2748, which discusses this scenario briefly but was ultimately closed without a proper resolution. The only suggested solution is a caching library, but persisting every single result is most likely not an option for us.

@mrocklin
Member

Hi @fjetter ,

My apologies for the delay in response. You raised this when I was fairly busy, and I hadn't gone through old issues until now.

Everything you say makes a lot of sense to me. I think that Dask could really use an active memory manager that thinks about replication along the lines you describe. It's unlikely that I personally will build such a thing, but I think that you and your group could handle it easily.

Here are some links to prior discussion that might be helpful in informing design.

But neither is exactly what you are talking about. I think that ideally this would be a scheduler extension that keeps a desired replication count for every piece of data (informed either by heuristics or by explicit requests from the user) and sends out requests to workers to duplicate or remove data as necessary to keep things on target. This sounds like an interesting problem to solve, and highly valuable for many applications.
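
A very rough sketch of the shape I have in mind, keyed off explicit user labels for the "milestone" case (the plugin hook and the Scheduler.replicate call are from memory and may differ between versions, so treat this as pseudocode rather than a tested design):

    from distributed.diagnostics.plugin import SchedulerPlugin

    class MilestoneReplicator(SchedulerPlugin):
        """Sketch: keep extra replicas of results whose keys carry a 'milestone' label."""

        def __init__(self, scheduler, milestone_prefixes=("milestone-",), n_replicas=2):
            self.scheduler = scheduler
            self.milestone_prefixes = tuple(milestone_prefixes)
            self.n_replicas = n_replicas

        def transition(self, key, start, finish, *args, **kwargs):
            # When a labelled result lands in memory, ask the scheduler to copy it
            # to additional workers so that a single worker death cannot lose it.
            if finish == "memory" and str(key).startswith(self.milestone_prefixes):
                # Scheduler.replicate is a coroutine; schedule it on the event loop
                self.scheduler.loop.add_callback(
                    self.scheduler.replicate, keys=[key], n=self.n_replicas
                )

Registering it would be something like scheduler.add_plugin(MilestoneReplicator(scheduler)), e.g. from a scheduler preload, and size/runtime heuristics could slot into the same transition hook.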

Another approach would be to have workers retire themselves gracefully as they leave the cluster. My guess is that when your nodes die there is some way to have them give you a bit of warning. In these cases you could have them call self.close_gracefully() and move their data to other peers before leaving the network.

@fjetter
Member Author

fjetter commented Nov 13, 2019

This is already quite helpful. The memory manager sounds quite promising but I will need to talk to my team first and will come back to you if we intend to pursue this further.

Another approach would be to have workers retire themselves gracefully

We were also discussing graceful downscaling, but there are still some issues for us:

  1. Ideally the solution should handle ungraceful downscaling, since we cannot necessarily guarantee that the worker gets enough time to shut down gracefully. We realise that this is a big ask, though.
  2. This is mostly related to our setup, but we are not quite sure how we would even trigger the graceful downscaling. We wouldn't want to rely on Client connections, since we don't want to couple our cluster manager to any particular distributed version (protocol/API stability, etc.).

Regarding 2., a thought just popped into my mind; I am not sure yet whether this is feasible. Did you ever think about signal handlers for the worker/nanny?
Something like SIGTERM -> stop the calculation immediately and retire gracefully if possible.
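
For example (a very rough sketch of the idea as a worker preload, untested):

    # Sketch of the signal-handler idea as a worker preload (untested)
    import signal

    def dask_setup(worker):
        def _handle_sigterm(signum, frame):
            # Hand the worker's data over to peers and leave the cluster cleanly
            worker.io_loop.add_callback(worker.close_gracefully)

        signal.signal(signal.SIGTERM, _handle_sigterm)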

@mrocklin
Member

mrocklin commented Nov 13, 2019 via email

@mrocklin
Member

The memory manager sounds quite promising but I will need to talk to my team first and will come back to you if we intend to pursue this further.

I am also quite happy to engage here. I think that active memory management is important for many workloads. I have some rough thoughts on how to design this, but I don't currently have enough continuous time to devote to it. I would be very happy to engage with regular meetings, reviews, and so on.

@StephanErb
Contributor

StephanErb commented Nov 18, 2019

Another approach would be to have workers retire themselves gracefully as they leave the cluster. My guess is that when your nodes die there is some way to have them give you a bit of warning. In these cases you could have them call self.close_gracefully() and move their data to other peers before leaving the network.

For anyone coming here with a similar issue, we will now try the following: teach our container scheduler to notify workers up-front when they are about to be taken down. This does not help in cases of node failure, but with regular maintenance and auto-scaling it should be effective.

We cannot solve this easily via SIGTERM, as we also run reverse proxies / sidecars and need a coordinated shutdown. In our case a simple HTTP endpoint is the simplest solution for now. I am not sure whether such a thing would be of general interest upstream, so we are taking the following live for now as a tactical fix:

    import logging

    import click

    logger = logging.getLogger("distributed.custom")

    @click.command()
    @click.option("--graceful-shutdown-endpoint")
    def dask_setup(worker, graceful_shutdown_endpoint):
        """ Loaded at Dask worker startup via --preload """
        try:
            from distributed.dashboard.utils import RequestHandler
            from distributed.dashboard.worker import routes

            class TerminationHandler(RequestHandler):
                """
                Custom HTTP handler to trigger a graceful shutdown via Aurora's HTTP Lifecycle
                """

                def post(self):
                    logger.info(
                        "HTTP lifecycle triggered. Initiating graceful shutdown."
                    )

                    self.server.io_loop.add_callback(self.server.close_gracefully)

                    self.write("shutting down")
                    self.set_header("Content-Type", "text/plain")

            routes.append((graceful_shutdown_endpoint, TerminationHandler))

        except Exception:
            logger.exception(
                "Dask integration failed. Continuing without graceful worker termination"
            )
        else:
            logger.info(
                "Dask integration succeeded. Graceful worker termination activated"
            )

This module is then pre-loaded into the dask-worker process.
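
For reference, starting the worker then looks roughly like this (if I remember correctly, the extra click option is forwarded to the click-based dask_setup; the module name and endpoint path are just examples):

    dask-worker scheduler-address:8786 --preload termination_handler.py --graceful-shutdown-endpoint /terminate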
