@@ -910,7 +910,9 @@ async def delete_pod(self, name: str, namespace: str):
if str(e.status) != "404":
raise

async def read_logs(self, name: str, namespace: str):
async def read_logs(
self, name: str, namespace: str, container_name: str | None = None, since_seconds: int | None = None
) -> list[str]:
"""
Read logs from a container inside the pod.

@@ -921,23 +923,24 @@ async def read_logs(self, name: str, namespace: str):

:param name: Name of the pod.
:param namespace: Name of the pod's namespace.
:param container_name: Name of the container inside the pod.
:param since_seconds: Only return logs newer than a relative duration in seconds.
"""
async with self.get_conn() as connection:
try:
v1_api = async_client.CoreV1Api(connection)
logs = await v1_api.read_namespaced_pod_log(
name=name,
namespace=namespace,
container=container_name,
follow=False,
timestamps=True,
since_seconds=since_seconds,
)
logs = logs.splitlines()
for line in logs:
self.log.info("Container logs from %s", line)
return logs
except HTTPError:
self.log.exception("There was an error reading the kubernetes API.")
raise
except HTTPError as e:
raise KubernetesApiError from e

async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList:
"""Get pod's events."""
@@ -899,17 +899,6 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
if not self.pod:
raise PodNotFoundException("Could not find pod after resuming from deferral")

if event["status"] != "running":
for callback in self.callbacks:
callback.on_operator_resuming(
pod=self.pod,
event=event,
client=self.client,
mode=ExecutionMode.SYNC,
context=context,
operator=self,
)

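# With no logging_interval the logs are followed to completion; last_log_time
# marks where the trigger's periodic log fetch left off.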
follow = self.logging_interval is None
last_log_time = event.get("last_log_time")

@@ -942,34 +931,12 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
)
message = event.get("stack_trace", event["message"])
raise AirflowException(message)

return xcom_sidecar_output

if event["status"] == "running":
if self.get_logs:
self.log.info("Resuming logs read from time %r", last_log_time)

pod_log_status = self.pod_manager.fetch_container_logs(
pod=self.pod,
container_name=self.base_container_name,
follow=follow,
since_time=last_log_time,
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
log_formatter=self.log_formatter,
)

self.invoke_defer_method(pod_log_status.last_log_time)
else:
self.invoke_defer_method()
except TaskDeferred:
raise
finally:
self._clean(event=event, context=context, result=xcom_sidecar_output)

def _clean(self, event: dict[str, Any], result: dict | None, context: Context) -> None:
if event["status"] == "running":
return

if self.pod is None:
return

@@ -287,16 +287,14 @@ async def _wait_for_container_completion(self) -> TriggerEvent:
}
)
self.log.debug("Container is not completed and still working.")
if time_get_more_logs and datetime.datetime.now(tz=datetime.timezone.utc) > time_get_more_logs:
return TriggerEvent(
{
"status": "running",
"last_log_time": self.last_log_time,
"namespace": self.pod_namespace,
"name": self.pod_name,
**self.trigger_kwargs,
}
)
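# Fetch and emit any fresh container logs from within the trigger, then
# schedule the next fetch one logging_interval ahead.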
now = datetime.datetime.now(tz=datetime.timezone.utc)
if time_get_more_logs and now >= time_get_more_logs:
if self.get_logs and self.logging_interval:
self.last_log_time = await self.pod_manager.fetch_container_logs_before_current_sec(
pod, container_name=self.base_container_name, since_time=self.last_log_time
)
time_get_more_logs = now + datetime.timedelta(seconds=self.logging_interval)

self.log.debug("Sleeping for %s seconds.", self.poll_interval)
await asyncio.sleep(self.poll_interval)

@@ -480,7 +480,7 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
try:
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
line_timestamp, message = self.parse_log_line(line)
line_timestamp, message = parse_log_line(line)
if line_timestamp: # detect new log line
if message_to_log is None: # first line in the log
message_to_log = message
@@ -708,22 +708,6 @@ def await_pod_completion(
time.sleep(2)
return remote_pod

def parse_log_line(self, line: str) -> tuple[DateTime | None, str]:
"""
Parse K8s log line and returns the final state.

:param line: k8s log line
:return: timestamp and log message
"""
timestamp, sep, message = line.strip().partition(" ")
if not sep:
return None, line
try:
last_log_time = cast("DateTime", pendulum.parse(timestamp))
except ParserError:
return None, line
return last_log_time, message

def container_is_running(self, pod: V1Pod, container_name: str) -> bool:
"""Read pod and checks if container is running."""
remote_pod = self.read_pod(pod)
@@ -971,6 +955,23 @@ def is_log_group_marker(line: str) -> bool:
return line.startswith("::group::") or line.startswith("::endgroup::")


def parse_log_line(line: str) -> tuple[DateTime | None, str]:
"""
Parse a K8s log line into its timestamp and log message.

:param line: k8s log line
:return: timestamp and log message
"""
timestamp, sep, message = line.strip().partition(" ")
if not sep:
return None, line
try:
last_log_time = cast("DateTime", pendulum.parse(timestamp))
except ParserError:
return None, line
return last_log_time, message
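A quick sketch of what the now module-level parser returns (sample lines assumed):

from airflow.providers.cncf.kubernetes.utils.pod_manager import parse_log_line

ts, msg = parse_log_line("2023-01-11T12:00:00.123456Z Some string logs...")
# ts is a pendulum DateTime for 2023-01-11T12:00:00.123456Z; msg == "Some string logs..."

ts, msg = parse_log_line("no-timestamp-line")
# no space separator, so ts is None and msg is the raw line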


class AsyncPodManager(LoggingMixin):
"""Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodTrigger."""

@@ -1032,3 +1033,54 @@ async def await_pod_start(
startup_timeout=startup_timeout,
check_interval=check_interval,
)

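# Retry transient API failures up to five attempts with exponential backoff, then re-raise.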
@tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
async def fetch_container_logs_before_current_sec(
self, pod: V1Pod, container_name: str, since_time: DateTime | None = None
) -> DateTime | None:
"""
Asynchronously read the log file of the specified pod.

This method streams logs from the requested container, skipping lines from the current second so they are not duplicated on the next read. It is designed for long-running containers; transient API errors are retried before being re-raised.

:param pod: The pod specification to monitor.
:param container_name: The name of the container within the pod.
:param since_time: The timestamp from which to start reading logs.
:return: The timestamp to pass as ``since_time`` on the next read. Lines from the current second are skipped now and picked up by that next read.
"""
now = pendulum.now()
logs = await self._hook.read_logs(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
container_name=container_name,
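# The API accepts only whole seconds, so round the elapsed interval up to avoid dropping lines.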
since_seconds=(math.ceil((now - since_time).total_seconds()) if since_time else None),
)
message_to_log = None
try:
now_seconds = now.replace(microsecond=0)
for line in logs:
line_timestamp, message = parse_log_line(line)
# Skip log lines from the current second to prevent duplicate entries on the next read.
# The API only allows specifying 'since_seconds', not an exact timestamp.
if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds:
break
if line_timestamp: # detect new log line
if message_to_log is None: # first line in the log
message_to_log = message
else: # previous log line is complete
if message_to_log is not None:
if is_log_group_marker(message_to_log):
print(message_to_log)
else:
self.log.info("[%s] %s", container_name, message_to_log)
message_to_log = message
elif message_to_log: # continuation of the previous log line
message_to_log = f"{message_to_log}\n{message}"
finally:
# log the last line and update the last_captured_timestamp
if message_to_log is not None:
if is_log_group_marker(message_to_log):
print(message_to_log)
else:
self.log.info("[%s] %s", container_name, message_to_log)
return now # Return the current time as the last log time to ensure logs from the current second are read in the next fetch.
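A sketch of the intended polling cycle; `manager` (an AsyncPodManager), `pod`, and the container name are hypothetical:

import asyncio


async def poll_logs(manager, pod) -> None:
    since = None
    for _ in range(3):  # a few polling rounds, for illustration
        # Each call returns the since_time for the next call; lines from the
        # still-open current second are deferred to that next read, not lost.
        since = await manager.fetch_container_logs_before_current_sec(
            pod, container_name="base", since_time=since
        )
        await asyncio.sleep(30)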
@@ -1060,25 +1060,25 @@ async def test_read_logs(self, lib_method, kube_config_loader):
config_file=None,
cluster_context=None,
)
with mock.patch(
"airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.log",
new_callable=PropertyMock,
) as log:
await hook.read_logs(
name=POD_NAME,
namespace=NAMESPACE,
)

lib_method.assert_called_once()
lib_method.assert_called_with(
name=POD_NAME,
namespace=NAMESPACE,
follow=False,
timestamps=True,
)
log.return_value.info.assert_called_with(
"Container logs from %s", "2023-01-11 Some string logs..."
)
logs = await hook.read_logs(
name=POD_NAME,
namespace=NAMESPACE,
container_name=CONTAINER_NAME,
since_seconds=10,
)

lib_method.assert_called_once()
lib_method.assert_called_with(
name=POD_NAME,
namespace=NAMESPACE,
container=CONTAINER_NAME,
follow=False,
timestamps=True,
since_seconds=10,
)
assert len(logs) == 1
assert "2023-01-11 Some string logs..." in logs

@pytest.mark.asyncio
@mock.patch(KUBE_BATCH_API.format("read_namespaced_job_status"))
@@ -2166,18 +2166,6 @@ def test_process_duplicate_label_pods__pod_removed_if_delete_pod(
process_pod_deletion_mock.assert_called_once_with(pod_1)
assert result.metadata.name == pod_2.metadata.name

@patch(POD_MANAGER_CLASS.format("fetch_container_logs"))
@patch(KUB_OP_PATH.format("invoke_defer_method"))
def test_defere_call_one_more_time_after_error(self, invoke_defer_method, fetch_container_logs):
fetch_container_logs.return_value = PodLoggingStatus(False, None)
op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True)

op.trigger_reentry(
create_context(op), event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, "status": "running"}
)

invoke_defer_method.assert_called_with(None)


class TestSuppress:
def test__suppress(self, caplog):
@@ -2606,32 +2594,6 @@ def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, expect_match):
with pytest.raises(AirflowException, match=expect_match):
k.cleanup(pod, pod)

@patch(
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.convert_config_file_to_dict"
)
@patch(f"{HOOK_CLASS}.get_pod")
@patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
@patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
def test_get_logs_running(
self,
fetch_container_logs,
await_pod_completion,
get_pod,
mock_convert_config_file_to_dict,
):
"""When logs fetch exits with status running, raise task deferred"""
pod = MagicMock()
get_pod.return_value = pod
op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True)
await_pod_completion.return_value = None
fetch_container_logs.return_value = PodLoggingStatus(True, None)
with pytest.raises(TaskDeferred):
op.trigger_reentry(
create_context(op),
event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, "status": "running"},
)
fetch_container_logs.is_called_with(pod, "base")

@patch(KUB_OP_PATH.format("_write_logs"))
@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
@@ -2702,17 +2664,6 @@ def test_execute_async_callbacks(self, mocked_hook):
}
k.trigger_reentry(context=context, event=callback_event)

# check on_operator_resuming callback
mock_callbacks.on_operator_resuming.assert_called_once()
assert mock_callbacks.on_operator_resuming.call_args.kwargs == {
"client": k.client,
"mode": ExecutionMode.SYNC,
"pod": remote_pod_mock,
"operator": k,
"context": context,
"event": callback_event,
}

# check on_pod_cleanup callback
mock_callbacks.on_pod_cleanup.assert_called_once()
assert mock_callbacks.on_pod_cleanup.call_args.kwargs == {