Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][Core-worker] add job_id in log file as source of truth #31772

Merged
merged 5 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions python/ray/_private/log_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
logger = logging.getLogger(__name__)

# The groups are job id and pid.
JOB_LOG_PATTERN = re.compile(".*worker.*-([0-9a-f]+)-(\d+)")
WORKER_LOG_PATTERN = re.compile(".*worker.*-([0-9a-f]+)-(\d+)")
# The groups are job id.
RUNTIME_ENV_SETUP_PATTERN = re.compile(".*runtime_env_setup-(\d+).log")
# Log name update interval under pressure.
Expand Down Expand Up @@ -218,13 +218,12 @@ def update_log_filenames(self):
+ runtime_env_setup_paths
):
if os.path.isfile(file_path) and file_path not in self.log_filenames:
job_match = JOB_LOG_PATTERN.match(file_path)
if job_match:
job_id = job_match.group(1)
worker_pid = int(job_match.group(2))
worker_match = WORKER_LOG_PATTERN.match(file_path)
if worker_match:
worker_pid = int(worker_match.group(2))
else:
job_id = None
worker_pid = None
job_id = None

# Perform the existence check first because most files will not
# include runtime_env. This saves some CPU cycles.
Expand Down Expand Up @@ -365,6 +364,10 @@ def flush():
file_info.task_name = next_line.split(
ray_constants.LOG_PREFIX_TASK_NAME, 1
)[1]
elif next_line.startswith(ray_constants.LOG_PREFIX_JOB_ID):
file_info.job_id = next_line.split(
ray_constants.LOG_PREFIX_JOB_ID, 1
)[1]
elif next_line.startswith(
"Windows fatal exception: access violation"
):
Expand Down
2 changes: 2 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ def env_bool(key, default):
LOG_PREFIX_ACTOR_NAME = ":actor_name:"
# Task names are recorded in the logs with this magic token as a prefix.
LOG_PREFIX_TASK_NAME = ":task_name:"
# Job ids are recorded in the logs with this magic token as a prefix.
LOG_PREFIX_JOB_ID = ":job_id:"

# The object metadata field uses the following format: It is a comma
# separated list of fields. The first field is mandatory and is the
Expand Down
8 changes: 8 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,14 @@ def maybe_initialize_job_config():
sys.path.insert(0, p)
job_config_initialized = True

# Record the job id via :job_id: magic token in the log file.
# The log monitor reads this token to set the job id of the log file.
job_id_magic_token = "{}{}\n".format(
ray_constants.LOG_PREFIX_JOB_ID, core_worker.get_current_job_id().hex())
# Print on both .out and .err
print(job_id_magic_token, end="")
print(job_id_magic_token, file=sys.stderr, end="")


# This function introduces ~2-7us of overhead per call (i.e., it can be called
# up to hundreds of thousands of times per second).
Expand Down
26 changes: 23 additions & 3 deletions python/ray/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,8 @@ def get_file_info(file_infos, filename):
assert not worker_out_log_file_info.is_err_file
assert worker_err_log_file_info.is_err_file

assert worker_out_log_file_info.job_id == job_id
assert worker_err_log_file_info.job_id == job_id
assert worker_out_log_file_info.job_id is None
assert worker_err_log_file_info.job_id is None
assert worker_out_log_file_info.worker_pid == int(dead_pid)
assert worker_out_log_file_info.worker_pid == int(dead_pid)

Expand Down Expand Up @@ -691,7 +691,7 @@ def get_file_info(file_infos, filename):
assert len(list((log_dir / "old").iterdir())) == 2


def test_log_monitor_actor_task_name(tmp_path):
def test_log_monitor_actor_task_name_and_job_id(tmp_path):
log_dir = tmp_path / "logs"
log_dir.mkdir()
worker_id = "6df6d5dd8ca5215658e4a8f9a569a9d98e27094f9cc35a4ca43d272c"
Expand Down Expand Up @@ -752,6 +752,26 @@ def test_log_monitor_actor_task_name(tmp_path):
}
)

# Test the job_id is updated.
job_id = "01000000"
with open(file_info.filename, "a") as f:
# Write the job id magic token followed by one regular line.
f.write(f"{ray_constants.LOG_PREFIX_JOB_ID}{job_id}\n")
f.write("line2")
log_monitor.check_log_files_and_publish_updates()
assert file_info.job_id == job_id
mock_publisher.publish_logs.assert_any_call(
{
"ip": log_monitor.ip,
"pid": file_info.worker_pid,
"job": file_info.job_id,
"is_err": file_info.is_err_file,
"lines": ["line2"],
"actor_name": actor_name,
"task_name": None,
}
)


@pytest.fixture
def mock_timer():
Expand Down