Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import kubernetes_asyncio.client as async_k8s

if TYPE_CHECKING:
from pendulum import DateTime

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.sdk import Context

Expand Down Expand Up @@ -189,12 +191,24 @@ def on_operator_resuming(
pass

@staticmethod
def progress_callback(*, line: str, client: client_type, mode: str, **kwargs) -> None:
def progress_callback(
*,
line: str,
client: client_type,
mode: str,
container_name: str,
timestamp: DateTime | None,
pod: k8s.V1Pod,
**kwargs,
) -> None:
"""
Invoke this callback to process pod container logs.

:param line: the read line of log.
:param client: the Kubernetes client that can be used in the callback.
:param mode: the current execution mode, it's one of (`sync`, `async`).
:param container_name: the name of the container from which the log line was read.
:param timestamp: the timestamp of the log line.
:param pod: the pod from which the log line was read.
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,6 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
)
message_to_log = None
message_timestamp = None
progress_callback_lines = []
try:
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
Expand All @@ -485,35 +484,39 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
if message_to_log is None: # first line in the log
message_to_log = message
message_timestamp = line_timestamp
progress_callback_lines.append(line)
else: # previous log line is complete
for line in progress_callback_lines:
for callback in self._callbacks:
callback.progress_callback(
line=line, client=self._client, mode=ExecutionMode.SYNC
)
if message_to_log is not None:
self._log_message(
message_to_log,
container_name,
container_name_log_prefix_enabled,
log_formatter,
for callback in self._callbacks:
callback.progress_callback(
line=message_to_log,
client=self._client,
mode=ExecutionMode.SYNC,
container_name=container_name,
timestamp=message_timestamp,
pod=pod,
)
self._log_message(
message_to_log,
container_name,
container_name_log_prefix_enabled,
log_formatter,
)
last_captured_timestamp = message_timestamp
message_to_log = message
message_timestamp = line_timestamp
progress_callback_lines = [line]
else: # continuation of the previous log line
message_to_log = f"{message_to_log}\n{message}"
progress_callback_lines.append(line)
finally:
# log the last line and update the last_captured_timestamp
for line in progress_callback_lines:
if message_to_log is not None:
for callback in self._callbacks:
callback.progress_callback(
line=line, client=self._client, mode=ExecutionMode.SYNC
line=message_to_log,
client=self._client,
mode=ExecutionMode.SYNC,
container_name=container_name,
timestamp=message_timestamp,
pod=pod,
)
if message_to_log is not None:
self._log_message(
message_to_log, container_name, container_name_log_prefix_enabled, log_formatter
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,16 +551,38 @@ def test_fetch_container_logs_invoke_progress_callback(
):
MockWrapper.reset()
mock_callbacks = MockWrapper.mock_callbacks
message = "2020-10-08T14:16:17.793417674Z message"
ts = "2020-10-08T14:16:17.793417674Z"
message = "message"
no_ts_message = "notimestamp"
mock_read_pod_logs.return_value = [bytes(message, "utf-8"), bytes(no_ts_message, "utf-8")]
second_ts = "2020-10-08T15:16:18.793417674Z"
second_message = "second message"
mock_read_pod_logs.return_value = [
bytes(f"{ts} {message}", "utf-8"),
bytes(no_ts_message, "utf-8"),
bytes(f"{second_ts} {second_message}", "utf-8"),
]
mock_container_is_running.return_value = False
mock_pod = mock.MagicMock()

self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
self.pod_manager.fetch_container_logs(mock_pod, "base", follow=True)
mock_callbacks.progress_callback.assert_has_calls(
[
mock.call(line=message, client=self.pod_manager._client, mode="sync"),
mock.call(line=no_ts_message, client=self.pod_manager._client, mode="sync"),
mock.call(
line=f"{message}\n{no_ts_message}",
client=self.pod_manager._client,
mode="sync",
container_name="base",
timestamp=pendulum.parse(ts),
pod=mock_pod,
),
mock.call(
line=f"{second_message}",
client=self.pod_manager._client,
mode="sync",
container_name="base",
timestamp=pendulum.parse(second_ts),
pod=mock_pod,
),
]
)

Expand Down