From 5ea03ab306f5add0885ebc955681c4b81450fc21 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Tue, 1 Jul 2025 02:24:11 -0600 Subject: [PATCH] Allow more empty loops before stopping log streaming (#52614) In #50715 we starting short-circuiting if we hit 5 iterations of no new log messages. This works well, except in the scenario where there are no log messages at all. ES log handler has it's own short-circuit for that scenario, but it triggers based on time and that works out to ~7 iterations. Let's let ES have the first crack at it so the user gets a better message. Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com> (cherry picked from commit 97bbf3ba031ebea162d649157cd8c5c1adcdb12e) --- airflow/utils/log/log_reader.py | 2 +- tests/utils/log/test_log_reader.py | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index d863999617810..c9573122a25fd 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -39,7 +39,7 @@ class TaskLogReader: STREAM_LOOP_SLEEP_SECONDS = 1 """Time to sleep between loops while waiting for more logs""" - STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 5 + STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 10 """Number of empty loop iterations before stopping the stream""" def read_log_chunks( diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index 598ed8bcbba35..2463db11a5bcd 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -229,11 +229,7 @@ def test_read_log_stream_should_read_each_try_in_turn(self, mock_read): def test_read_log_stream_no_end_of_log_marker(self, mock_read): mock_read.side_effect = [ ([[("", "hello")]], [{"end_of_log": False}]), - ([[]], [{"end_of_log": False}]), - ([[]], [{"end_of_log": False}]), - ([[]], [{"end_of_log": False}]), - ([[]], [{"end_of_log": False}]), - ([[]], [{"end_of_log": False}]), + *[([[]], [{"end_of_log": False}]) for _ in range(10)], ] self.ti.state = TaskInstanceState.SUCCESS @@ -244,7 +240,7 @@ def test_read_log_stream_no_end_of_log_marker(self, mock_read): "\nhello\n", "\n(Log stream stopped - End of log marker not found; logs may be incomplete.)\n", ] - assert mock_read.call_count == 6 + assert mock_read.call_count == 11 def test_supports_external_link(self): task_log_reader = TaskLogReader()