From 0818d64658ea1b2485e80b78e4b3247a6eda33f8 Mon Sep 17 00:00:00 2001 From: John Horan Date: Fri, 12 Dec 2025 15:47:46 +0000 Subject: [PATCH 1/8] stash --- .../providers/cncf/kubernetes/callbacks.py | 7 ++++++- .../cncf/kubernetes/utils/pod_manager.py | 18 ++++++++++++------ .../cncf/kubernetes/utils/test_pod_manager.py | 14 ++++++++++---- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py index 20ea0d02a8846..96381e751088f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py @@ -21,6 +21,7 @@ import kubernetes.client as k8s import kubernetes_asyncio.client as async_k8s +from pendulum import DateTime if TYPE_CHECKING: from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator @@ -189,12 +190,16 @@ def on_operator_resuming( pass @staticmethod - def progress_callback(*, line: str, client: client_type, mode: str, **kwargs) -> None: + def progress_callback( + *, line: str, client: client_type, mode: str, container_name: str, timestamp: DateTime, **kwargs + ) -> None: """ Invoke this callback to process pod container logs. :param line: the read line of log. :param client: the Kubernetes client that can be used in the callback. :param mode: the current execution mode, it's one of (`sync`, `async`). + :param container_name: the name of the container from which the log line was read. + :param timestamp: the timestamp of the log line. """ pass diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 7fddb37bf3470..e9b73949df9a9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -474,12 +474,15 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None message_timestamp = line_timestamp progress_callback_lines.append(line) else: # previous log line is complete - for line in progress_callback_lines: + if message_to_log is not None: for callback in self._callbacks: callback.progress_callback( - line=line, client=self._client, mode=ExecutionMode.SYNC + line=message_to_log, + client=self._client, + mode=ExecutionMode.SYNC, + container_name=container_name, + timestamp=message_timestamp, ) - if message_to_log is not None: self._log_message( message_to_log, container_name, @@ -495,12 +498,15 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None progress_callback_lines.append(line) finally: # log the last line and update the last_captured_timestamp - for line in progress_callback_lines: + if message_to_log is not None: for callback in self._callbacks: callback.progress_callback( - line=line, client=self._client, mode=ExecutionMode.SYNC + line=message_to_log, + client=self._client, + mode=ExecutionMode.SYNC, + container_name=container_name, + timestamp=message_timestamp, ) - if message_to_log is not None: self._log_message( message_to_log, container_name, container_name_log_prefix_enabled, log_formatter ) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 6fb8b1e10e7dc..fe1464a6941c5 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -524,16 +524,22 @@ def test_fetch_container_logs_invoke_progress_callback( ): MockWrapper.reset() mock_callbacks = MockWrapper.mock_callbacks - message = "2020-10-08T14:16:17.793417674Z message" + ts = "2020-10-08T14:16:17.793417674Z" + message = "message" no_ts_message = "notimestamp" - mock_read_pod_logs.return_value = [bytes(message, "utf-8"), bytes(no_ts_message, "utf-8")] + mock_read_pod_logs.return_value = [bytes(f"{ts} {message}", "utf-8"), bytes(no_ts_message, "utf-8")] mock_container_is_running.return_value = False self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) mock_callbacks.progress_callback.assert_has_calls( [ - mock.call(line=message, client=self.pod_manager._client, mode="sync"), - mock.call(line=no_ts_message, client=self.pod_manager._client, mode="sync"), + mock.call( + line=f"{message}\n{no_ts_message}", + client=self.pod_manager._client, + mode="sync", + container_name=mock.ANY, + timestamp=pendulum.parse(ts), + ), ] ) From 6ebb803c93e5f9cbaa5b76ff5d2a76411a278c07 Mon Sep 17 00:00:00 2001 From: John Horan Date: Fri, 12 Dec 2025 15:55:38 +0000 Subject: [PATCH 2/8] expand test --- .../cncf/kubernetes/utils/test_pod_manager.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index fe1464a6941c5..2783ba0c22603 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -527,19 +527,28 @@ def test_fetch_container_logs_invoke_progress_callback( ts = "2020-10-08T14:16:17.793417674Z" message = "message" no_ts_message = "notimestamp" - mock_read_pod_logs.return_value = [bytes(f"{ts} {message}", "utf-8"), bytes(no_ts_message, "utf-8")] + second_ts = "2020-10-08T15:16:18.793417674Z" + second_message = "second message" + mock_read_pod_logs.return_value = [bytes(f"{ts} {message}", "utf-8"), bytes(no_ts_message, "utf-8"), bytes(f"{second_ts} {second_message}", "utf-8")] mock_container_is_running.return_value = False - self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) + self.pod_manager.fetch_container_logs(mock.MagicMock(), "base", follow=True) mock_callbacks.progress_callback.assert_has_calls( [ mock.call( line=f"{message}\n{no_ts_message}", client=self.pod_manager._client, mode="sync", - container_name=mock.ANY, + container_name="base", timestamp=pendulum.parse(ts), ), + mock.call( + line=f"{second_message}", + client=self.pod_manager._client, + mode="sync", + container_name="base", + timestamp=pendulum.parse(second_ts), + ), ] ) From cb91049299baed8c7a0946e8276a2f2763bf463a Mon Sep 17 00:00:00 2001 From: John Horan Date: Fri, 12 Dec 2025 16:25:29 +0000 Subject: [PATCH 3/8] fix mypy --- .../src/airflow/providers/cncf/kubernetes/callbacks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py index 96381e751088f..5218f39e3815f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py @@ -191,7 +191,7 @@ def on_operator_resuming( @staticmethod def progress_callback( - *, line: str, client: client_type, mode: str, container_name: str, timestamp: DateTime, **kwargs + *, line: str, client: client_type, mode: str, container_name: str, timestamp: DateTime | None, **kwargs ) -> None: """ Invoke this callback to process pod container logs. From 5889944b706c423adf30bc57e003619b50beb0cc Mon Sep 17 00:00:00 2001 From: John Horan Date: Mon, 15 Dec 2025 10:54:42 +0000 Subject: [PATCH 4/8] ruff fixes --- .../airflow/providers/cncf/kubernetes/callbacks.py | 11 +++++++++-- .../unit/cncf/kubernetes/utils/test_pod_manager.py | 6 +++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py index 5218f39e3815f..928f127617f9e 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py @@ -21,9 +21,10 @@ import kubernetes.client as k8s import kubernetes_asyncio.client as async_k8s -from pendulum import DateTime if TYPE_CHECKING: + from pendulum import DateTime + from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator from airflow.utils.context import Context @@ -191,7 +192,13 @@ def on_operator_resuming( @staticmethod def progress_callback( - *, line: str, client: client_type, mode: str, container_name: str, timestamp: DateTime | None, **kwargs + *, + line: str, + client: client_type, + mode: str, + container_name: str, + timestamp: DateTime | None, + **kwargs, ) -> None: """ Invoke this callback to process pod container logs. diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 2783ba0c22603..c8551aeb200fa 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -529,7 +529,11 @@ def test_fetch_container_logs_invoke_progress_callback( no_ts_message = "notimestamp" second_ts = "2020-10-08T15:16:18.793417674Z" second_message = "second message" - mock_read_pod_logs.return_value = [bytes(f"{ts} {message}", "utf-8"), bytes(no_ts_message, "utf-8"), bytes(f"{second_ts} {second_message}", "utf-8")] + mock_read_pod_logs.return_value = [ + bytes(f"{ts} {message}", "utf-8"), + bytes(no_ts_message, "utf-8"), + bytes(f"{second_ts} {second_message}", "utf-8"), + ] mock_container_is_running.return_value = False self.pod_manager.fetch_container_logs(mock.MagicMock(), "base", follow=True) From 02a20107a1d352a617ee8f67f96ad6310784c4af Mon Sep 17 00:00:00 2001 From: John Horan Date: Tue, 23 Dec 2025 10:07:15 +0000 Subject: [PATCH 5/8] remove redundant if --- .../cncf/kubernetes/utils/pod_manager.py | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index a082beb1bc0ac..0d5225b972ffa 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -487,21 +487,20 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None message_timestamp = line_timestamp progress_callback_lines.append(line) else: # previous log line is complete - if message_to_log is not None: - for callback in self._callbacks: - callback.progress_callback( - line=message_to_log, - client=self._client, - mode=ExecutionMode.SYNC, - container_name=container_name, - timestamp=message_timestamp, - ) - self._log_message( - message_to_log, - container_name, - container_name_log_prefix_enabled, - log_formatter, + for callback in self._callbacks: + callback.progress_callback( + line=message_to_log, + client=self._client, + mode=ExecutionMode.SYNC, + container_name=container_name, + timestamp=message_timestamp, ) + self._log_message( + message_to_log, + container_name, + container_name_log_prefix_enabled, + log_formatter, + ) last_captured_timestamp = message_timestamp message_to_log = message message_timestamp = line_timestamp From 953ae416124bbf6bc721fcffdc3d6cb636532ce8 Mon Sep 17 00:00:00 2001 From: John Horan Date: Tue, 23 Dec 2025 10:14:43 +0000 Subject: [PATCH 6/8] add pod to callback --- .../src/airflow/providers/cncf/kubernetes/callbacks.py | 2 ++ .../src/airflow/providers/cncf/kubernetes/utils/pod_manager.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py index 928f127617f9e..b5fe13aa771e0 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py @@ -198,6 +198,7 @@ def progress_callback( mode: str, container_name: str, timestamp: DateTime | None, + pod: k8s.V1Pod, **kwargs, ) -> None: """ @@ -208,5 +209,6 @@ def progress_callback( :param mode: the current execution mode, it's one of (`sync`, `async`). :param container_name: the name of the container from which the log line was read. :param timestamp: the timestamp of the log line. + :param pod: the pod from which the log line was read. """ pass diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 0d5225b972ffa..866ba37bc41eb 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -494,6 +494,7 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None mode=ExecutionMode.SYNC, container_name=container_name, timestamp=message_timestamp, + pod=pod, ) self._log_message( message_to_log, @@ -518,6 +519,7 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None mode=ExecutionMode.SYNC, container_name=container_name, timestamp=message_timestamp, + pod=pod, ) self._log_message( message_to_log, container_name, container_name_log_prefix_enabled, log_formatter From e2b5d274092cfeefb91899cea8fa1d5a82afc58e Mon Sep 17 00:00:00 2001 From: John Horan Date: Tue, 23 Dec 2025 10:24:30 +0000 Subject: [PATCH 7/8] add pod to tests --- .../tests/unit/cncf/kubernetes/utils/test_pod_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index c046ab3da5960..fe0dfbc758036 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -562,8 +562,9 @@ def test_fetch_container_logs_invoke_progress_callback( bytes(f"{second_ts} {second_message}", "utf-8"), ] mock_container_is_running.return_value = False + mock_pod = mock.MagicMock() - self.pod_manager.fetch_container_logs(mock.MagicMock(), "base", follow=True) + self.pod_manager.fetch_container_logs(mock_pod, "base", follow=True) mock_callbacks.progress_callback.assert_has_calls( [ mock.call( @@ -572,6 +573,7 @@ def test_fetch_container_logs_invoke_progress_callback( mode="sync", container_name="base", timestamp=pendulum.parse(ts), + pod=mock_pod, ), mock.call( line=f"{second_message}", @@ -579,6 +581,7 @@ def test_fetch_container_logs_invoke_progress_callback( mode="sync", container_name="base", timestamp=pendulum.parse(second_ts), + pod=mock_pod, ), ] ) From a6050981528e13c069f5dcd8cb14e912d9c64b98 Mon Sep 17 00:00:00 2001 From: John Horan Date: Fri, 2 Jan 2026 10:49:43 +0000 Subject: [PATCH 8/8] remove redundant code --- .../airflow/providers/cncf/kubernetes/utils/pod_manager.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 40f1e16218051..afcf7a2ae53bc 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -476,7 +476,6 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None ) message_to_log = None message_timestamp = None - progress_callback_lines = [] try: for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace") @@ -485,7 +484,6 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None if message_to_log is None: # first line in the log message_to_log = message message_timestamp = line_timestamp - progress_callback_lines.append(line) else: # previous log line is complete for callback in self._callbacks: callback.progress_callback( @@ -505,10 +503,8 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None last_captured_timestamp = message_timestamp message_to_log = message message_timestamp = line_timestamp - progress_callback_lines = [line] else: # continuation of the previous log line message_to_log = f"{message_to_log}\n{message}" - progress_callback_lines.append(line) finally: # log the last line and update the last_captured_timestamp if message_to_log is not None: