diff --git a/python/ray/_private/log_monitor.py b/python/ray/_private/log_monitor.py index e02105a90dd4b..7f06343625aef 100644 --- a/python/ray/_private/log_monitor.py +++ b/python/ray/_private/log_monitor.py @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) # The groups are job id, and pid. -JOB_LOG_PATTERN = re.compile(".*worker.*-([0-9a-f]+)-(\d+)") +WORKER_LOG_PATTERN = re.compile(".*worker.*-([0-9a-f]+)-(\d+)") # The groups are job id. RUNTIME_ENV_SETUP_PATTERN = re.compile(".*runtime_env_setup-(\d+).log") # Log name update interval under pressure. @@ -218,13 +218,12 @@ def update_log_filenames(self): + runtime_env_setup_paths ): if os.path.isfile(file_path) and file_path not in self.log_filenames: - job_match = JOB_LOG_PATTERN.match(file_path) - if job_match: - job_id = job_match.group(1) - worker_pid = int(job_match.group(2)) + worker_match = WORKER_LOG_PATTERN.match(file_path) + if worker_match: + worker_pid = int(worker_match.group(2)) else: - job_id = None worker_pid = None + job_id = None # Perform existence check first because most file will not be # including runtime_env. This saves some cpu cycle. @@ -365,6 +364,10 @@ def flush(): file_info.task_name = next_line.split( ray_constants.LOG_PREFIX_TASK_NAME, 1 )[1] + elif next_line.startswith(ray_constants.LOG_PREFIX_JOB_ID): + file_info.job_id = next_line.split( + ray_constants.LOG_PREFIX_JOB_ID, 1 + )[1] elif next_line.startswith( "Windows fatal exception: access violation" ): diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index fa9bee4a68a5c..6badbbf2b29ae 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -254,6 +254,8 @@ def env_bool(key, default): LOG_PREFIX_ACTOR_NAME = ":actor_name:" # Task names are recorded in the logs with this magic token as a prefix. LOG_PREFIX_TASK_NAME = ":task_name:" +# Job ids are recorded in the logs with this magic token as a prefix. 
+LOG_PREFIX_JOB_ID = ":job_id:" # The object metadata field uses the following format: It is a comma # separated list of fields. The first field is mandatory and is the diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 55625dd18f872..1e2dccb927bb2 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1411,6 +1411,14 @@ def maybe_initialize_job_config(): sys.path.insert(0, p) job_config_initialized = True + # Record the job id via :job_id: magic token in the log file. + # The log monitor parses this token to learn which job the log lines belong to. + job_id_magic_token = "{}{}\n".format( + ray_constants.LOG_PREFIX_JOB_ID, core_worker.get_current_job_id().hex()) + # Print on both .out and .err + print(job_id_magic_token, end="") + print(job_id_magic_token, file=sys.stderr, end="") + # This function introduces ~2-7us of overhead per call (i.e., it can be called # up to hundreds of thousands of times per second). diff --git a/python/ray/tests/test_logging.py b/python/ray/tests/test_logging.py index 0cd426a244f70..fe5a33e47a587 100644 --- a/python/ray/tests/test_logging.py +++ b/python/ray/tests/test_logging.py @@ -610,8 +610,8 @@ def get_file_info(file_infos, filename): assert not worker_out_log_file_info.is_err_file assert worker_err_log_file_info.is_err_file - assert worker_out_log_file_info.job_id == job_id - assert worker_err_log_file_info.job_id == job_id + assert worker_out_log_file_info.job_id is None + assert worker_err_log_file_info.job_id is None assert worker_out_log_file_info.worker_pid == int(dead_pid) assert worker_out_log_file_info.worker_pid == int(dead_pid) @@ -691,7 +691,7 @@ def get_file_info(file_infos, filename): assert len(list((log_dir / "old").iterdir())) == 2 -def test_log_monitor_actor_task_name(tmp_path): +def test_log_monitor_actor_task_name_and_job_id(tmp_path): log_dir = tmp_path / "logs" log_dir.mkdir() worker_id = "6df6d5dd8ca5215658e4a8f9a569a9d98e27094f9cc35a4ca43d272c" @@ -752,6 +752,26 @@ def 
test_log_monitor_actor_task_name(tmp_path): } ) + # Test the job_id is updated. + job_id = "01000000" + with open(file_info.filename, "a") as f: + # Write the job id magic token followed by one regular log line. + f.write(f"{ray_constants.LOG_PREFIX_JOB_ID}{job_id}\n") + f.write("line2") + log_monitor.check_log_files_and_publish_updates() + assert file_info.job_id == job_id + mock_publisher.publish_logs.assert_any_call( + { + "ip": log_monitor.ip, + "pid": file_info.worker_pid, + "job": file_info.job_id, + "is_err": file_info.is_err_file, + "lines": ["line2"], + "actor_name": actor_name, + "task_name": None, + } + ) + @pytest.fixture def mock_timer():