Skip to content

Commit

Permalink
Get served logs when remote or executor logs not available for non-ru…
Browse files Browse the repository at this point in the history
…nning task try
  • Loading branch information
kahlstrm committed Apr 24, 2024
1 parent 6db6fef commit ed0bf55
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
12 changes: 6 additions & 6 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,7 @@ def _read(
executor_messages: list[str] = []
executor_logs: list[str] = []
served_logs: list[str] = []
is_running = ti.try_number == try_number and ti.state in (
TaskInstanceState.RUNNING,
TaskInstanceState.DEFERRED,
)
is_in_running_or_deferred = ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
with suppress(NotImplementedError):
remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
messages_list.extend(remote_messages)
Expand All @@ -384,7 +381,9 @@ def _read(
worker_log_full_path = Path(self.local_base, worker_log_rel_path)
local_messages, local_logs = self._read_from_local(worker_log_full_path)
messages_list.extend(local_messages)
if is_running and not executor_messages:
if is_in_running_or_deferred and not executor_messages and not remote_logs:
# While task instance is still running and we don't have either executor nor remote logs, look for served logs
# This is for cases when users have not setup remote logging nor shared drive for logs
served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
messages_list.extend(served_messages)
elif ti.state not in State.unfinished and not (local_logs or remote_logs):
Expand All @@ -404,11 +403,12 @@ def _read(
)
log_pos = len(logs)
messages = "".join([f"*** {x}\n" for x in messages_list])
end_of_log = ti.try_number != try_number or not is_in_running_or_deferred
if metadata and "log_pos" in metadata:
previous_chars = metadata["log_pos"]
logs = logs[previous_chars:] # Cut off previously passed log test as new tail
out_message = logs if "log_pos" in (metadata or {}) else messages + logs
return out_message, {"end_of_log": not is_running, "log_pos": log_pos}
return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}

@staticmethod
def _get_pod_namespace(ti: TaskInstance):
Expand Down
14 changes: 12 additions & 2 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc

def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance):
"""Test for executors which do not have `get_task_log` method, it fallbacks to reading
log from worker. But it happens only for the latest try_number."""
log from worker if and only if remote logs aren't found"""
executor_name = "CeleryExecutor"

ti = create_task_instance(
Expand All @@ -336,7 +336,17 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc
fth._read_from_logs_server.assert_called_once()
assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": False, "log_pos": 16})

# Previous try_number is from remote logs without reaching worker server
# Previous try_number should return served logs when remote logs aren't implemented
fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = ["served logs try_number=1"], ["this\nlog\ncontent"]
actual = fth._read(ti=ti, try_number=1)
fth._read_from_logs_server.assert_called_once()
assert actual == (
"*** served logs try_number=1\nthis\nlog\ncontent",
{"end_of_log": True, "log_pos": 16},
)

# When remote_logs is implemented, previous try_number is from remote logs without reaching worker server
fth._read_from_logs_server.reset_mock()
fth._read_remote_logs = mock.Mock()
fth._read_remote_logs.return_value = ["remote logs"], ["remote\nlog\ncontent"]
Expand Down

0 comments on commit ed0bf55

Please sign in to comment.