Skip to content

Commit

Permalink
[Core] Rename CoreWorkerDirectTaskSubmitter to NormalTaskSubmitter (#…
Browse files Browse the repository at this point in the history
…46923)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored Aug 2, 2024
1 parent 44f74ba commit 9600bfe
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 416 deletions.
4 changes: 2 additions & 2 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -856,9 +856,9 @@ ray_cc_test(
)

ray_cc_test(
name = "direct_task_transport_test",
name = "normal_task_submitter_test",
size = "small",
srcs = ["src/ray/core_worker/test/direct_task_transport_test.cc"],
srcs = ["src/ray/core_worker/test/normal_task_submitter_test.cc"],
tags = ["team:core"],
deps = [
":core_worker_lib",
Expand Down
22 changes: 11 additions & 11 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
}
RAY_CHECK_OK(actor_task_submitter_->SubmitTask(spec));
} else {
RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec));
RAY_CHECK_OK(normal_task_submitter_->SubmitTask(spec));
}
}
},
Expand Down Expand Up @@ -541,7 +541,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
: std::shared_ptr<LeasePolicyInterface>(
std::make_shared<LocalLeasePolicy>(rpc_address_));

direct_task_submitter_ = std::make_unique<CoreWorkerDirectTaskSubmitter>(
normal_task_submitter_ = std::make_unique<NormalTaskSubmitter>(
rpc_address_,
local_raylet_client_,
core_worker_client_pool_,
Expand Down Expand Up @@ -1077,7 +1077,7 @@ void CoreWorker::InternalHeartbeat() {
}
RAY_CHECK_OK(actor_task_submitter_->SubmitTask(spec));
} else {
RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec));
RAY_CHECK_OK(normal_task_submitter_->SubmitTask(spec));
}
}

Expand All @@ -1088,9 +1088,9 @@ void CoreWorker::InternalHeartbeat() {

// Periodically report the lastest backlog so that
// local raylet will have the eventually consistent view of worker backlogs
// even in cases where backlog reports from direct_task_transport
// even in cases where backlog reports from normal_task_submitter
// are lost or reordered.
direct_task_submitter_->ReportWorkerBacklog();
normal_task_submitter_->ReportWorkerBacklog();

// Check for unhandled exceptions to raise after a timeout on the driver.
// Only do this for TTY, since shells like IPython sometimes save references
Expand Down Expand Up @@ -2231,7 +2231,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(

io_service_.post(
[this, task_spec]() {
RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec));
RAY_UNUSED(normal_task_submitter_->SubmitTask(task_spec));
},
"CoreWorker.SubmitTask");
}
Expand Down Expand Up @@ -2386,7 +2386,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
<< "Failed to register actor. Error message: "
<< status.ToString();
} else {
RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec));
RAY_UNUSED(normal_task_submitter_->SubmitTask(task_spec));
}
}));
},
Expand All @@ -2401,7 +2401,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
}
io_service_.post(
[this, task_spec = std::move(task_spec)]() {
RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec));
RAY_UNUSED(normal_task_submitter_->SubmitTask(task_spec));
},
"CoreWorker.SubmitTask");
}
Expand Down Expand Up @@ -2601,7 +2601,7 @@ Status CoreWorker::CancelTask(const ObjectID &object_id,
RAY_LOG(DEBUG).WithField(object_id)
<< "Request to cancel a task of object to an owner "
<< obj_addr.SerializeAsString();
return direct_task_submitter_->CancelRemoteTask(
return normal_task_submitter_->CancelRemoteTask(
object_id, obj_addr, force_kill, recursive);
}

Expand All @@ -2625,7 +2625,7 @@ Status CoreWorker::CancelTask(const ObjectID &object_id,

return actor_task_submitter_->CancelTask(task_spec.value(), recursive);
} else {
return direct_task_submitter_->CancelTask(task_spec.value(), force_kill, recursive);
return normal_task_submitter_->CancelTask(task_spec.value(), force_kill, recursive);
}
}

Expand All @@ -2645,7 +2645,7 @@ Status CoreWorker::CancelChildren(const TaskID &task_id, bool force_kill) {
recursive_cancellation_status.push_back(std::make_pair(child_id, result));
} else {
auto result =
direct_task_submitter_->CancelTask(child_spec.value(), force_kill, true);
normal_task_submitter_->CancelTask(child_spec.value(), force_kill, true);
recursive_cancellation_status.push_back(std::make_pair(child_id, result));
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
#include "ray/core_worker/store_provider/plasma_store_provider.h"
#include "ray/core_worker/task_event_buffer.h"
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/core_worker/transport/direct_task_transport.h"
#include "ray/core_worker/transport/normal_task_submitter.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/pubsub/publisher.h"
#include "ray/pubsub/subscriber.h"
Expand Down Expand Up @@ -1112,11 +1112,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
const std::string &event_name);

int64_t GetNumTasksSubmitted() const {
return direct_task_submitter_->GetNumTasksSubmitted();
return normal_task_submitter_->GetNumTasksSubmitted();
}

int64_t GetNumLeasesRequested() const {
return direct_task_submitter_->GetNumLeasesRequested();
return normal_task_submitter_->GetNumLeasesRequested();
}

public:
Expand Down Expand Up @@ -1851,7 +1851,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
std::shared_ptr<LeaseRequestRateLimiter> lease_request_rate_limiter_;

// Interface to submit non-actor tasks directly to leased workers.
std::unique_ptr<CoreWorkerDirectTaskSubmitter> direct_task_submitter_;
std::unique_ptr<NormalTaskSubmitter> normal_task_submitter_;

/// Manages recovery of objects stored in remote plasma nodes.
std::unique_ptr<ObjectRecoveryManager> object_recovery_manager_;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/direct_actor_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "ray/common/task/task_spec.h"
#include "ray/common/test_util.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker/transport/direct_task_transport.h"
#include "ray/core_worker/transport/normal_task_submitter.h"
#include "ray/raylet_client/raylet_client.h"
#include "ray/rpc/worker/core_worker_client.h"
#include "mock/ray/core_worker/actor_creator.h"
Expand Down
32 changes: 16 additions & 16 deletions src/ray/core_worker/test/direct_task_transport_mock_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

// clang-format off
#include "ray/core_worker/transport/direct_task_transport.h"
#include "ray/core_worker/transport/normal_task_submitter.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "mock/ray/core_worker/actor_creator.h"
Expand All @@ -36,20 +36,20 @@ class DirectTaskTransportTest : public ::testing::Test {
[&](const rpc::Address &) { return nullptr; });
static std::shared_ptr<LeaseRequestRateLimiter> kRateLimiter =
std::make_shared<StaticLeaseRequestRateLimiter>(1);
task_submitter = std::make_unique<CoreWorkerDirectTaskSubmitter>(
rpc::Address(), /* rpc_address */
raylet_client, /* lease_client */
client_pool, /* core_worker_client_pool */
nullptr, /* lease_client_factory */
lease_policy, /* lease_policy */
std::make_shared<CoreWorkerMemoryStore>(),
task_finisher,
NodeID::Nil(), /* local_raylet_id */
WorkerType::WORKER, /* worker_type */
0, /* lease_timeout_ms */
actor_creator,
JobID::Nil() /* job_id */,
kRateLimiter);
task_submitter =
std::make_unique<NormalTaskSubmitter>(rpc::Address(), /* rpc_address */
raylet_client, /* lease_client */
client_pool, /* core_worker_client_pool */
nullptr, /* lease_client_factory */
lease_policy, /* lease_policy */
std::make_shared<CoreWorkerMemoryStore>(),
task_finisher,
NodeID::Nil(), /* local_raylet_id */
WorkerType::WORKER, /* worker_type */
0, /* lease_timeout_ms */
actor_creator,
JobID::Nil() /* job_id */,
kRateLimiter);
}

TaskSpecification GetCreatingTaskSpec(const ActorID &actor_id) {
Expand All @@ -62,7 +62,7 @@ class DirectTaskTransportTest : public ::testing::Test {
return TaskSpecification(task_spec);
}

std::unique_ptr<CoreWorkerDirectTaskSubmitter> task_submitter;
std::unique_ptr<NormalTaskSubmitter> task_submitter;
std::shared_ptr<MockRayletClientInterface> raylet_client;
std::shared_ptr<MockTaskFinisherInterface> task_finisher;
std::shared_ptr<MockActorCreatorInterface> actor_creator;
Expand Down
Loading

0 comments on commit 9600bfe

Please sign in to comment.