Skip to content

Commit

Permalink
[Core][Core-worker] add job_id in log file as source of truth (#31772)
Browse files Browse the repository at this point in the history
Why are these changes needed?
In #30883 we lazy bind job_config to a worker. In this way we can no longer know the job id when worker starts up. Instead, we log the job_id as part of the log similar to task_name, and parse it when we process the log.

To remain backward compatibility, we don't remove job_id from the worker log for now; but we probably will remove it in the future version.
  • Loading branch information
scv119 authored Jan 19, 2023
1 parent 8f8b95e commit 3f7a0f8
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 9 deletions.
15 changes: 9 additions & 6 deletions python/ray/_private/log_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
logger = logging.getLogger(__name__)

# The groups are job id, and pid.
JOB_LOG_PATTERN = re.compile(".*worker.*-([0-9a-f]+)-(\d+)")
WORKER_LOG_PATTERN = re.compile(".*worker.*-([0-9a-f]+)-(\d+)")
# The groups are job id.
RUNTIME_ENV_SETUP_PATTERN = re.compile(".*runtime_env_setup-(\d+).log")
# Log name update interval under pressure.
Expand Down Expand Up @@ -218,13 +218,12 @@ def update_log_filenames(self):
+ runtime_env_setup_paths
):
if os.path.isfile(file_path) and file_path not in self.log_filenames:
job_match = JOB_LOG_PATTERN.match(file_path)
if job_match:
job_id = job_match.group(1)
worker_pid = int(job_match.group(2))
worker_match = WORKER_LOG_PATTERN.match(file_path)
if worker_match:
worker_pid = int(worker_match.group(2))
else:
job_id = None
worker_pid = None
job_id = None

# Perform existence check first because most file will not be
# including runtime_env. This saves some cpu cycle.
Expand Down Expand Up @@ -365,6 +364,10 @@ def flush():
file_info.task_name = next_line.split(
ray_constants.LOG_PREFIX_TASK_NAME, 1
)[1]
elif next_line.startswith(ray_constants.LOG_PREFIX_JOB_ID):
file_info.job_id = next_line.split(
ray_constants.LOG_PREFIX_JOB_ID, 1
)[1]
elif next_line.startswith(
"Windows fatal exception: access violation"
):
Expand Down
2 changes: 2 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ def env_bool(key, default):
LOG_PREFIX_ACTOR_NAME = ":actor_name:"
# Task names are recorded in the logs with this magic token as a prefix.
LOG_PREFIX_TASK_NAME = ":task_name:"
# Job ids are recorded in the logs with this magic token as a prefix.
LOG_PREFIX_JOB_ID = ":job_id:"

# The object metadata field uses the following format: It is a comma
# separated list of fields. The first field is mandatory and is the
Expand Down
8 changes: 8 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,14 @@ def maybe_initialize_job_config():
sys.path.insert(0, p)
job_config_initialized = True

# Record the task name via :task_name: magic token in the log file.
# This is used for the prefix in driver logs `(task_name pid=123) ...`
job_id_magic_token = "{}{}\n".format(
ray_constants.LOG_PREFIX_JOB_ID, core_worker.get_current_job_id().hex())
# Print on both .out and .err
print(job_id_magic_token, end="")
print(job_id_magic_token, file=sys.stderr, end="")


# This function introduces ~2-7us of overhead per call (i.e., it can be called
# up to hundreds of thousands of times per second).
Expand Down
26 changes: 23 additions & 3 deletions python/ray/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,8 @@ def get_file_info(file_infos, filename):
assert not worker_out_log_file_info.is_err_file
assert worker_err_log_file_info.is_err_file

assert worker_out_log_file_info.job_id == job_id
assert worker_err_log_file_info.job_id == job_id
assert worker_out_log_file_info.job_id is None
assert worker_err_log_file_info.job_id is None
assert worker_out_log_file_info.worker_pid == int(dead_pid)
assert worker_out_log_file_info.worker_pid == int(dead_pid)

Expand Down Expand Up @@ -691,7 +691,7 @@ def get_file_info(file_infos, filename):
assert len(list((log_dir / "old").iterdir())) == 2


def test_log_monitor_actor_task_name(tmp_path):
def test_log_monitor_actor_task_name_and_job_id(tmp_path):
log_dir = tmp_path / "logs"
log_dir.mkdir()
worker_id = "6df6d5dd8ca5215658e4a8f9a569a9d98e27094f9cc35a4ca43d272c"
Expand Down Expand Up @@ -752,6 +752,26 @@ def test_log_monitor_actor_task_name(tmp_path):
}
)

# Test the job_id is updated.
job_id = "01000000"
with open(file_info.filename, "a") as f:
# Write 150 more lines.
f.write(f"{ray_constants.LOG_PREFIX_JOB_ID}{job_id}\n")
f.write("line2")
log_monitor.check_log_files_and_publish_updates()
assert file_info.job_id == job_id
mock_publisher.publish_logs.assert_any_call(
{
"ip": log_monitor.ip,
"pid": file_info.worker_pid,
"job": file_info.job_id,
"is_err": file_info.is_err_file,
"lines": ["line2"],
"actor_name": actor_name,
"task_name": None,
}
)


@pytest.fixture
def mock_timer():
Expand Down

0 comments on commit 3f7a0f8

Please sign in to comment.