Skip to content

Commit

Permalink
add worker_id to TASK X-RAY trace (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
yifeif authored Dec 4, 2020
1 parent 575c17c commit d1c498c
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
14 changes: 11 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ cdef execute_task(

task_name = name.decode("utf-8")
title = f"ray::{task_name}"
worker_id = core_worker.get_worker_id()

if <int>task_type == <int>TASK_TYPE_NORMAL_TASK:
next_title = "ray::IDLE"
Expand All @@ -378,7 +379,8 @@ cdef execute_task(
f"X-RAY-TRACE message:'NORMAL_TASK_EXECUTED' title:{next_title}"
f" task_name:{task_name}"
f" job_id:{job_id.hex().encode('ascii')}"
f" task_id:{task_id.hex().encode('ascii')}")
f" task_id:{task_id.hex().encode('ascii')}"
f" worker_id:{worker_id.hex().encode('ascii')}")
else:
actor = worker.actors[core_worker.get_actor_id()]
class_name = actor.__class__.__name__
Expand All @@ -390,15 +392,17 @@ cdef execute_task(
f" task_name:{task_name}"
f" job_id:{job_id.hex().encode('ascii')}"
f" task_id:{task_id.hex().encode('ascii')}"
f" actor_id:{core_worker.get_actor_id().hex().encode('ascii')}") # noqa
f" actor_id:{core_worker.get_actor_id().hex().encode('ascii')}"
f" worker_id:{worker_id.hex().encode('ascii')}") # noqa
else:
logger.info(
f"X-RAY-TRACE message:'ACTOR_TASK_EXECUTED'"
f" title:{next_title}"
f" task_name:{task_name}"
f" job_id:{job_id.hex().encode('ascii')}"
f" task_id:{task_id.hex().encode('ascii')}"
f" actor_id:{core_worker.get_actor_id().hex().encode('ascii')}") # noqa
f" actor_id:{core_worker.get_actor_id().hex().encode('ascii')}"
f" worker_id:{worker_id.hex().encode('ascii')}") # noqa
pid = os.getpid()
worker_name = f"ray_{class_name}_{pid}"
if c_resources.find(b"object_store_memory") != c_resources.end():
Expand Down Expand Up @@ -843,6 +847,10 @@ cdef class CoreWorker:
return ActorID(
CCoreWorkerProcess.GetCoreWorker().GetActorId().Binary())

def get_worker_id(self):
return WorkerID(
CCoreWorkerProcess.GetCoreWorker().GetWorkerID().Binary())

def get_placement_group_id(self):
return PlacementGroupID(
CCoreWorkerProcess.GetCoreWorker()
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CPlacementGroupID GetCurrentPlacementGroupId()
c_bool ShouldCaptureChildTasksInPlacementGroup()
const CActorID &GetActorId()
const CWorkerID &GetWorkerID() const
void SetActorTitle(const c_string &title)
void SetWebuiDisplay(const c_string &key, const c_string &message)
CTaskID GetCallerId()
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1785,7 +1785,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
task_type, task_spec.GetName(), func,
task_spec.GetRequiredResources().GetResourceMap(), args, arg_reference_ids,
return_ids, return_objects);
RAY_LOG(INFO) << "X-RAY-TRACE message:'TASK_DONE.' task_id:" << task_spec.TaskId();
RAY_LOG(INFO) << "X-RAY-TRACE message:'TASK_DONE.' task_id:" << task_spec.TaskId() << " worker_id:" << GetWorkerID();
absl::optional<rpc::Address> caller_address(
options_.is_local_mode ? absl::optional<rpc::Address>()
: worker_context_.GetCurrentTask()->CallerAddress());
Expand Down

0 comments on commit d1c498c

Please sign in to comment.