From 2f03dab95d2591b9e4463fe4726d708b6492989a Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Fri, 1 Aug 2025 15:25:19 +0800
Subject: [PATCH 01/12] Take over ash's fix

---
 .../airflow/utils/log/file_task_handler.py    | 12 ++--
 .../amazon/aws/log/cloudwatch_task_handler.py | 60 +++++++++++++------
 2 files changed, 51 insertions(+), 21 deletions(-)

diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 44ccf2cde3803..ba9b0f0303162 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -30,7 +30,7 @@
 from itertools import chain, islice
 from pathlib import Path
 from types import GeneratorType
-from typing import IO, TYPE_CHECKING, TypedDict, cast
+from typing import IO, TYPE_CHECKING, Any, TypedDict, cast
 from urllib.parse import urljoin
 
 import pendulum
@@ -71,7 +71,7 @@
 """The legacy format of log messages before 3.0.4"""
 LogSourceInfo: TypeAlias = list[str]
 """Information _about_ the log fetching process for display to a user"""
-RawLogStream: TypeAlias = Generator[str, None, None]
+RawLogStream: TypeAlias = Generator[str | dict[str, Any], None, None]
 """Raw log stream, containing unparsed log lines."""
 LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None]
 """Legacy log response, containing source information and log messages."""
@@ -196,7 +196,8 @@ def _fetch_logs_from_service(url: str, log_relative_path: str) -> Response:
     if not _parse_timestamp:
 
         def _parse_timestamp(line: str):
-            timestamp_str, _ = line.split(" ", 1)
+            # Make this resilient to all input types, ensure it's always a string.
+            timestamp_str, _ = str(line).split(" ", 1)
             return pendulum.parse(timestamp_str.strip("[]"))
 
 
@@ -261,7 +262,10 @@ def _log_stream_to_parsed_log_stream(
     for line in log_stream:
         if line:
             try:
-                log = StructuredLogMessage.model_validate_json(line)
+                if isinstance(line, dict):
+                    log = StructuredLogMessage.model_validate(line)
+                else:
+                    log = StructuredLogMessage.model_validate_json(line)
             except ValidationError:
                 with suppress(Exception):
                     # If we can't parse the timestamp, don't attach one to the row
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 79d56e7b6ad39..a62795bf5891c 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -41,7 +41,13 @@
     from airflow.models.taskinstance import TaskInstance
     from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
 
-    from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
+    from airflow.utils.log.file_task_handler import (
+        LegacyLogResponse,
+        LogMessages,
+        LogResponse,
+        LogSourceInfo,
+        RawLogStream,
+    )
 
 
 def json_serialize_legacy(value: Any) -> str | None:
@@ -163,15 +169,25 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
             self.close()
         return
 
-    def read(self, relative_path, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages | None]:
-        logs: LogMessages | None = []
+    def read(self, relative_path, ti: RuntimeTI) -> LegacyLogResponse:
+        messages, logs = self.stream(relative_path, ti)
+
+        return messages, [
+            json.dumps(msg) if isinstance(msg, dict) else msg for group in logs for msg in group
+        ]
+
+    def stream(self, relative_path, ti: RuntimeTI) -> LogResponse:
+        logs: list[RawLogStream] = []
         messages = [
             f"Reading remote log from Cloudwatch log_group: {self.log_group} log_stream: {relative_path}"
         ]
         try:
-            logs = [self.get_cloudwatch_logs(relative_path, ti)]
+            gen: RawLogStream = (
+                self._parse_cloudwatch_log_event(event)
+                for event in self.get_cloudwatch_logs(relative_path, ti)
+            )
+            logs = [gen]
         except Exception as e:
-            logs = None
             messages.append(str(e))
         return messages, logs
@@ -192,15 +208,14 @@ def get_cloudwatch_logs(self, stream_name: str, task_instance: RuntimeTI):
             if (end_date := getattr(task_instance, "end_date", None)) is None
             else datetime_to_epoch_utc_ms(end_date + timedelta(seconds=30))
         )
-        events = self.hook.get_log_events(
+        return self.hook.get_log_events(
             log_group=self.log_group,
             log_stream_name=stream_name,
             end_time=end_time,
         )
-        return "\n".join(self._event_to_str(event) for event in events)
 
-    def _event_to_dict(self, event: dict) -> dict:
-        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc).isoformat()
+    def _parse_cloudwatch_log_event(self, event: dict) -> dict:
+        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
         message = event["message"]
         try:
             message = json.loads(message)
@@ -209,13 +224,6 @@ def _event_to_dict(self, event: dict) -> dict:
         except Exception:
             return {"timestamp": event_dt, "event": message}
 
-    def _event_to_str(self, event: dict) -> str:
-        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
-        # Format a datetime object to a string in Zulu time without milliseconds.
-        formatted_event_dt = event_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
-        message = event["message"]
-        return f"[{formatted_event_dt}] {message}"
-
 
 class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
     """
@@ -291,4 +299,22 @@ def _read_remote_logs(
     ) -> tuple[LogSourceInfo, LogMessages]:
         stream_name = self._render_filename(task_instance, try_number)
         messages, logs = self.io.read(stream_name, task_instance)
-        return messages, logs or []
+
+        messages = [
+            f"Reading remote log from Cloudwatch log_group: {self.io.log_group} log_stream: {stream_name}"
+        ]
+        try:
+            events = [self.io.get_cloudwatch_logs(stream_name, task_instance)]
+            logs = ["\n".join(self._event_to_str(event) for event in events)]
+        except Exception as e:
+            logs = []
+            messages.append(str(e))
+
+        return messages, logs
+
+    def _event_to_str(self, event: dict) -> str:
+        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
+        # Format a datetime object to a string in Zulu time without milliseconds.
+        formatted_event_dt = event_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
+        message = event["message"]
+        return f"[{formatted_event_dt}] {message}"

From 8f6faa2d00527b920e25f51f1f0f210335addf61 Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Sun, 3 Aug 2025 00:45:33 +0800
Subject: [PATCH 02/12] Fix datetime serialization error

---
 .../amazon/aws/log/cloudwatch_task_handler.py | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index a62795bf5891c..049ba423bbdda 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -78,6 +78,15 @@ def json_serialize(value: Any) -> str | None:
     return watchtower._json_serialize_default(value)
 
 
+class DateTimeEncoder(json.JSONEncoder):
+    """Custom JSON encoder to handle datetime serialization."""
+
+    def default(self, obj):
+        if isinstance(obj, datetime):
+            return obj.isoformat()
+        return super().default(obj)
+
+
 @attrs.define(kw_only=True)
 class CloudWatchRemoteLogIO(LoggingMixin):  # noqa: D101
     base_log_folder: Path = attrs.field(converter=Path)
@@ -173,7 +182,14 @@ def read(self, relative_path, ti: RuntimeTI) -> LegacyLogResponse:
         messages, logs = self.stream(relative_path, ti)
 
         return messages, [
-            json.dumps(msg) if isinstance(msg, dict) else msg for group in logs for msg in group
+            json.dumps(
+                msg,
+                cls=DateTimeEncoder,
+            )
+            if isinstance(msg, dict)
+            else msg
+            for group in logs
+            for msg in group
         ]

From 8d68bb90179621d89d653e2411b111582b3ad842 Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Sun, 3 Aug 2025 20:29:20 +0800
Subject: [PATCH 03/12] Fix 'generator is not subscriptable' error

---
 .../airflow/providers/amazon/aws/log/cloudwatch_task_handler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 049ba423bbdda..d5019c9d4ad43 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -320,7 +320,7 @@ def _read_remote_logs(
             f"Reading remote log from Cloudwatch log_group: {self.io.log_group} log_stream: {stream_name}"
         ]
         try:
-            events = [self.io.get_cloudwatch_logs(stream_name, task_instance)]
+            events = self.io.get_cloudwatch_logs(stream_name, task_instance)
             logs = ["\n".join(self._event_to_str(event) for event in events)]
         except Exception as e:
             logs = []

From bf1c3ef9779cb61b235deabc0cea822937d3c966 Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Sun, 3 Aug 2025 20:31:35 +0800
Subject: [PATCH 04/12] Fix test_cloudwatch_task_handler

---
 .../aws/log/test_cloudwatch_task_handler.py   | 34 ++++++++++---------
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
index 01eebfa0b1f4f..72e4de814d9eb 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -159,23 +159,9 @@ def test_log_message(self):
         assert metadata == [
             f"Reading remote log from Cloudwatch log_group: log_group_name log_stream: {stream_name}"
         ]
-        assert logs == ['[2025-03-27T21:58:01Z] {"foo": "bar", "event": "Hi", "level": "info"}']
-
-    def test_event_to_str(self):
-        handler = self.subject
-        current_time = int(time.time()) * 1000
-        events = [
-            {"timestamp": current_time - 2000, "message": "First"},
-            {"timestamp": current_time - 1000, "message": "Second"},
-            {"timestamp": current_time, "message": "Third"},
-        ]
-        assert [handler._event_to_str(event) for event in events] == (
-            [
-                f"[{get_time_str(current_time - 2000)}] First",
-                f"[{get_time_str(current_time - 1000)}] Second",
-                f"[{get_time_str(current_time)}] Third",
+        assert logs == [
+            '{"foo": "bar", "event": "Hi", "level": "info", "timestamp": "2025-03-27T21:58:01.002000+00:00"}'
         ]
-        )
 
 
 @pytest.mark.db_test
@@ -426,6 +412,22 @@ def test_filename_template_for_backward_compatibility(self):
             filename_template=None,
         )
 
+    def test_event_to_str(self):
+        handler = self.cloudwatch_task_handler
+        current_time = int(time.time()) * 1000
+        events = [
+            {"timestamp": current_time - 2000, "message": "First"},
+            {"timestamp": current_time - 1000, "message": "Second"},
+            {"timestamp": current_time, "message": "Third"},
+        ]
+        assert [handler._event_to_str(event) for event in events] == (
+            [
+                f"[{get_time_str(current_time - 2000)}] First",
+                f"[{get_time_str(current_time - 1000)}] Second",
+                f"[{get_time_str(current_time)}] Third",
+            ]
+        )
+
 
 def generate_log_events(conn, log_group_name, log_stream_name, log_events):
     conn.create_log_group(logGroupName=log_group_name)

From c652a6e205b04010c29ff6bbc56398f449fe3025 Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Thu, 21 Aug 2025 18:20:58 +0800
Subject: [PATCH 05/12] Fix nits in code review

---
 .../amazon/aws/log/cloudwatch_task_handler.py | 20 +++++++++----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index d5019c9d4ad43..6166b31302313 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -81,7 +81,7 @@ def json_serialize(value: Any) -> str | None:
 class DateTimeEncoder(json.JSONEncoder):
     """Custom JSON encoder to handle datetime serialization."""
 
-    def default(self, obj):
+    def default(self, obj: object) -> str:
         if isinstance(obj, datetime):
             return obj.isoformat()
         return super().default(obj)
@@ -180,17 +180,15 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
     def read(self, relative_path, ti: RuntimeTI) -> LegacyLogResponse:
         messages, logs = self.stream(relative_path, ti)
+        str_logs: list[str] = []
 
-        return messages, [
-            json.dumps(
-                msg,
-                cls=DateTimeEncoder,
-            )
-            if isinstance(msg, dict)
-            else msg
-            for group in logs
-            for msg in group
-        ]
+        for group in logs:
+            for msg in group:
+                if isinstance(msg, dict):
+                    msg = json.dumps(msg, cls=DateTimeEncoder)
+                str_logs.append(msg)
+
+        return messages, str_logs

From 7e76a19690a46c3b5562c835f1dd28c5a02d8b4d Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Sun, 7 Sep 2025 00:50:38 +0800
Subject: [PATCH 06/12] Add CloudWatchLogEvent type

---
 .../src/airflow/providers/amazon/aws/hooks/logs.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/logs.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/logs.py
index 884e02478ba1d..144c45c9d46f5 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/logs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/logs.py
@@ -19,7 +19,7 @@
 import asyncio
 from collections.abc import AsyncGenerator, Generator
-from typing import Any
+from typing import Any, TypedDict
 
 from botocore.exceptions import ClientError
 
@@ -35,6 +35,14 @@
 NUM_CONSECUTIVE_EMPTY_RESPONSE_EXIT_THRESHOLD = 3
 
 
+class CloudWatchLogEvent(TypedDict):
+    """TypedDict for CloudWatch Log Event."""
+
+    timestamp: int
+    message: str
+    ingestionTime: int
+
+
 class AwsLogsHook(AwsBaseHook):
     """
     Interact with Amazon CloudWatch Logs.
@@ -67,7 +75,7 @@ def get_log_events(
         start_from_head: bool | None = None,
         continuation_token: ContinuationToken | None = None,
         end_time: int | None = None,
-    ) -> Generator:
+    ) -> Generator[CloudWatchLogEvent, None, None]:
         """
         Return a generator for log items in a single stream; yields all items available at the current moment.

From 947431cb1aded1b4f4440fa464217a9b4cee6f73 Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Sun, 7 Sep 2025 01:12:58 +0800
Subject: [PATCH 07/12] Correct return type of .stream method

- The .stream method should return Generator[str], but the previous fix
  made it return Generator[dict]
- Making _parse_cloudwatch_log_event return a str instead of a dict
  fixes the problem
---
 .../amazon/aws/log/cloudwatch_task_handler.py | 38 ++++++++-----------
 1 file changed, 16 insertions(+), 22 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 6166b31302313..dd804e7966e36 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -22,6 +22,7 @@
 import json
 import logging
 import os
+from collections.abc import Generator
 from datetime import date, datetime, timedelta, timezone
 from functools import cached_property
 from pathlib import Path
@@ -40,6 +41,7 @@
     import structlog.typing
 
     from airflow.models.taskinstance import TaskInstance
+    from airflow.providers.amazon.aws.hooks.logs import CloudWatchLogEvent
     from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
     from airflow.utils.log.file_task_handler import (
         LegacyLogResponse,
@@ -78,15 +80,6 @@ def json_serialize(value: Any) -> str | None:
     return watchtower._json_serialize_default(value)
 
 
-class DateTimeEncoder(json.JSONEncoder):
-    """Custom JSON encoder to handle datetime serialization."""
-
-    def default(self, obj: object) -> str:
-        if isinstance(obj, datetime):
-            return obj.isoformat()
-        return super().default(obj)
-
-
 @attrs.define(kw_only=True)
 class CloudWatchRemoteLogIO(LoggingMixin):  # noqa: D101
     base_log_folder: Path = attrs.field(converter=Path)
@@ -178,19 +171,17 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
         self.close()
         return
 
-    def read(self, relative_path, ti: RuntimeTI) -> LegacyLogResponse:
+    def read(self, relative_path: str, ti: RuntimeTI) -> LegacyLogResponse:
         messages, logs = self.stream(relative_path, ti)
         str_logs: list[str] = []
 
         for group in logs:
             for msg in group:
-                if isinstance(msg, dict):
-                    msg = json.dumps(msg, cls=DateTimeEncoder)
-                str_logs.append(msg)
+                str_logs.append(f"{msg}\n")
 
         return messages, str_logs
 
-    def stream(self, relative_path, ti: RuntimeTI) -> LogResponse:
+    def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
         logs: list[RawLogStream] = []
         messages = [
             f"Reading remote log from Cloudwatch log_group: {self.log_group} log_stream: {relative_path}"
         ]
@@ -206,7 +197,9 @@ def stream(self, relative_path, ti: RuntimeTI) -> LogResponse:
 
         return messages, logs
 
-    def get_cloudwatch_logs(self, stream_name: str, task_instance: RuntimeTI):
+    def get_cloudwatch_logs(
+        self, stream_name: str, task_instance: RuntimeTI
+    ) -> Generator[CloudWatchLogEvent, None, None]:
         """
         Return all logs from the given log stream.
 
@@ -228,15 +221,16 @@ def get_cloudwatch_logs(self, stream_name: str, task_instance: RuntimeTI):
             end_time=end_time,
         )
 
-    def _parse_cloudwatch_log_event(self, event: dict) -> dict:
-        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
-        message = event["message"]
+    def _parse_cloudwatch_log_event(self, event: CloudWatchLogEvent) -> str:
+        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc).isoformat()
+        event_msg = event["message"]
         try:
-            message = json.loads(message)
+            message = json.loads(event_msg)
             message["timestamp"] = event_dt
-            return message
         except Exception:
-            return {"timestamp": event_dt, "event": message}
+            message = {"timestamp": event_dt, "event": event_msg}
+
+        return json.dumps(message)
 
 
 class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
@@ -326,7 +320,7 @@ def _read_remote_logs(
 
         return messages, logs
 
-    def _event_to_str(self, event: dict) -> str:
+    def _event_to_str(self, event: CloudWatchLogEvent) -> str:
         event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
         # Format a datetime object to a string in Zulu time without milliseconds.
         formatted_event_dt = event_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
         message = event["message"]
         return f"[{formatted_event_dt}] {message}"

From 4e1d390312c8733cb273c9d95d16daf0ae020fcb Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Sun, 7 Sep 2025 01:18:07 +0800
Subject: [PATCH 08/12] Revert change in file_task_handler

---
 airflow-core/src/airflow/utils/log/file_task_handler.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index ba9b0f0303162..d52a9c104cbc8 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -30,7 +30,7 @@
 from itertools import chain, islice
 from pathlib import Path
 from types import GeneratorType
-from typing import IO, TYPE_CHECKING, Any, TypedDict, cast
+from typing import IO, TYPE_CHECKING, TypedDict, cast
 from urllib.parse import urljoin
 
 import pendulum
@@ -68,10 +68,10 @@
 # These types are similar, but have distinct names to make processing them less error prone
 LogMessages: TypeAlias = list[str]
-"""The legacy format of log messages before 3.0.4"""
+"""The legacy format of log messages before 3.0.2"""
 LogSourceInfo: TypeAlias = list[str]
 """Information _about_ the log fetching process for display to a user"""
-RawLogStream: TypeAlias = Generator[str | dict[str, Any], None, None]
+RawLogStream: TypeAlias = Generator[str, None, None]
 """Raw log stream, containing unparsed log lines."""
 LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None]
 """Legacy log response, containing source information and log messages."""

From d3d1b2b9b606b01ce5898eeda11caa95da661bc1 Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Thu, 23 Oct 2025 10:29:00 +0800
Subject: [PATCH 09/12] Fix test_log_message

---
 .../tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
index 72e4de814d9eb..e0f9e169c8a57 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -160,7 +160,7 @@ def test_log_message(self):
             f"Reading remote log from Cloudwatch log_group: log_group_name log_stream: {stream_name}"
         ]
         assert logs == [
-            '{"foo": "bar", "event": "Hi", "level": "info", "timestamp": "2025-03-27T21:58:01.002000+00:00"}'
+            '{"foo": "bar", "event": "Hi", "level": "info", "timestamp": "2025-03-27T21:58:01.002000+00:00"}\n'
         ]

From 3028a1ceee3b4a7c9a81d429e5c552b825b50e6b Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Mon, 24 Nov 2025 11:08:52 +0800
Subject: [PATCH 10/12] Revert file_task_handler change

---
 .../src/airflow/utils/log/file_task_handler.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index d52a9c104cbc8..44ccf2cde3803 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -68,7 +68,7 @@
 # These types are similar, but have distinct names to make processing them less error prone
 LogMessages: TypeAlias = list[str]
-"""The legacy format of log messages before 3.0.2"""
+"""The legacy format of log messages before 3.0.4"""
 LogSourceInfo: TypeAlias = list[str]
 """Information _about_ the log fetching process for display to a user"""
 RawLogStream: TypeAlias = Generator[str, None, None]
 """Raw log stream, containing unparsed log lines."""
 LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None]
 """Legacy log response, containing source information and log messages."""
@@ -196,8 +196,7 @@ def _fetch_logs_from_service(url: str, log_relative_path: str) -> Response:
     if not _parse_timestamp:
 
         def _parse_timestamp(line: str):
-            # Make this resilient to all input types, ensure it's always a string.
-            timestamp_str, _ = str(line).split(" ", 1)
+            timestamp_str, _ = line.split(" ", 1)
             return pendulum.parse(timestamp_str.strip("[]"))
 
 
@@ -262,10 +261,7 @@ def _log_stream_to_parsed_log_stream(
     for line in log_stream:
         if line:
             try:
-                if isinstance(line, dict):
-                    log = StructuredLogMessage.model_validate(line)
-                else:
-                    log = StructuredLogMessage.model_validate_json(line)
+                log = StructuredLogMessage.model_validate_json(line)
             except ValidationError:
                 with suppress(Exception):
                     # If we can't parse the timestamp, don't attach one to the row

From 288a911dd9c3f76ebee4446a98d83ea778d374dd Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Mon, 24 Nov 2025 11:09:24 +0800
Subject: [PATCH 11/12] Fix type annotation for CloudWatchRemoteLogIO

---
 .../providers/amazon/aws/log/cloudwatch_task_handler.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index dd804e7966e36..6c31cc5f03b45 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -44,11 +44,11 @@
     from airflow.providers.amazon.aws.hooks.logs import CloudWatchLogEvent
     from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
     from airflow.utils.log.file_task_handler import (
-        LegacyLogResponse,
         LogMessages,
         LogResponse,
         LogSourceInfo,
         RawLogStream,
+        StreamingLogResponse,
     )
 
 
@@ -171,7 +171,7 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
         self.close()
         return
 
-    def read(self, relative_path: str, ti: RuntimeTI) -> LegacyLogResponse:
+    def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
         messages, logs = self.stream(relative_path, ti)
         str_logs: list[str] = []
 
@@ -181,7 +181,7 @@ def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
 
         return messages, str_logs
 
-    def stream(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
+    def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse:
         logs: list[RawLogStream] = []
         messages = [
             f"Reading remote log from Cloudwatch log_group: {self.log_group} log_stream: {relative_path}"

From 0e906585d0ae58edea5f8a97ae707f339d61d5dd Mon Sep 17 00:00:00 2001
From: LIU ZHE YOU
Date: Thu, 27 Nov 2025 15:49:23 +0800
Subject: [PATCH 12/12] Fix review comments

- Consolidate str_logs into a single comprehension
- Rename _parse_cloudwatch_log_event to _parse_log_event_as_dumped_json
---
 .../amazon/aws/log/cloudwatch_task_handler.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 6c31cc5f03b45..f510daca8dab7 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -173,11 +173,7 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
     def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
         messages, logs = self.stream(relative_path, ti)
-        str_logs: list[str] = []
-
-        for group in logs:
-            for msg in group:
-                str_logs.append(f"{msg}\n")
+        str_logs: list[str] = [f"{msg}\n" for group in logs for msg in group]
 
         return messages, str_logs
 
@@ -188,7 +184,7 @@ def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse:
         ]
         try:
             gen: RawLogStream = (
-                self._parse_cloudwatch_log_event(event)
+                self._parse_log_event_as_dumped_json(event)
                 for event in self.get_cloudwatch_logs(relative_path, ti)
             )
             logs = [gen]
@@ -221,7 +217,7 @@ def get_cloudwatch_logs(
             end_time=end_time,
         )
 
-    def _parse_cloudwatch_log_event(self, event: CloudWatchLogEvent) -> str:
+    def _parse_log_event_as_dumped_json(self, event: CloudWatchLogEvent) -> str:
         event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc).isoformat()
         event_msg = event["message"]
         try:
             message = json.loads(event_msg)
             message["timestamp"] = event_dt
         except Exception:
             message = {"timestamp": event_dt, "event": event_msg}
 
         return json.dumps(message)
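
For reference, the event-parsing behavior the series converges on (introduced in PATCH 07 and renamed in PATCH 12) can be exercised outside Airflow. Below is a minimal sketch with the handler logic inlined; the free-function name and the sample events are illustrative, not part of the patches, and only the standard library is assumed:

import json
from datetime import datetime, timezone


def parse_log_event_as_dumped_json(event: dict) -> str:
    # Mirrors the handler: convert the epoch-millisecond timestamp to an
    # ISO-8601 string, then merge it into the decoded message.
    event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc).isoformat()
    event_msg = event["message"]
    try:
        # Structured (JSON object) messages get the timestamp merged in.
        message = json.loads(event_msg)
        message["timestamp"] = event_dt
    except Exception:
        # Plain text (or non-dict JSON) is wrapped under the "event" key.
        message = {"timestamp": event_dt, "event": event_msg}
    return json.dumps(message)


# Reproduces the string expected by test_log_message:
# {"foo": "bar", "event": "Hi", "level": "info", "timestamp": "2025-03-27T21:58:01.002000+00:00"}
print(parse_log_event_as_dumped_json(
    {"timestamp": 1743112681002, "message": '{"foo": "bar", "event": "Hi", "level": "info"}'}
))
# A plain-text message still yields one JSON object per line.
print(parse_log_event_as_dumped_json({"timestamp": 1743112681002, "message": "raw text"}))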