
[core] Hybrid scheduling policy. #14790

Merged
merged 22 commits into ray-project:master on Apr 1, 2021

Conversation


@wuisawesome wuisawesome commented Mar 18, 2021

Why are these changes needed?

This PR introduces a new scheduling policy that is a hybrid of a pack policy and a round-robin policy. Description from the docstring:

/// This scheduling policy was designed with the following assumptions in mind:
///   1. Scheduling a task on a new node incurs a cold start penalty (warming the worker
///   pool).
///   2. Past a certain utilization threshold, a big noisy neighbor problem occurs (caused
///   by object spilling).
///   3. Locality is helpful, but generally outweighed by (1) and (2).
///
/// In order to solve these problems, we use the following scheduling policy.
///   1. Generate a traversal.
///   2. Run a priority scheduler.
///
/// A node's priorities are determined by the following factors:
///   * Always skip infeasible nodes
///   * Always prefer available nodes over feasible nodes.
///   * Break ties in available/feasible by critical resource utilization.
///   * Critical resource utilization below a threshold should be truncated to 0.
///
/// The traversal order should:
///   * Prioritize the local node above all others.
///   * All other nodes should have a globally fixed priority across the cluster.
///
/// We call this a hybrid policy because below the threshold, the traversal and truncation
/// properties will lead to packing of nodes. Above the threshold, the policy will act
/// like a traditional weighted round robin.
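
To make the priority computation concrete, here is a minimal, self-contained C++ sketch of the idea. Everything in it (NodeScore, Rank, PickNode, spread_threshold) is illustrative only; the actual implementation added by this PR is structured differently.

// Illustrative sketch only; not the code added by this PR.
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

struct NodeScore {
  bool feasible = false;     // Could the node ever run the task?
  bool available = false;    // Can it run the task right now?
  float utilization = 0.0f;  // Critical resource utilization in [0, 1].
};

// Lower rank wins. Available nodes always outrank merely-feasible ones, and
// ties are broken by critical resource utilization, truncated to 0 below the
// spread threshold so lightly loaded nodes compare as equal (packing).
static float Rank(const NodeScore &s, float spread_threshold) {
  float util = s.utilization < spread_threshold ? 0.0f : s.utilization;
  return (s.available ? 0.0f : 2.0f) + util;
}

// Traverse the local node first, then the rest in a fixed order, keeping the
// best-ranked node. Returns -1 if the task is infeasible everywhere.
int64_t PickNode(const std::vector<NodeScore> &nodes, int64_t local_node,
                 float spread_threshold) {
  int64_t best = -1;
  float best_rank = std::numeric_limits<float>::infinity();
  std::vector<int64_t> order;
  order.push_back(local_node);
  for (int64_t i = 0; i < static_cast<int64_t>(nodes.size()); i++) {
    if (i != local_node) order.push_back(i);
  }
  for (int64_t id : order) {
    const NodeScore &s = nodes[static_cast<size_t>(id)];
    if (!s.feasible) continue;  // Always skip infeasible nodes.
    float r = Rank(s, spread_threshold);
    if (r < best_rank) {  // Strict '<' keeps earlier (packed) nodes on ties.
      best_rank = r;
      best = id;
    }
  }
  return best;
}

Below the threshold, all eligible nodes tie on the utilization term, so the traversal order wins and tasks pack onto the local/earlier nodes; above it, the utilization term spreads tasks toward less-loaded nodes.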

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@ericl ericl added the @author-action-required label Mar 18, 2021
@rkooo567 rkooo567 self-assigned this Mar 18, 2021

ericl commented Mar 19, 2021 via email

@wuisawesome

Bin packing for GPU is always preferable since they are so expensive.

The GPU is in the cluster either way though right? Is this a heuristic to help with scaling down?


ericl commented Mar 19, 2021 via email

@ericl ericl added and removed the @author-action-required label Mar 19, 2021
@rkooo567

Is it still WIP btw?

@wuisawesome wuisawesome changed the title [WIP] Hybrid scheduling policy. [core] Hybrid scheduling policy. Mar 23, 2021
@wuisawesome wuisawesome changed the title [core] Hybrid scheduling policy. [WIP][core] Hybrid scheduling policy. Mar 23, 2021
@wuisawesome wuisawesome removed the @author-action-required label Mar 23, 2021
@rkooo567 rkooo567 requested a review from ericl March 23, 2021 05:08
@rkooo567 rkooo567 changed the title [WIP][core] Hybrid scheduling policy. [core] Hybrid scheduling policy. Mar 23, 2021
@rkooo567 rkooo567 left a comment

Can we add one Python test? I think we can make it something like this:

  1. A 3-node cluster (0-CPU head node).
  2. Each node has 100MB of object store memory and 2 CPUs.
  3. Make a task that returns a 55MB object.
  4. Create 3 tasks.
  5. Make sure all nodes have 55MB of object memory usage.

/// Whether to use the hybrid scheduling policy, or one of the legacy spillback
/// strategies. In the hybrid scheduling strategy, leases are packed until a threshold,
/// then spread via weighted (by critical resource usage).
RAY_CONFIG(bool, scheduler_hybrid_scheduling,
Contributor

What's happening if both this and scheduler_loadbalance_spillback are true?

Contributor

Looks like scheduler_loadbalance_spillback is ignored. Can you add a TODO comment to clean this up?

Contributor Author

This flag takes precedence over the other one.
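
For readers following along, here is a tiny self-contained illustration of that precedence (toy Config and Policy types, not the raylet code): when both flags are set, the hybrid flag wins and scheduler_loadbalance_spillback is effectively ignored.

// Toy illustration of flag precedence; not the actual raylet code.
#include <iostream>

struct Config {
  bool scheduler_hybrid_scheduling = true;
  bool scheduler_loadbalance_spillback = true;  // Ignored when hybrid is on.
};

enum class Policy { kHybrid, kLoadBalanceSpillback, kLegacyPack };

Policy SelectPolicy(const Config &cfg) {
  if (cfg.scheduler_hybrid_scheduling) {
    return Policy::kHybrid;  // Checked first, so it takes precedence.
  }
  if (cfg.scheduler_loadbalance_spillback) {
    return Policy::kLoadBalanceSpillback;
  }
  return Policy::kLegacyPack;
}

int main() {
  std::cout << (SelectPolicy(Config{}) == Policy::kHybrid) << "\n";  // Prints 1.
}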

///
/// \return -1 if the task is infeasible, otherwise the node id (key in `nodes`) to
/// schedule on.
int64_t HybridPolicy(const TaskRequest &task_request, const int64_t local_node_id,
Contributor

Can you add a TODO to move other policies to this file? (load balancing & simple bin packing)

Contributor Author

Oh I was actually thinking we would delete those, but sure

float highest = 0;
for (const auto &i : {CPU, MEM, OBJECT_STORE_MEM}) {
if (i >= this->predefined_resources.size()) {
continue;
Contributor

Does this ever happen? Why don't we just add a check here?

Contributor Author

Yeah, this actually happens. The check was inspired by real events :p (most of the unit tests actually trigger it).

Contributor

That's interesting... Why is that? I thought the first N entries are reserved for predefined resources..

Contributor Author

Yeah but the original data structure is actually a vector, and it's not dynamically resized at initialization time. This means that for some time, a task req of {"CPU": 1} will have predefined_resources.size() == 1 until some code comes along and resizes the vector.

Contributor

That's a bit of a funky data model... Given that predefined resources are static by definition, wouldn't it be a lot cleaner if predefined_resources was a static array whose elements may be unset? That would still allow for enum-based indexing (which I like), but would get rid of all of the dynamic resizing and size checks, and should make it far less brittle to hard-to-catch writer bugs around improper resizing. If that might make sense, this would obviously be a refactor that can wait for a future PR.
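
A rough sketch of what that data model could look like (type, field, and enum names here are illustrative; this is not part of the PR):

// Illustrative sketch of the suggested refactor: a fixed-size, enum-indexed
// array whose entries may be unset, so no dynamic resizing or size checks.
#include <array>
#include <optional>

enum PredefinedResources { CPU = 0, MEM, GPU, OBJECT_STORE_MEM, PredefinedResources_MAX };

struct ResourceCapacity {
  double total = 0;
  double available = 0;
};

struct NodeResourcesSketch {
  // An unset entry means the node never reported that resource.
  std::array<std::optional<ResourceCapacity>, PredefinedResources_MAX> predefined_resources;

  double Available(PredefinedResources r) const {
    const auto &entry = predefined_resources[r];
    return entry ? entry->available : 0.0;  // No size check needed.
  }
};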

highest = utilization;
}
}
return highest;
Contributor

Maybe just use highest = std::max(highest, utilization)?

@@ -164,6 +164,88 @@ NodeResources ResourceMapToNodeResources(
return node_resources;
}

float NodeResources::CalculateCriticalResourceUtilization() const {
@rkooo567 rkooo567 Mar 23, 2021

Can you write unit tests for these 3 new functions?
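
For reference, a unit test for the utilization calculation could look roughly like this (gtest style; CreateNodeResources is a hypothetical test helper, and the tests actually added to the PR may be structured differently):

// Hypothetical sketch, not the actual test in this PR.
#include "gtest/gtest.h"

TEST(NodeResourcesTest, CriticalResourceUtilization) {
  // 8 CPUs with 4 available (50% used) and 100 units of object store memory
  // with 25 available (75% used): the critical utilization is the max, 0.75.
  NodeResources node = CreateNodeResources(/*total_cpu=*/8, /*available_cpu=*/4,
                                           /*total_object_store=*/100,
                                           /*available_object_store=*/25);
  EXPECT_FLOAT_EQ(node.CalculateCriticalResourceUtilization(), 0.75);
}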

bool NodeResources::IsAvailable(const TaskRequest &task_req) const {
// First, check predefined resources.
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (i >= this->predefined_resources.size()) {
Contributor

Same here. Is this condition ever invoked? Why don't we just add a check?

}

bool NodeResources::IsFeasible(const TaskRequest &task_req) const {
// First, check predefined resources.
Contributor

This looks almost identical to IsAvailable except that it uses total instead of available. Any good way to reduce code duplication?

Contributor

Could add a shared private IsWithinCapacity helper that also takes a lambda like [](const ResourceCapacity &capacity) { return capacity.available; }), but that would probably sacrifice a bit of readability. Definitely shouldn't block the PR IMO.

bool NodeResources::IsAvailable(const TaskRequest &task_req) const {
  return IsWithinCapacity(task_req, [](const ResourceCapacity &capacity) { return capacity.available; });
}

bool NodeResources::IsFeasible(const TaskRequest &task_req) const {
  return IsWithinCapacity(task_req, [](const ResourceCapacity &capacity) { return capacity.total; });
}

bool NodeResources::IsWithinCapacity(
    const TaskRequest &task_req,
    std::function<FixedPoint(const ResourceCapacity &)> get_capacity) const {
  // First, check predefined resources.
  for (size_t i = 0; i < PredefinedResources_MAX; i++) {
    if (i >= this->predefined_resources.size()) {
      if (task_req.predefined_resources[i].demand != 0) {
        return false;
      }
      continue;
    }

    const auto &resource = get_capacity(this->predefined_resources[i]);
    const auto &demand = task_req.predefined_resources[i].demand;
    bool is_soft = task_req.predefined_resources[i].soft;

    if (resource < demand && !is_soft) {
      return false;
    }
  }

  // Now check custom resources. Soft requests never make a node ineligible.
  for (const auto &task_req_custom_resource : task_req.custom_resources) {
    if (task_req_custom_resource.soft) {
      continue;
    }
    auto it = this->custom_resources.find(task_req_custom_resource.id);
    if (it == this->custom_resources.end() ||
        task_req_custom_resource.demand > get_capacity(it->second)) {
      return false;
    }
  }
  return true;
}

float CalculateCriticalResourceUtilization() const;
/// Returns true if the node has the available resources to run the task.
/// Note: This doesn't account for the binpacking of unit resources.
bool IsAvailable(const TaskRequest &task_req) const;
Contributor

Do we still need IsFeasible in the cluster_resource_scheduler after this? (except for the legacy logic)

Contributor Author

I think it's only used by the legacy scheduler.

Contributor

Maybe just adding TODO comments on those functions so that we can easily clean up later?

@rkooo567 rkooo567 left a comment

Btw I will still wait for Eric's approval, but it LGTM if you add a python test.


ericl commented Mar 23, 2021

Failing C++ tests

@ericl ericl added the @author-action-required label Mar 23, 2021

clarkzinzow commented Mar 25, 2021

@ericl That was my bad, my suggestion should have been

best_node = std::next(nodes_.begin(), idx)->first;

I constantly do that with iterators. 🤦 Maybe some day I'll get it through my thick head that iterators are just fancy constrained pointers.

Alex Wu and others added 2 commits March 25, 2021 16:37
@wuisawesome

@rkooo567 I'm going to remove the Python test; it's way too flaky, and exact scheduling behavior isn't guaranteed to users. We should just make sure the logic is correct (unit test) and the large-scale behavior is correct (release test).

@rkooo567

@wuisawesome Why is it flaky if the logic is correct? Can you tell me a scenario where this could be unavoidably flaky?

@wuisawesome

I think the source of flakiness here is caused by the worker pool startup time, but it could involve the resource report updates too.

@wuisawesome

Yeah, I found a way to reproduce the flakiness locally, and I believe what's happening is that a bunch of tasks are being scheduled on the driver/first node, but because the warm pool hasn't started yet, they don't allocate resources, so more tasks keep being scheduled locally.

@rkooo567

Hmm, I see. Doesn't that mean this will also not work in the real world due to the same issue? Btw, what does locally mean here? If you set a 0-CPU head node, is it still reproducible?

@wuisawesome

I didn't try with a 0-CPU head node, but there's nothing special about local scheduling anymore, so the same logic applies. The problem actually exists with the PACK scheduler too, but because of the packing semantics, you can't detect it from the placement outcome, only from the number of spillbacks.

@rkooo567

So, what's the conclusion here? Are we going to go with the solution you suggested in Slack?

@rkooo567

Seems like the Windows tests are timing out: test_lease_request_leak.


rkooo567 commented Apr 1, 2021

Can you merge the latest master + lint? I think we can merge after that.

@wuisawesome

Here are some pretty pictures of scheduling throughput. They look as we would predict.

[charts: hybrid_cdf, hybrid]

@wuisawesome

The remaining test failures seem unrelated. Lint fails due to an SSL error (and passes in Buildkite). The Serve failure looks unrelated (serializing a dependency that changed?). I'm going to merge this now.

@wuisawesome wuisawesome merged commit 4fba05a into ray-project:master Apr 1, 2021
amogkam pushed a commit that referenced this pull request Apr 16, 2021
scv119 added a commit that referenced this pull request Feb 1, 2023
…of nodes in the cluster (#31934)

Why are these changes needed?
This PR takes over #26373

Currently, the initial scheduling delay for a simple f.remote() loop is approximately worker startup time (~1s) * number of nodes. There are three reasons for this:

1. Drivers do not share physical worker processes, so each raylet must start new worker processes when a new driver starts. Each raylet starts the workers when the driver first sends a lease (resource) request to that raylet.
2. The hybrid scheduling policy (#14790) prefers to pack tasks onto fewer nodes up to 50% CPU utilization before spreading tasks for load balancing.
3. The maximum number of concurrent lease requests is 10, meaning that the driver must wait for workers to start on the first 10 nodes that it contacts before sending lease requests to the next set of nodes. Because of (2), the first 10 nodes contacted are usually not unique, especially when each node has many cores.

This PR changes (3), allowing us to dynamically adjust max_pending_lease_requests based on the number of nodes in the cluster.
Without this PR, the top-k scheduling algorithm is bottlenecked by the speed of sending lease requests across the cluster.
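
A simplified sketch of the idea (the scaling rule shown here is illustrative; the exact formula in the merged change may differ):

// Illustrative only: grow the in-flight lease-request cap with cluster size so
// lease requests are not serialized behind worker startup on the first few nodes.
#include <algorithm>
#include <cstddef>
#include <cstdint>

int64_t MaxPendingLeaseRequests(size_t num_nodes) {
  // Keep at least the old fixed limit of 10, and scale up with node count.
  return std::max<int64_t>(10, static_cast<int64_t>(num_nodes));
}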
edoakes pushed a commit to edoakes/ray that referenced this pull request Mar 22, 2023
…of nodes in the cluster (ray-project#31934)
