Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster manager info to scheduler #6489

Open
mrocklin opened this issue Jun 1, 2022 · 7 comments
Open

Add cluster manager info to scheduler #6489

mrocklin opened this issue Jun 1, 2022 · 7 comments

Comments

@mrocklin
Copy link
Member

mrocklin commented Jun 1, 2022

I was just looking at Cluster._cluster_info. It looks like it sends this up to the scheduler as a metadata value. Should we make this proper scheduler state instead?

cc @jacobtomlinson

@jacobtomlinson
Copy link
Member

jacobtomlinson commented Jun 14, 2022

See #4607, #5031 and #5033 for history on this requirement.

The goal here is for cluster objects to be able to use the scheduler as state storage so that multiple instances of the same cluster manager class can represent one existing cluster with the state staying in sync. The cluster objects need to be able to pass their state to the scheduler and retrieve it again in the future.

I have no strong feelings about how that should be implemented or whether the current implementation is optimal. Only that it is desirable for downstream cluster managers, especially dask-cloudprovider although I haven't actually managed to find the time to leverage this feature there yet.

Pinging @fjetter and @graingert who seem to also be making changes to this functionality lately.

@graingert
Copy link
Member

Pinging @fjetter and @graingert who seem to also be making changes to this functionality lately.

one PR for that is here #6487

@graingert
Copy link
Member

graingert commented Jun 14, 2022

from your usecase here, I'm not sure why a periodically synchronized dictionary would be more useful than the existing scheduler_comm.set/get_metadata methods:

This is a good place to persist state at various points in the cluster lifecycle. Particularly when wanting to disconnect/reconnect from a running cluster. In cluster managers like KubeCluster this would be a good place to store all the config for the cluster. Things like worker CPU/memory settings for creating new worker pods. This will then be loaded back in when using KubeCluster.from_name() instead of having to try and serialise everything into some platform-specific metadata store as I attempted in dask/dask-kubernetes#318.

https://github.com/dask/dask-kubernetes/pull/318/files#diff-9c83258f58c7a7c5b9a0b3e1f1b3ce321bf63eaa614762f18c4150dbdea07455R641

a periodically synchronized dictionary introduces problems where a consumer does not know when or if data added to the dictionary has been persisted or when or if data has been retrieved.

In addition it seems that from your use-case using self._cluster_info would be too late because you need to call await super()._start() to access the updated self._cluster_info. When hydrating a cluster you'd always need to discover a scheduler address and then call await scheduler_comm.get_metadata(...) rather than self._cluster_info anyway.

I think it would be better to access the metadata immediately before calling a method that needs it, eg:

async def _scale(self, ...):
    spec = await scheduler_comm.get_metadata(key="dask-kubernetes-scale-info")

@graingert
Copy link
Member

would it make sense to move the implementation to a custom dask-cloudprovider Cluster subclass until the design solidifies and we can adopt it in dask-distributed?

class CloudInfoCluster(Cluster):
    def __init__(self, *args, cloud_info_sync_interval=1, **kwargs):
        self._cloud_info_sync_interval = cloud_info_sync_interval
        self._cloud_info = {}
        super().__init__(self, *args, **kwargs)
        
    async def _start(self):
        await super()._start()
        self._sync_cloud_info_task = asyncio.create_task(self._sync_cloud_info())
        
    async def _sync_cloud_info(self):
        err_count = 0
        warn_at = 5
        max_interval = 10 * self._cloud_info_sync_interval
        # Loop until the cluster is shutting down. We shouldn't really need
        # this check (the `CancelledError` should be enough), but something
        # deep in the comms code is silencing `CancelledError`s _some_ of the
        # time, resulting in a cancellation not always bubbling back up to
        # here. Relying on the status is fine though, not worth changing.
        while self.status == Status.running:
            try:
                await self.scheduler_comm.set_metadata(
                    keys=["cloud-info"],
                    value=self._cloud_info.copy(),
                )
                err_count = 0
            except asyncio.CancelledError:
                # Task is being closed. When we drop Python < 3.8 we can drop
                # this check (since CancelledError is not a subclass of
                # Exception then).
                break
            except Exception:
                err_count += 1
                # Only warn if multiple subsequent attempts fail, and only once
                # per set of subsequent failed attempts. This way we're not
                # excessively noisy during a connection blip, but we also don't
                # silently fail.
                if err_count == warn_at:
                    logger.warning(
                        "Failed to sync cluster info multiple times - perhaps "
                        "there's a connection issue? Error:",
                        exc_info=True,
                    )
            # Sleep, with error backoff
            interval = min(max_interval, self._sync_interval * 1.5**err_count)
            await asyncio.sleep(interval)

        async def _close(self):
            try:
                if self._sync_cloud_info_task:
                    self._sync_cloud_info_task.cancel()
                    with suppress(asyncio.CancelledError):
                        await self._sync_cloud_info_task
             finaly:
                 return await super()._close()

@jacobtomlinson
Copy link
Member

As I said I'm not wedded to any one implementation, we can definitely change what exists today. I'm a little confused why this has come up nearly a year after this feature was added to distributed. Is this functionality causing some issues at Coiled that are not being communicated here? This conversation plus the other issues I linked are not feeling especially transparent in terms of problems and motivations.

I can't seem to find it now but there was discussion previously around why we should use something separate to set/get_metadata. I think the sync was the reason why we did something separately.

Surely we can find a solution to #6485 without having to back out this whole feature though...

I would prefer not to push this functionality down into one specific project, I have todo items to implement this in both dask-cloudprovider and dask-jobqueue and would prefer to avoid the duplication. As I see it this feature is already adopted in distributed and has been since last summer, it's just a little buggy and needs fixing up.

@fjetter
Copy link
Member

fjetter commented Jun 14, 2022

I'm a little confused why this has come up nearly a year after this feature was added to distributed.
This conversation plus the other issues I linked are not feeling especially transparent in terms of problems and motivations.

There is nothing happening here that we are not communicating. While investigating various CI failures, we encountered #6485 which sets global state that interferes with other tests because of a mutable default value. From the documentation and tests in the dask/distributed repo we were having issues differentiating between intended functionality and implementation details (e.g. #6487 (comment))

No Coiled intentions hidden here. Dask maintenance only, so far.

I think the question why no issues have come up after a year is simply because this hasn't been used a lot. At least you indicated that this has not been used, yet #6487 (comment)

I can't seem to find it now but there was discussion previously around why we should use something separate to set/get_metadata. I think the sync was the reason why we did something separately.

I suspect this fell out of #4263 ? I haven't read through the entire thing and haven't followed any development around https://github.com/dask-contrib/dask-ctl but I could see dynamics becoming important if one wants to implement lifecycle management. However, in this case the consistency problems Tom mentioned might also be of interest and we may want to sync the dict more selectively and implement error handling.
You also indicated that the name setter might be important #6487 (comment) and it looks like you're interested in a bidirectional sync, i.e. if somewhere some cluster sets a name attribute, this propagates to all connected clusters? (I think I'm struggling with the concept of having multiple cluster objects at the same time)

Either way, I'm not opposed to this feature but would appreciate a bit more transparency about what it is intended to do and why. This is not clear from documentation, comments and/or tests in dask/distributed. It's apparently very difficult to even motivate it going back through our issues.


coming back to @mrocklin s question

Should we make this proper scheduler state instead?

Do you suggest to have the cluster_info state variable as scheduler state or anything specific in there? If it's just another generic dictionary we're setting on the scheduler, I think I would prefer keeping it as metadata right now.

Can you motivate what you would like to use this for?

@jacobtomlinson
Copy link
Member

No Coiled intentions hidden here. Dask maintenance only, so far.

Fair enough, it just seems like a small bug has blown up into deep discussions around this topic.

You're right that this is all motivated by #4263. That issue is basically design proposal like the ones we have been discussing in dask/community#230. All of the motivations around this are covered in that issue. I'd love for more folks to engage on this topic so if you have the time to read it I would appreciate your input.

The goal is less about having multiple instances of a class simultaneously and more about being able to delete and recreate them. The simplest example is restarting a notebook kernel without blowing up a whole cluster or losing the convenience of having a cluster manager object to shut down the cluster later.

Simultaneous instances can be helpful too, especially in multi-stage pipelines. Some steps may run simultaneously and each require a cluster object. Within dask-ctl we also hydrate ephemeral cluster objects as part of the listing, scaling and closing operations, so additional instances exist briefly.

So yeah coming back to the original question I really don't have strong feelings about how this is implemented. As long as cluster objects can use the scheduler as a state store (ideally with automatic syncing) I don't mind how it is implemented.

I think I would prefer keeping it as metadata right now

By this statement do you mean you would prefer to revert the cluster info feature?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants