WIP Support multiple worker types DO NOT MERGE #179

Closed
wants to merge 3 commits

Conversation


@joeschmid joeschmid commented Sep 30, 2019

This is currently just a prototype to try out supporting multiple worker types, each with different pod specs. The approach is roughly:

• Leave the existing worker spec as a special "default" worker type
• Adaptive mode would only use the default worker type
• cluster.scale(n) would use the default worker type
• Provide the ability to add any number of worker types, stored as dict entries of str -> pod spec
• Provide the ability to call cluster.scale(n, worker_type='gpu') to manually scale specific worker types (see the sketch below)
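
Roughly, the usage I have in mind looks like this (names like add_worker_type are illustrative only; the prototype just stores worker types as str -> pod spec entries):

from dask_kubernetes import KubeCluster, make_pod_spec

cluster = KubeCluster(make_pod_spec(image='daskdev/dask:latest'))  # the "default" worker type

# Register extra worker types as str -> pod spec entries (method name is illustrative).
cluster.add_worker_type('gpu', make_pod_spec(image='my-org/dask-gpu:latest'))
cluster.add_worker_type('highmem', make_pod_spec(image='daskdev/dask:latest', memory_limit='64G'))

cluster.adapt(minimum=0, maximum=20)   # adaptive mode manages only the default worker type
cluster.scale(4)                       # scale(n) also targets the default worker type
cluster.scale(2, worker_type='gpu')    # manually scale a specific worker type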

@jacobtomlinson
Member

This is definitely an interesting use case. I would like to know more about how you are using dask-kubernetes.

It sounds like you are creating a large, long-lived Dask cluster with a variety of worker types in order to back your Prefect pipeline.

This library's primary focus is on short-lived, ad-hoc clusters.

Dask Kubernetes deploys Dask workers on Kubernetes clusters using native Kubernetes APIs. It is designed to dynamically launch short-lived deployments of workers during the lifetime of a Python process.
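
For context, the intended ad-hoc pattern looks roughly like this (worker-spec.yaml is just a placeholder pod spec):

from dask.distributed import Client
from dask_kubernetes import KubeCluster

# Short-lived cluster tied to the lifetime of this Python process.
cluster = KubeCluster.from_yaml('worker-spec.yaml')
cluster.adapt(minimum=0, maximum=10)   # workers come and go with the workload

client = Client(cluster)
# ... run computations ...
client.close()
cluster.close()   # workers are torn down when the session is done with them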

I wonder if this functionality would also be suited to the Dask helm chart, which targets long-lived clusters.

@mrocklin
Member

mrocklin commented Oct 1, 2019

@jacobtomlinson are long-lived clusters in scope, or would we want to use entirely different Kubernetes primitives than we're using here today?

@jacobtomlinson
Member

My preference would be that long-lived clusters use resources with better lifecycle support. For example, the scheduler should really be a Deployment so that it gets restarted if it fails.

This library also assumes that the cluster should be garbage collected when the Python session is closed or the cluster object is deleted. This may not be true for long-lived clusters.

I'm always keen to see if we could align the two projects more, but they do seem to be targeting different use cases.

@mrocklin
Member

mrocklin commented Oct 1, 2019

I'm always keen to see if we could align the two projects more, but they do seem to be targeting different use cases.

Agreed. I'm mostly thinking about longer-term work. @jcrist seems to have done this with Dask-Yarn, for example. It might be good to have him write up what the semantics might be for optional garbage collection tied to the Python process.

Both the scheduler-deployment and Python-GC topics seem like they could be made optional in the future?

@jacobtomlinson
Member

Yeah that sounds sensible.

@joeschmid
Author

joeschmid commented Oct 1, 2019

Thanks @jacobtomlinson and @mrocklin for the thoughts! A couple quick comments FWIW:

  • @jacobtomlinson you're exactly right that we currently use dask-kubernetes (KubeCluster) to create a long-running Dask cluster. We do that from a k8s Deployment that we've created on our own. On start it runs a .py file that: (1) reads a config file for whether to run in adaptive mode plus worker counts, i.e. min/max for adaptive, count for non-adaptive; (2) starts a KubeCluster; (3) periodically checks for changes to the config file; (4) if any changes are detected, calls cluster.adapt() or cluster.scale() as appropriate. (Code snippet below -- maybe this isn't a great use of KubeCluster or at least not how it was intended?)
  • It may be possible that we could move to a model where at the start of each Prefect Flow run we create a new, dynamic Dask cluster and specify the types of workers (and maybe counts if it were non-adaptive) that we want for that Flow

As long as we have a way to specify different worker types (and potentially the counts, too), that should meet our needs. I'm not sure how adaptive mode would work with different worker types, but one option might be a hybrid (sketched below): adaptive mode scales as normal using a single default worker type while, at the same time, the counts of specific worker types are set manually, e.g. give me an adaptive cluster using this default worker spec, plus 10 of this GPU worker spec and 5 of this high-memory worker spec.
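
Purely as a sketch of what I mean (none of this API exists today; it's just the shape I'm imagining):

# Hypothetical hybrid mode -- illustrative only.
cluster.adapt(minimum=2, maximum=50)      # default worker type scales with the workload
cluster.scale(10, worker_type='gpu')      # plus a fixed pool of 10 GPU workers
cluster.scale(5, worker_type='highmem')   # and 5 high-memory workers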

I'm relatively new to Dask so very open to thoughts and feedback on how to approach this.

import configparser
import logging
import time

from dask_kubernetes import KubeCluster

logger = logging.getLogger(__name__)

# SRM_CONF_DIR and the resource requests/limits are defined elsewhere in our deployment.
cluster = KubeCluster.from_yaml(SRM_CONF_DIR + 'dask-worker-pod-spec.yaml',
                                memory_request=mem_request, memory_limit=mem_limit,
                                cpu_request=cpu_request, cpu_limit=cpu_limit,
                                port=8786, host='0.0.0.0')

def get_config():
    config = configparser.ConfigParser()
    config.read(SRM_CONF_DIR + 'srm-dask-scheduler.conf')
    return config

in_adaptive_mode = False


def update_scheduler(config):
    global in_adaptive_mode
    if config['DEFAULT']['AdaptiveMode'] == 'True':
        min_workers = int(config['Adaptive']['MinWorkers'])
        max_workers = int(config['Adaptive']['MaxWorkers'])
        logger.info('Calling cluster.adapt(minimum={}, maximum={})'.format(min_workers, max_workers))
        cluster.adapt(minimum=min_workers, maximum=max_workers)  # scale based on current workload
        in_adaptive_mode = True
    else:
        n = int(config['NonAdaptive']['Workers'])
        logger.info('Calling cluster.scale(n={})'.format(n))
        if in_adaptive_mode:
            logger.info('Stopping adaptive cluster with cluster._adaptive.stop()')
            cluster._adaptive.stop()
        cluster.scale(n)
        in_adaptive_mode = False

        
DEFAULT_SLEEP_TIME_IN_SECONDS = 15

try:
    current_config = None
    new_config = get_config()
    
    while True:
        if new_config != current_config:
            logger.info('Detected config file change -- calling update_scheduler()')
            update_scheduler(new_config)
            current_config = new_config
        sleep_time = int(current_config['DEFAULT'].get('SleepTimeInSeconds', DEFAULT_SLEEP_TIME_IN_SECONDS))
        time.sleep(sleep_time)
        new_config = get_config()

except Exception as e:
    logger.exception(e)
                         
finally:
    logger.info('Hit finally: block -- scaling to 0 and stopping cluster')
    cluster.scale(0)
    cluster.scheduler.stop()
    logger.info("End scheduler at %r", cluster.scheduler.address)

@jacobtomlinson
Member

maybe this isn't a great use of KubeCluster or at least not how it was intended?

It's not how it was intended (at least from me), but I'm glad it is useful to you!

It may be possible that we could move to a model where at the start of each Prefect Flow run we create a new, dynamic Dask cluster and specify the types of workers (and maybe counts if it were non-adaptive) that we want for that Flow

That would align more with this library. However, I'm not suggesting that your workflow is wrong, rather that perhaps there are better tools out there.

I would imagine a version of the Dask helm chart may give you more stability for longer-running clusters. It uses Deployment resources, which have built-in lifecycle policies at the Kubernetes level. You would also manage scaling in Kubernetes, which means no adaptivity.

@joeschmid
Author

Thanks @jacobtomlinson. We started with the Dask helm chart (and that was a good way to begin), but we wanted to make a bunch of customizations (running multiple instances of JupyterLab on different AWS instance sizes, etc.), so we ended up writing our own k8s YAML files for Deployments, Services, etc., starting from what the Dask helm chart produced.

It turns out that Prefect does have a mechanism for running Flows on a dynamically created Dask cluster using dask-kubernetes. FYI, I opened an issue in Prefect's repo about eventually supporting multiple worker types: PrefectHQ/prefect#1586. I referenced the PR for using SpecCluster and pointed them to this discussion.

@jacobtomlinson probably nothing else to do on this for now, but I'm definitely interested in future support for multiple worker types in a Dask cluster dynamically created with dask-kubernetes. That can definitely wait, as I know you're still getting the SpecCluster PR merged. Thanks!

@jacobtomlinson
Member

Ok great! I'll close this out for now but let's keep track of this.
