diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py index e3ff825b9d10f..f78e83fabd4de 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py @@ -23,6 +23,8 @@ import kubernetes_asyncio.client as async_k8s if TYPE_CHECKING: + from pendulum import DateTime + from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator from airflow.sdk import Context @@ -189,12 +191,24 @@ def on_operator_resuming( pass @staticmethod - def progress_callback(*, line: str, client: client_type, mode: str, **kwargs) -> None: + def progress_callback( + *, + line: str, + client: client_type, + mode: str, + container_name: str, + timestamp: DateTime | None, + pod: k8s.V1Pod, + **kwargs, + ) -> None: """ Invoke this callback to process pod container logs. :param line: the read line of log. :param client: the Kubernetes client that can be used in the callback. :param mode: the current execution mode, it's one of (`sync`, `async`). + :param container_name: the name of the container from which the log line was read. + :param timestamp: the timestamp of the log line. + :param pod: the pod from which the log line was read. """ pass diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index c9b48158d960a..afcf7a2ae53bc 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -476,7 +476,6 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None ) message_to_log = None message_timestamp = None - progress_callback_lines = [] try: for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace") @@ -485,35 +484,39 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None if message_to_log is None: # first line in the log message_to_log = message message_timestamp = line_timestamp - progress_callback_lines.append(line) else: # previous log line is complete - for line in progress_callback_lines: - for callback in self._callbacks: - callback.progress_callback( - line=line, client=self._client, mode=ExecutionMode.SYNC - ) - if message_to_log is not None: - self._log_message( - message_to_log, - container_name, - container_name_log_prefix_enabled, - log_formatter, + for callback in self._callbacks: + callback.progress_callback( + line=message_to_log, + client=self._client, + mode=ExecutionMode.SYNC, + container_name=container_name, + timestamp=message_timestamp, + pod=pod, ) + self._log_message( + message_to_log, + container_name, + container_name_log_prefix_enabled, + log_formatter, + ) last_captured_timestamp = message_timestamp message_to_log = message message_timestamp = line_timestamp - progress_callback_lines = [line] else: # continuation of the previous log line message_to_log = f"{message_to_log}\n{message}" - progress_callback_lines.append(line) finally: # log the last line and update the last_captured_timestamp - for line in progress_callback_lines: + if message_to_log is not None: for callback in self._callbacks: callback.progress_callback( - line=line, client=self._client, mode=ExecutionMode.SYNC + line=message_to_log, + client=self._client, + mode=ExecutionMode.SYNC, + container_name=container_name, + timestamp=message_timestamp, + pod=pod, ) - if message_to_log is not None: self._log_message( message_to_log, container_name, container_name_log_prefix_enabled, log_formatter ) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 062266540004b..fe0dfbc758036 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -551,16 +551,38 @@ def test_fetch_container_logs_invoke_progress_callback( ): MockWrapper.reset() mock_callbacks = MockWrapper.mock_callbacks - message = "2020-10-08T14:16:17.793417674Z message" + ts = "2020-10-08T14:16:17.793417674Z" + message = "message" no_ts_message = "notimestamp" - mock_read_pod_logs.return_value = [bytes(message, "utf-8"), bytes(no_ts_message, "utf-8")] + second_ts = "2020-10-08T15:16:18.793417674Z" + second_message = "second message" + mock_read_pod_logs.return_value = [ + bytes(f"{ts} {message}", "utf-8"), + bytes(no_ts_message, "utf-8"), + bytes(f"{second_ts} {second_message}", "utf-8"), + ] mock_container_is_running.return_value = False + mock_pod = mock.MagicMock() - self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) + self.pod_manager.fetch_container_logs(mock_pod, "base", follow=True) mock_callbacks.progress_callback.assert_has_calls( [ - mock.call(line=message, client=self.pod_manager._client, mode="sync"), - mock.call(line=no_ts_message, client=self.pod_manager._client, mode="sync"), + mock.call( + line=f"{message}\n{no_ts_message}", + client=self.pod_manager._client, + mode="sync", + container_name="base", + timestamp=pendulum.parse(ts), + pod=mock_pod, + ), + mock.call( + line=f"{second_message}", + client=self.pod_manager._client, + mode="sync", + container_name="base", + timestamp=pendulum.parse(second_ts), + pod=mock_pod, + ), ] )