diff --git a/airflow-core/src/airflow/utils/log/log_reader.py b/airflow-core/src/airflow/utils/log/log_reader.py index d50b8fc126021..c00a6877511d0 100644 --- a/airflow-core/src/airflow/utils/log/log_reader.py +++ b/airflow-core/src/airflow/utils/log/log_reader.py @@ -44,6 +44,8 @@ class TaskLogReader: STREAM_LOOP_SLEEP_SECONDS = 1 """Time to sleep between loops while waiting for more logs""" + STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 5 + """Number of empty loop iterations before stopping the stream""" def read_log_chunks( self, ti: TaskInstance, try_number: int | None, metadata @@ -81,6 +83,7 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di for key in ("end_of_log", "max_offset", "offset", "log_pos"): metadata.pop(key, None) + empty_iterations = 0 while True: logs, out_metadata = self.read_log_chunks(ti, try_number, metadata) @@ -96,14 +99,21 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED, ): - if not logs[0]: + if logs: + empty_iterations = 0 + else: # we did not receive any logs in this loop # sleeping to conserve resources / limit requests on external services time.sleep(self.STREAM_LOOP_SLEEP_SECONDS) + empty_iterations += 1 + if empty_iterations >= self.STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS: + # we have not received any logs for a while, so we stop the stream + yield "(Log stream stopped - End of log marker not found; logs may be incomplete.)\n" + return else: metadata.clear() metadata.update(out_metadata) - break + return @cached_property def log_handler(self): diff --git a/airflow-core/tests/unit/utils/log/test_log_reader.py b/airflow-core/tests/unit/utils/log/test_log_reader.py index 150425eb5d159..2e330773f8292 100644 --- a/airflow-core/tests/unit/utils/log/test_log_reader.py +++ b/airflow-core/tests/unit/utils/log/test_log_reader.py @@ -218,6 +218,27 @@ def test_read_log_stream_should_read_each_try_in_turn(self, mock_read): any_order=False, ) + @mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") + def test_read_log_stream_no_end_of_log_marker(self, mock_read): + mock_read.side_effect = [ + (["hello"], {"end_of_log": False}), + ([], {"end_of_log": False}), + ([], {"end_of_log": False}), + ([], {"end_of_log": False}), + ([], {"end_of_log": False}), + ([], {"end_of_log": False}), + ] + + self.ti.state = TaskInstanceState.SUCCESS + task_log_reader = TaskLogReader() + task_log_reader.STREAM_LOOP_SLEEP_SECONDS = 0.001 # to speed up the test + log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={}) + assert list(log_stream) == [ + "hello\n", + "(Log stream stopped - End of log marker not found; logs may be incomplete.)\n", + ] + assert mock_read.call_count == 6 + def test_supports_external_link(self): task_log_reader = TaskLogReader()