Skip to content

Commit

Permalink
[core] Minor cpp improvement (ray-project#48734)
Browse files Browse the repository at this point in the history
Some C++ improvements to our codebase.

---------

Signed-off-by: dentiny <dentinyhao@gmail.com>
Signed-off-by: hjiang <dentinyhao@gmail.com>
  • Loading branch information
dentiny committed Dec 7, 2024
1 parent 40123dc commit 9b0f8c8
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 26 deletions.
6 changes: 1 addition & 5 deletions cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,7 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
scheduling_strategy,
"");
}
std::vector<ObjectID> return_ids;
for (const auto &ref : return_refs) {
return_ids.push_back(ObjectID::FromBinary(ref.object_id()));
}
return return_ids[0];
return ObjectID::FromBinary(return_refs[0].object_id());
}

ObjectID NativeTaskSubmitter::SubmitTask(InvocationSpec &invocation,
Expand Down
9 changes: 5 additions & 4 deletions src/ray/common/scheduling/cluster_resource_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ResourceRequest {
ResourceRequest() : ResourceRequest({}, false) {}

/// Construct a ResourceRequest with a given resource map.
ResourceRequest(absl::flat_hash_map<ResourceID, FixedPoint> resource_map)
explicit ResourceRequest(absl::flat_hash_map<ResourceID, FixedPoint> resource_map)
: ResourceRequest(resource_map, false){};

ResourceRequest(absl::flat_hash_map<ResourceID, FixedPoint> resource_map,
Expand Down Expand Up @@ -131,15 +131,16 @@ class TaskResourceInstances {
boost::select_first_range<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>;

/// Construct an empty TaskResourceInstances.
TaskResourceInstances() {}
TaskResourceInstances() = default;

/// Construct a TaskResourceInstances with the values from a ResourceSet.
TaskResourceInstances(const ResourceSet &resources) {
explicit TaskResourceInstances(const ResourceSet &resources) {
for (auto &resource_id : resources.ResourceIds()) {
std::vector<FixedPoint> instances;
auto value = resources.Get(resource_id);
if (resource_id.IsUnitInstanceResource()) {
size_t num_instances = static_cast<size_t>(value.Double());
instances.reserve(instances.size() + num_instances);
for (size_t i = 0; i < num_instances; i++) {
instances.push_back(1.0);
};
Expand Down Expand Up @@ -196,7 +197,7 @@ class TaskResourceInstances {
/// Set the per-instance values for a particular resource.
TaskResourceInstances &Set(const ResourceID resource_id,
const std::vector<FixedPoint> &instances) {
if (instances.size() == 0) {
if (instances.empty()) {
Remove(resource_id);
} else {
resources_[resource_id] = instances;
Expand Down
7 changes: 4 additions & 3 deletions src/ray/core_worker/transport/normal_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
bool keep_executing = true;
{
absl::MutexLock lock(&mu_);
if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end()) {
cancelled_tasks_.erase(task_spec.TaskId());
auto task_iter = cancelled_tasks_.find(task_spec.TaskId());
if (task_iter != cancelled_tasks_.end()) {
cancelled_tasks_.erase(task_iter);
keep_executing = false;
}
if (keep_executing) {
Expand Down Expand Up @@ -329,7 +330,7 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli
// same TaskID to request a worker
auto resource_spec_msg = scheduling_key_entry.resource_spec.GetMutableMessage();
resource_spec_msg.set_task_id(TaskID::FromRandom(job_id_).Binary());
const TaskSpecification resource_spec = TaskSpecification(resource_spec_msg);
const TaskSpecification resource_spec = TaskSpecification(std::move(resource_spec_msg));
rpc::Address best_node_address;
const bool is_spillback = (raylet_address != nullptr);
bool is_selected_based_on_locality = false;
Expand Down
23 changes: 13 additions & 10 deletions src/ray/raylet/local_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void LocalTaskManager::QueueAndScheduleTask(std::shared_ptr<internal::Work> work
// guarantee that the local node is not selected for scheduling.
ASSERT_FALSE(
cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining());
WaitForTaskArgsRequests(work);
WaitForTaskArgsRequests(std::move(work));
ScheduleAndDispatchTasks();
}

Expand All @@ -86,25 +86,25 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr<internal::Work> w
const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass();
auto object_ids = task.GetTaskSpecification().GetDependencies();
bool can_dispatch = true;
if (object_ids.size() > 0) {
if (!object_ids.empty()) {
bool args_ready = task_dependency_manager_.RequestTaskDependencies(
task_id,
task.GetDependencies(),
{task.GetTaskSpecification().GetName(), task.GetTaskSpecification().IsRetry()});
if (args_ready) {
RAY_LOG(DEBUG) << "Args already ready, task can be dispatched " << task_id;
tasks_to_dispatch_[scheduling_key].push_back(work);
tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work));
} else {
RAY_LOG(DEBUG) << "Waiting for args for task: "
<< task.GetTaskSpecification().TaskId();
can_dispatch = false;
auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), work);
auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), std::move(work));
RAY_CHECK(waiting_tasks_index_.emplace(task_id, it).second);
}
} else {
RAY_LOG(DEBUG) << "No args, task can be dispatched "
<< task.GetTaskSpecification().TaskId();
tasks_to_dispatch_[scheduling_key].push_back(work);
tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work));
}
return can_dispatch;
}
Expand All @@ -130,13 +130,16 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() {
auto &scheduling_class = shapes_it->first;
auto &dispatch_queue = shapes_it->second;

if (info_by_sched_cls_.find(scheduling_class) == info_by_sched_cls_.end()) {
auto sched_cls_iter = info_by_sched_cls_.find(scheduling_class);
if (sched_cls_iter == info_by_sched_cls_.end()) {
// Initialize the class info.
info_by_sched_cls_.emplace(
scheduling_class,
SchedulingClassInfo(MaxRunningTasksPerSchedulingClass(scheduling_class)));
sched_cls_iter = info_by_sched_cls_
.emplace(scheduling_class,
SchedulingClassInfo(MaxRunningTasksPerSchedulingClass(
scheduling_class)))
.first;
}
auto &sched_cls_info = info_by_sched_cls_.at(scheduling_class);
auto &sched_cls_info = sched_cls_iter->second;

// Fair scheduling is applied only when the total CPU requests exceed the node's
// capacity. This skips scheduling classes whose number of running tasks exceeds the
Expand Down
13 changes: 9 additions & 4 deletions src/ray/raylet/scheduling/cluster_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,21 @@ void ClusterTaskManager::QueueAndScheduleTask(
RAY_LOG(DEBUG) << "Queuing and scheduling task "
<< task.GetTaskSpecification().TaskId();
auto work = std::make_shared<internal::Work>(
task, grant_or_reject, is_selected_based_on_locality, reply, [send_reply_callback] {
task,
grant_or_reject,
is_selected_based_on_locality,
reply,
[send_reply_callback = std::move(send_reply_callback)] {
send_reply_callback(Status::OK(), nullptr, nullptr);
});
const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass();
// If the scheduling class is infeasible, just add the work to the infeasible queue
// directly.
if (infeasible_tasks_.count(scheduling_class) > 0) {
infeasible_tasks_[scheduling_class].push_back(work);
auto infeasible_tasks_iter = infeasible_tasks_.find(scheduling_class);
if (infeasible_tasks_iter != infeasible_tasks_.end()) {
infeasible_tasks_iter->second.emplace_back(std::move(work));
} else {
tasks_to_schedule_[scheduling_class].push_back(work);
tasks_to_schedule_[scheduling_class].emplace_back(std::move(work));
}
ScheduleAndDispatchTasks();
}
Expand Down

0 comments on commit 9b0f8c8

Please sign in to comment.