
[RFC] Job queueing functionality with Ray Serve + Workflows #21161

Closed

ericl opened this issue Dec 17, 2021 · 19 comments
Labels
RFC (RFC issues), stale (The issue is stale; it will be closed within 7 days unless there is further conversation.)

Comments

@ericl
Contributor

ericl commented Dec 17, 2021

Overview

A number of users have expressed the desire for a native Job queue within Ray, which would provide:

  • persistence (guarantee that tasks aren't lost due to failures)
  • rate limiting (limit the number of concurrently executing tasks in the cluster)
  • integration with online serving / event sources (e.g., Serve, SQS)
  • Ray integration (use Ray as the execution layer)

This aligns well with the combination of Serve + the Workflows project (https://docs.ray.io/en/latest/workflows/concepts.html), with a few enhancements. In particular, Workflows needs to:

  • support rate limiting. The simplest version would be something like workflow.init(max_concurrent_jobs=100) (see the sketch after this list).
  • support some API to query/list the queue of pending workflows (perhaps extending the metadata API: [Workflow]add doc for metadata #20156).
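
A minimal sketch of how this could look from user code, using the Workflows API of this era; max_concurrent_jobs and the richer queue listing are the hypothetical additions proposed above, and exact signatures may differ across Ray versions:

```python
from ray import workflow

# Hypothetical knob from this RFC: at most 100 workflows execute at once;
# the rest stay durably queued in workflow storage.
workflow.init(max_concurrent_jobs=100)

@workflow.step
def process(payload: dict) -> dict:
    # Arbitrary Ray-based job body.
    return {"doubled": payload["x"] * 2}

# Each submitted job becomes its own workflow with an incrementing id.
for i in range(1000):
    process.step({"x": i}).run_async(workflow_id=f"job-{i}")

# List workflows and their statuses, e.g. [("job-0", "RUNNING"), ...];
# the RFC proposes extending this and the metadata API (#20156) so the
# pending queue can be inspected at scale.
print(workflow.list_all())
```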

Reference Architecture

The following proposes a reference architecture for Job queueing in Ray, with support for querying job status and for integration with Serve and event sources.

At a high level, it is composed of a few parts:

  1. A Serve deployment that can take incoming Job execution requests and Job status query requests.
  • Each job is launched as a separate Ray workflow, with an incrementing workflow id.
  • Workflows runs as many concurrent jobs as possible given the concurrency limit, ordered by workflow id.
  • Workflows can be queried for the status of any given workflow and for general queue stats.
  2. A workflow that runs in the background polling events from an event bus, e.g., SQS.
  • When an event is received, it launches a workflow to process the event and acks the event.
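
A condensed sketch of part (1), assuming the Serve and Workflows APIs of this era; the JobQueue name, request shape, and in-memory id counter are illustrative, and the event-bus poller from part (2) is omitted:

```python
import ray
from ray import serve, workflow

@workflow.step
def run_job(payload: dict) -> dict:
    # The actual Ray-based job body goes here.
    return {"ok": True, "input": payload}

@serve.deployment(route_prefix="/jobs")
class JobQueue:
    def __init__(self):
        workflow.init()        # plus max_concurrent_jobs=... once it exists
        workflow.resume_all()  # resume jobs interrupted by a cluster restart
        self._next_id = 0      # illustrative; a real queue would persist this

    async def __call__(self, request):
        body = await request.json()
        if body.get("action") == "submit":
            # Launch each job as a separate workflow with an incrementing id.
            workflow_id = f"job-{self._next_id}"
            self._next_id += 1
            run_job.step(body["payload"]).run_async(workflow_id=workflow_id)
            return {"workflow_id": workflow_id}
        if body.get("action") == "status":
            return {"status": str(workflow.get_status(body["workflow_id"]))}
        return {"error": "unknown action"}

serve.start()
JobQueue.deploy()
```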

[Architecture flow diagram]

Discussion

Fault tolerance:

  • Each workflow is durably persisted to workflow storage, and automatically retried on failure while the cluster is running.
  • When the cluster restarts, the Serve deployment is re-launched and calls workflow.resume_all() on startup to resume interrupted jobs.

Scalability:

  • This architecture should scale to many thousands of pending jobs. Further scalability to millions of pending jobs requires adding an index to the workflows storage.
  • Each job can itself be arbitrarily large (e.g., use thousands of cores), since it's just running on Ray.
  • The latency per job is several hundred milliseconds, due to the storage choice of Workflows. This should be low enough for Ray-based compute jobs, and can be optimized in the future with faster Workflow storage plugins.

Status

Currently, this project is under discussion and we want to gather further feedback from the community. Should we proceed, most of the work involves adding operational features to Workflows, plus examples:

P0:

  • Add rate-limiting support to workflows
  • Add index to workflows to support more queued / total jobs

P1:

  • Add metadata query functionality to workflows
  • End-to-end application example with Ray Serve + Workflows, and blog post
  • Nightly scalability tests

cc @simon-mo @iycheng @edoakes @anabranch @yiranwang52

@ericl ericl added the RFC label Dec 17, 2021
@mbehrendt

Thanks a lot for putting together this proposal, @ericl.

One topic we should drill into a bit more is the persistence layer and what to persist. E.g., storing execution metadata and potentially other data can put a lot of stress on the underlying persistence technology. What's your thinking around this?

@rkooo567
Contributor

This document could also be useful for feature comparison: https://docs.google.com/spreadsheets/d/1Q_v__QtYzDtIDOkfmeQqVAw_jiksBz-GbJn-m-pDM34/edit#gid=0

@ericl
Contributor Author

ericl commented Dec 18, 2021

One topic we should drill into a bit more is the persistence layer and what to persist. E.g., storing execution metadata and potentially other data can put a lot of stress on the underlying persistence technology. What's your thinking around this?

Currently, Workflows supports filesystem- and S3-based persistence. This is relatively high-latency (~100ms+ per operation), but very scalable. Once we add an index, it should also scale nicely to very large queues of jobs.
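
For concreteness, a sketch of selecting a storage backend at init time; the URLs are placeholders and the exact form of the storage argument may vary across Ray versions:

```python
from ray import workflow

# Filesystem-backed persistence: simple, suitable for a single node or a
# shared volume (placeholder path).
workflow.init(storage="file:///tmp/ray_workflows")

# S3-backed persistence: higher per-operation latency (~100 ms+) but very
# scalable (placeholder bucket/prefix). Choose one backend per cluster.
# workflow.init(storage="s3://my-bucket/workflows")
```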

@ericl
Contributor Author

ericl commented Dec 21, 2021

Also cc @pcmoritz

@amogkam
Contributor

amogkam commented Dec 21, 2021

Would this support job prioritization or dynamic resources for each job?

For example, I have a long-running training job and want it to use all the resources in my cluster. But if a higher priority job comes in, I want to move some of my resources from the training job to the new high priority job. Assume that the training job can natively handle scaling down/up without having to restart training.

@ericl
Contributor Author

ericl commented Dec 21, 2021

Would this support job prioritization or dynamic resources for each job?

Great question. In the RFC we only have support for rate limiting. Supporting prioritized jobs / multiple queues of different types of jobs would be the next obvious extension [P1]. Though we may need to add features to Ray to support prioritization of existing running jobs.

@mbehrendt

Would this support job prioritization or dynamic resources for each job?

Great question. In the RFC we only have support for rate limiting. Supporting prioritized jobs / multiple queues of different types of jobs would be the next obvious extension [P1]. Though we may need to add features to Ray to support prioritization of existing running jobs.

It would be interesting to have this guided by use cases. In a certain way, prioritization of work is/was driven by hard capacity constraints. When running on a cloud there is still a quota set per account, but in general there is much more freedom to allocate new capacity as needed, and also to take alternative approaches, which would be interesting to think through. E.g., instead of having to make prioritization trade-offs within Ray, there is also the option of just spinning up another Ray cluster for workloads of a different priority. That would also ensure (even more strongly) that low- and high-priority work don't step on each other, while avoiding the need to implement complex cluster-internal prioritization logic.

WDYT?

@ericl
Contributor Author

ericl commented Dec 22, 2021 via email

@jacobdanovitch

Rate limiting for workflows would be really helpful; we're currently struggling a bit to set up a workflow where we have to call an external API at a certain rate per second.
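
Until Workflows grows native rate limiting, one possible workaround is an application-level limiter; below is a sketch of a shared token-bucket Ray actor that tasks or workflow steps consult before each external API call (the actor name, rate, and call site are illustrative, not a Ray feature):

```python
import time
import ray

ray.init()  # assumes a running Ray cluster or local mode

@ray.remote
class TokenBucket:
    """Shared token bucket: allows roughly `rate` calls per second cluster-wide."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def acquire(self) -> float:
        """Reserve one token and return how long the caller should sleep first."""
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return 0.0
        wait = (1.0 - self.tokens) / self.rate
        self.tokens -= 1.0  # go negative; the caller pays the wait
        return wait

# Detached, named actor so any task or workflow step can share it.
bucket = TokenBucket.options(name="api_bucket", lifetime="detached").remote(
    rate=5.0, capacity=5)

@ray.remote
def call_api(item):
    time.sleep(ray.get(bucket.acquire.remote()))  # respect the shared rate limit
    # ... the actual HTTP call to the external API would go here ...
    return item
```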

@ericl
Contributor Author

ericl commented Dec 27, 2021

Interesting nuggets on Ray for job processing from this blog: https://vishnudeva.medium.com/scaling-applications-on-kubernetes-with-ray-23692eb2e6f0

But as good as my experience was using Ray for ML, I didn’t really consider using it to offload job processing for a web application when the need arose. There was no need because there’s no dearth of dedicated software available to process jobs: Celery, RQ (Redis Queue), Kubeflow Pipelines (KFP), Airflow, Volcano, and more.

However, what I actually found out was that while solutions like Celery and RQ allowed for low-latency job processing, they needed a dedicated team to build and maintain a layer of infrastructure on top that would allow for more complex functionality — worker-aware scheduling, GPU support, etc.

But with the unpredictable overhead of Kubernetes scheduling, we pretty much need to forget about low latency processing for our jobs, which is not an option when your goal is to enable near real-time use-cases.

This echoes other use cases for Ray as a unified & high-performance processing backend.

@ericl ericl pinned this issue Dec 27, 2021
@jon-chuang
Contributor

jon-chuang commented Dec 29, 2021

re: job prioritization

Agreed about spinning up separate clusters for online and offline use-cases for most scenarios.

However, also consider that some use-cases, like #2618 (comment), mention sharing the dataplane for offline compute on intermediate outputs from the online pipeline while it is running, without needing to build an intermediate transfer/persistence layer.

Conversely, some users want to run low-latency observability tasks against a primarily offline compute workload: #2618 (comment)

Also note the other use-case, which is to have bursty workloads with low-latency requirements running on the same cluster as offline compute, to save the cost of a standalone cluster for the former (multi-tenancy): #16782 (comment)

Others:
#8263 mentions wanting to also preempt low-priority tasks based on scheduling priority (not triggered by a resource condition like memory usage, but by other blockers to immediate scheduling, such as placement groups). The implementation strategy for preemption may echo that of preemption to prevent OOM: #20495

To my mind, reliance on live usage statistics instead of placement groups could also help with better packing. Referencing how Lambda does its scheduling could perhaps help improve the autoscaler on this front.

Note that priority itself makes no difference if the low-latency online task is submitted when all resources are (logically or physically) fully-consumed by long-running offline tasks.

I'm not sure how valid the comments about spinning up more nodes are, since users may want tighter packing and faster response times for their low-latency workloads than the autoscaler can provide (furthermore, though the upscaling speed can be tuned, it may be difficult to tune it well without keeping permanent upscaling slack and thus lowering resource efficiency).

This is especially true if the online tasks can require massive scale (think a multi-node language or multi-modal model), which makes the packing problem harder without preemption.


In light of this, a warm-engine worker pool plus preemptible task prioritization might be a solution that enables tighter resource packing for bursty, unpredictable online workloads and/or same-dataplane offline+online workloads. Non-preemptible task prioritization may also solve the issue, but only if offline tasks are not long-running, i.e., are <100ms (which, to my mind, is unlikely).

Some parameters worth considering:

  1. How fast can offline tasks be preempted and killed?
  2. What proportion of the workload is offline v.s. online? How bursty/how much variance is there in our online/offline workload? How does this affect our preference for relying on upscaling slack v.s. preemption and task prioritization?
  3. At what level should the prioritization be handled?
    • Presumably, we'd prefer if the application layer for the offline compute is able to handle scale-up and scale-down as @amogkam suggested, but if scale-down cannot be done instantly, we probably also need preemption to guarantee instant resource allocation for our high-priority job.

@jon-chuang
Contributor

jon-chuang commented Dec 30, 2021

I recommend this very informative and relevant talk on preemption and packing of jobs in Nomad, HashiCorp's low-latency, lightweight cluster orchestration tool based on distributed scheduling via Sparrow.

Key takeaways:

  • (named) task groups and priority levels
    • task groups are, in my mind, also closely related to observability, task tagging, and searchable/filterable task-graph visualization
  • Both intra-node and inter-node preemption via greedy and approximate heuristics
    • In the context of Ray, preemption decisions also need to be made on the basis of the availability of object store inputs on various candidate nodes
  • Observability for live and historical preemption and scheduling decisions

Additional thoughts:

  • Thinking about the design of low-latency systems like ScyllaDB, which still run heavy background tasks such as analytic workloads and database compaction in the same cluster while meeting latency SLAs (check out this talk for details), the use of (fine-grained) priority levels and priority groups for online and offline tasks is a very common practice when latency is a concern.
    • I think there is something to consider here given that one of the strong selling points of Ray is its ability to provide low-latency task execution.
    • Also consider weighted fair queuing, which is used for bandwidth sharing in networks, as a model for fair, priority-aware job queues (a toy sketch follows this list)
  • In exploring Ray as a more general-purpose system that allows other types of compute to be vertically integrated alongside ML workloads, I believe multi-tenancy, sharing the same dataplane, and managing a single cluster with multiple job groups with different latency requirements are just some of the important capabilities to investigate.
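
To make the weighted fair-queuing analogy concrete, here is a toy, framework-agnostic sketch that orders queued jobs by virtual finish time; the class names, weights, and unit costs are made up for illustration:

```python
import heapq
from dataclasses import dataclass, field

@dataclass(order=True)
class QueuedJob:
    virtual_finish: float
    name: str = field(compare=False)

class WeightedFairQueue:
    """Toy WFQ: each job class gets throughput proportional to its weight."""

    def __init__(self, weights: dict):
        self.weights = weights                       # e.g. {"online": 4, "offline": 1}
        self.virtual_finish = {k: 0.0 for k in weights}
        self.heap: list = []

    def submit(self, job_class: str, name: str, cost: float = 1.0) -> None:
        # Each job of a class finishes `cost / weight` later in virtual time
        # than the previous one, so heavier classes are dispatched more often.
        self.virtual_finish[job_class] += cost / self.weights[job_class]
        heapq.heappush(self.heap, QueuedJob(self.virtual_finish[job_class], name))

    def next_job(self):
        return heapq.heappop(self.heap).name if self.heap else None

q = WeightedFairQueue({"online": 4, "offline": 1})
for i in range(8):
    q.submit("offline", f"batch-{i}")
    q.submit("online", f"request-{i}")
# While both classes have pending jobs, dispatch interleaves them roughly 4:1.
print([q.next_job() for _ in range(16)])
```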

@ericl ericl unpinned this issue Jan 7, 2022
@jon-chuang
Contributor

jon-chuang commented Jan 14, 2022

Another use case for priority scheduling: prioritizing bottlenecked tasks (e.g. in an execution DAG): apache/datafusion#1221 (comment)

@ddelange
Contributor

ddelange commented Mar 21, 2022

persistence (guarantee that tasks aren't lost due to failures)

Currently, docs state:

Ray currently cannot survive head node failure and we recommend using application specific failure recovery solutions.

Will the requirements of persistence be tracked under this RFC? Is handling head node failure part of this RFC?

Take, for instance, the scenario where both the ray-operator and the head pod are scheduled on a spot instance, that spot instance gets unexpectedly killed, and the pods are restarted on another node (I guess this scenario is also relevant outside the Serve/Workflows context).

  • What happens to jobs that had already finished but are still needed?
  • What happens to jobs that were currently being processed (worker panic)?
  • What happens to jobs that were still in the queue?

@young-chao

young-chao commented Oct 11, 2022 via email

@stale

stale bot commented Feb 11, 2023

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

  • If you'd like to keep the issue open, just leave any comment, and the stale label will be removed!
  • If you'd like to get more attention to the issue, please tag one of Ray's contributors.

You can always ask for help on our discussion forum or Ray's public slack channel.

@stale stale bot added the stale label Feb 11, 2023
@richardliaw
Contributor

I believe this proposal is being superseded by #32292.

Please comment there to voice your support + vote on which option is better for your use case! We are looking for more signals.

@stale stale bot removed the stale label Feb 16, 2023
@stale

stale bot commented Jun 18, 2023

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

  • If you'd like to keep the issue open, just leave any comment, and the stale label will be removed!
  • If you'd like to get more attention to the issue, please tag one of Ray's contributors.

You can always ask for help on our discussion forum or Ray's public slack channel.

@stale stale bot added the stale label Jun 18, 2023
@stale

stale bot commented Aug 12, 2023

Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!

@stale stale bot closed this as completed Aug 12, 2023