Skip to content

Commit

Permalink
Revert back to the logic before apache#32561
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Aug 5, 2024
1 parent 60d4848 commit 7d12b92
Showing 1 changed file with 4 additions and 9 deletions.
13 changes: 4 additions & 9 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,28 +381,23 @@ def _read(
executor_messages: list[str] = []
executor_logs: list[str] = []
served_logs: list[str] = []
is_in_running_or_deferred = ti.state in (
TaskInstanceState.RUNNING,
TaskInstanceState.DEFERRED,
)
is_up_for_retry = ti.state == TaskInstanceState.UP_FOR_RETRY
with suppress(NotImplementedError):
remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
messages_list.extend(remote_messages)
has_k8s_exec_pod = False
if ti.state == TaskInstanceState.RUNNING:
response = self._executor_get_task_log(ti, try_number)
if response:
executor_messages, executor_logs = response
if executor_messages:
messages_list.extend(executor_messages)
has_k8s_exec_pod = True
if not (remote_logs and ti.state not in State.unfinished):
# when finished, if we have remote logs, no need to check local
worker_log_full_path = Path(self.local_base, worker_log_rel_path)
local_messages, local_logs = self._read_from_local(worker_log_full_path)
messages_list.extend(local_messages)
if is_in_running_or_deferred or is_up_for_retry or not (executor_messages or remote_logs):
# While task instance is still running or deferred, look for served logs.
# And even if it's in any state, if there are no logs found yet, check served logs.
if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not has_k8s_exec_pod:
served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
messages_list.extend(served_messages)
elif ti.state not in State.unfinished and not (local_logs or remote_logs):
Expand All @@ -422,7 +417,7 @@ def _read(
)
log_pos = len(logs)
messages = "".join([f"*** {x}\n" for x in messages_list])
end_of_log = ti.try_number != try_number or not is_in_running_or_deferred
end_of_log = ti.try_number != try_number or ti.state not in (State.RUNNING, State.DEFERRED)
if metadata and "log_pos" in metadata:
previous_chars = metadata["log_pos"]
logs = logs[previous_chars:] # Cut off previously passed log test as new tail
Expand Down

0 comments on commit 7d12b92

Please sign in to comment.