From 16aeb66f1f0969bb093c4a4ee741ef74057c28a6 Mon Sep 17 00:00:00 2001
From: Ash Berlin-Taylor
Date: Mon, 17 Feb 2025 14:35:04 +0000
Subject: [PATCH 01/19] Render structured logs in the new UI rather than showing raw JSON

There are multiple parts to this PR.

First off: the log reader interface was _a mess_. There was some odd, old code to deal with reading from multiple hosts that made the messages confusing. This was added for smart sensors (which went away in v2.4 or v2.5), but the mess remained, and reading from multiple hosts is handled differently now.

This PR keeps the current "parse+interleave" behaviour (though it's debatable whether the interleave feature is needed specifically, or if we could get away with a simpler concat instead; future work there if anyone wants to think about and tackle this), but changes the JSON response type from a single string (the value of which was previously a mess of double-encoded JSON and the repr of a Python tuple, making it impossible to do anything but display it) to either a list of strings (when the log can't be parsed) or a list of dicts/StructuredLogMessage.

I have also done some cursory rendering/displaying of these structured log messages in the UI, but it could be greatly improved by adding colors to the various components of the log.

The current rendered HTML looks like this:

```html

::group::Log message source details sources=["/root/airflow/logs/dag_id=trigger_test/run_id=manual__2025-02-16T12:23:29.118614+00:00_gOTl0Qub/task_id=waiter/attempt=1.log","/root/airflow/logs/dag_id=trigger_test/run_id=manual__2025-02-16T12:23:29.118614+00:00_gOTl0Qub/task_id=waiter/attempt=1.log.trigger.14.log"]

::endgroup::

[] DEBUG - Hook impls: [] logger="airflow.listeners.listener"

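<!--
  Illustrative only, not part of the rendered output above: a sketch of the
  application/json payload that would produce these lines, following the new
  TaskInstancesLogResponse / StructuredLogMessage shape introduced in this PR
  ("content" is the list of parsed events; "continuation_token" is null once
  end_of_log is reached). The timestamp value, the lowercase "debug" level,
  and the "..."-abbreviated source paths are placeholders, not values taken
  from a real response.

  {
    "content": [
      {"event": "::group::Log message source details",
       "sources": ["...attempt=1.log", "...attempt=1.log.trigger.14.log"]},
      {"event": "::endgroup::"},
      {"timestamp": "...", "level": "debug",
       "event": "Hook impls: []", "logger": "airflow.listeners.listener"}
    ],
    "continuation_token": null
  }
-->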
``` Although not used by the UI, the non-application/json content type is now updated to a) include the continuation token as a header, and to set the content type as application/x-ndjson --- .../api_fastapi/core_api/datamodels/log.py | 22 +- .../core_api/openapi/v1-generated.yaml | 23 +- .../api_fastapi/core_api/routes/public/log.py | 26 +- .../ui/openapi-gen/requests/schemas.gen.ts | 34 ++- airflow/ui/openapi-gen/requests/types.gen.ts | 11 +- airflow/ui/src/queries/useLogs.tsx | 41 ++- airflow/utils/log/file_task_handler.py | 170 ++++++------ airflow/utils/log/log_reader.py | 65 ++--- .../celery/log_handlers/test_log_handlers.py | 15 +- tests/utils/log/test_log_reader.py | 7 +- tests/utils/test_log_handlers.py | 255 +++++++++--------- 11 files changed, 403 insertions(+), 266 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/log.py b/airflow/api_fastapi/core_api/datamodels/log.py index 79b341d6e3808..e67264ae3c315 100644 --- a/airflow/api_fastapi/core_api/datamodels/log.py +++ b/airflow/api_fastapi/core_api/datamodels/log.py @@ -16,11 +16,29 @@ # under the License. from __future__ import annotations -from pydantic import BaseModel +from datetime import datetime +from typing import Annotated + +from pydantic import BaseModel, ConfigDict, WithJsonSchema + + +class StructuredLogMessage(BaseModel): + """An individual log message.""" + + # Not every message has a timestamp. + timestamp: Annotated[ + datetime | None, + # Schema level, say this is always a datetime if it exists + WithJsonSchema({"type": "string", "format": "date-time"}), + ] = None + event: str + + model_config = ConfigDict(extra="allow") class TaskInstancesLogResponse(BaseModel): """Log serializer for responses.""" - content: str + content: list[StructuredLogMessage] | list[str] + """Either a list of parsed events, or a list of lines on parse error""" continuation_token: str | None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index a71113cb13bd4..d9d38cd39d2c2 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -10106,6 +10106,21 @@ components: - arrange title: StructureDataResponse description: Structure Data serializer for responses. + StructuredLogMessage: + properties: + timestamp: + type: string + format: date-time + title: Timestamp + event: + type: string + title: Event + additionalProperties: true + type: object + required: + - event + title: StructuredLogMessage + description: An individual log message. 
TaskCollectionResponse: properties: tasks: @@ -10697,7 +10712,13 @@ components: TaskInstancesLogResponse: properties: content: - type: string + anyOf: + - items: + $ref: '#/components/schemas/StructuredLogMessage' + type: array + - items: + type: string + type: array title: Content continuation_token: anyOf: diff --git a/airflow/api_fastapi/core_api/routes/public/log.py b/airflow/api_fastapi/core_api/routes/public/log.py index a6bc24f638067..09ec4dc672961 100644 --- a/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow/api_fastapi/core_api/routes/public/log.py @@ -18,7 +18,6 @@ from __future__ import annotations import textwrap -from typing import Any from fastapi import HTTPException, Request, Response, status from itsdangerous import BadSignature, URLSafeSerializer @@ -65,6 +64,7 @@ }, }, response_model=TaskInstancesLogResponse, + response_model_exclude_unset=True, ) def get_log( dag_id: str, @@ -135,12 +135,22 @@ def get_log( except TaskNotFound: pass - logs: Any if accept == Mimetype.JSON or accept == Mimetype.ANY: # default logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata) - # we must have token here, so we can safely ignore it - token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment] - return TaskInstancesLogResponse(continuation_token=token, content=str(logs[0])).model_dump() - # text/plain. Stream - logs = task_log_reader.read_log_stream(ti, try_number, metadata) - return Response(media_type=accept, content="".join(list(logs))) + encoded_token = None + if not metadata.get("end_of_log", False): + encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) + return TaskInstancesLogResponse.model_construct(continuation_token=encoded_token, content=logs) + else: + # text/plain, or something else we don't understand. Return raw log content + + # We need to exhaust the iterator before we can generate the continuation token. 
+ # We could improve this by making it a streaming/async response, and by then setting the header using + # HTTP Trailers + logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata)) + headers = None + if not metadata.get("end_of_log", False): + headers = { + "Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata) + } + return Response(media_type="application/x-ndjson", content=logs, headers=headers) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 1c0a537d38120..89bd00998f730 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4645,6 +4645,25 @@ export const $StructureDataResponse = { description: "Structure Data serializer for responses.", } as const; +export const $StructuredLogMessage = { + properties: { + timestamp: { + type: "string", + format: "date-time", + title: "Timestamp", + }, + event: { + type: "string", + title: "Event", + }, + }, + additionalProperties: true, + type: "object", + required: ["event"], + title: "StructuredLogMessage", + description: "An individual log message.", +} as const; + export const $TaskCollectionResponse = { properties: { tasks: { @@ -5628,7 +5647,20 @@ export const $TaskInstancesBatchBody = { export const $TaskInstancesLogResponse = { properties: { content: { - type: "string", + anyOf: [ + { + items: { + $ref: "#/components/schemas/StructuredLogMessage", + }, + type: "array", + }, + { + items: { + type: "string", + }, + type: "array", + }, + ], title: "Content", }, continuation_token: { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 32982e08b0195..2ca718644c5fd 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1216,6 +1216,15 @@ export type StructureDataResponse = { export type arrange = "BT" | "LR" | "RL" | "TB"; +/** + * An individual log message. + */ +export type StructuredLogMessage = { + timestamp?: string; + event: string; + [key: string]: unknown | string; +}; + /** * Task collection serializer for responses. */ @@ -1393,7 +1402,7 @@ export type TaskInstancesBatchBody = { * Log serializer for responses. */ export type TaskInstancesLogResponse = { - content: string; + content: Array | Array; continuation_token: string | null; }; diff --git a/airflow/ui/src/queries/useLogs.tsx b/airflow/ui/src/queries/useLogs.tsx index c165e13c0b893..c667d3c883688 100644 --- a/airflow/ui/src/queries/useLogs.tsx +++ b/airflow/ui/src/queries/useLogs.tsx @@ -29,7 +29,35 @@ type Props = { }; type ParseLogsProps = { - data: string | undefined; + data: TaskInstanceResponse["content"]; +}; + +const renderStructuredLog = ({ event, level = undefined, timestamp = undefined, ...structured }, index) => { + const elements = []; + + if (Boolean(timestamp)) { + elements.push("[", , "] "); + } + + if (Boolean(level)) { + elements.push({level.toUpperCase()}, " - "); + } + + elements.push({event}); + + for (const key in structured) { + if (!Object.hasOwn(structured, key)) { + continue; + } + elements.push( + " ", + + {key}={JSON.stringify(structured[key])} + , + ); + } + + return

{elements}

; }; // TODO: add support for log groups, colors, formats, filters @@ -41,17 +69,18 @@ const parseLogs = ({ data }: ParseLogsProps) => { let warning; + let parsedLines; + try { - lines = data.split("\\n"); - } catch { + parsedLines = data.map(renderStructuredLog); + } catch (error) { + // eslint-disable-next-line no-console + console.warn(`Error parsing logs: ${error}`); warning = "Unable to show logs. There was an error parsing logs."; return { data, warning }; } - // eslint-disable-next-line react/no-array-index-key - const parsedLines = lines.map((line, index) =>

{line}

); - return { fileSources: [], parsedLogs: parsedLines, diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 6f4086843db8f..9ba77278a447a 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -19,16 +19,19 @@ from __future__ import annotations +import itertools import logging import os from collections.abc import Iterable from contextlib import suppress +from datetime import datetime from enum import Enum from pathlib import Path from typing import TYPE_CHECKING, Any, Callable from urllib.parse import urljoin import pendulum +from pydantic import BaseModel, ConfigDict, ValidationError from airflow.configuration import conf from airflow.exceptions import AirflowException @@ -47,6 +50,18 @@ logger = logging.getLogger(__name__) +class StructuredLogMessage(BaseModel): + """An individual log message.""" + + timestamp: datetime | None + event: str + + # We don't need to cache string when parsing in to this, as almost every line will have a different + # values; `extra=allow` means we'll create extra properties as needed. Only timestamp and event are + # required, everything else is up to what ever is producing the logs + model_config = ConfigDict(cache_strings=False, extra="allow") + + class LogType(str, Enum): """ Type of service from which we retrieve logs. @@ -107,30 +122,36 @@ def _parse_timestamp(line: str): return pendulum.parse(timestamp_str.strip("[]")) -def _parse_timestamps_in_log_file(lines: Iterable[str]): +def _parse_log_lines(lines: Iterable[str]) -> Iterable[tuple[datetime | None, int, StructuredLogMessage]]: + from airflow.utils.timezone import coerce_datetime + timestamp = None next_timestamp = None for idx, line in enumerate(lines): if line: - with suppress(Exception): - # next_timestamp unchanged if line can't be parsed - next_timestamp = _parse_timestamp(line) - if next_timestamp: - timestamp = next_timestamp - yield timestamp, idx, line - - -def _interleave_logs(*logs): - records = [] - for log in logs: - records.extend(_parse_timestamps_in_log_file(log.splitlines())) + try: + # Try to parse it as json first + log = StructuredLogMessage.model_validate_json(line) + except ValidationError: + with suppress(Exception): + # If we can't parse the timestamp, don't attach one to the row + next_timestamp = _parse_timestamp(line) + log = StructuredLogMessage.model_construct(event=line, timestamp=next_timestamp) + if log.timestamp: + log.timestamp = coerce_datetime(log.timestamp) + timestamp = log.timestamp + yield timestamp, idx, log + + +def _interleave_logs(*logs) -> Iterable[StructuredLogMessage]: + min_date = pendulum.datetime(2000, 1, 1) + + records = itertools.chain.from_iterable(_parse_log_lines(log.splitlines()) for log in logs) last = None - for timestamp, _, line in sorted( - records, key=lambda x: (x[0], x[1]) if x[0] else (pendulum.datetime(2000, 1, 1), x[1]) - ): - if line != last or not timestamp: # dedupe - yield line - last = line + for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date, x[1])): + if msg != last or not timestamp: # dedupe + yield msg + last = msg def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance: @@ -297,9 +318,6 @@ def _render_filename(self, ti: TaskInstance, try_number: int, session=NEW_SESSIO else: raise RuntimeError(f"Unable to render log filename for {ti}. 
This should never happen") - def _read_grouped_logs(self): - return False - def _get_executor_get_task_log( self, ti: TaskInstance ) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]: @@ -351,40 +369,40 @@ def _read( # initializing the handler. Thus explicitly getting log location # is needed to get correct log path. worker_log_rel_path = self._render_filename(ti, try_number) - messages_list: list[str] = [] + source_list: list[str] = [] remote_logs: list[str] = [] local_logs: list[str] = [] - executor_messages: list[str] = [] + sources: list[str] = [] executor_logs: list[str] = [] served_logs: list[str] = [] with suppress(NotImplementedError): - remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata) - messages_list.extend(remote_messages) + sources, remote_logs = self._read_remote_logs(ti, try_number, metadata) + source_list.extend(sources) has_k8s_exec_pod = False if ti.state == TaskInstanceState.RUNNING: executor_get_task_log = self._get_executor_get_task_log(ti) response = executor_get_task_log(ti, try_number) if response: - executor_messages, executor_logs = response - if executor_messages: - messages_list.extend(executor_messages) + sources, executor_logs = response + if sources: + source_list.extend(sources) has_k8s_exec_pod = True if not (remote_logs and ti.state not in State.unfinished): # when finished, if we have remote logs, no need to check local worker_log_full_path = Path(self.local_base, worker_log_rel_path) - local_messages, local_logs = self._read_from_local(worker_log_full_path) - messages_list.extend(local_messages) + sources, local_logs = self._read_from_local(worker_log_full_path) + source_list.extend(sources) if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not has_k8s_exec_pod: - served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) - messages_list.extend(served_messages) + sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) + source_list.extend(sources) elif ti.state not in State.unfinished and not (local_logs or remote_logs): # ordinarily we don't check served logs, with the assumption that users set up # remote logging or shared drive for logs for persistence, but that's not always true # so even if task is done, if no local logs or remote logs are found, we'll check the worker - served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) - messages_list.extend(served_messages) + sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) + source_list.extend(sources) - logs = "\n".join( + logs = list( _interleave_logs( *local_logs, *remote_logs, @@ -395,18 +413,22 @@ def _read( log_pos = len(logs) # Log message source details are grouped: they are not relevant for most users and can # distract them from finding the root cause of their errors - messages = " INFO - ::group::Log message source details\n" - messages += "".join([f"*** {x}\n" for x in messages_list]) - messages += " INFO - ::endgroup::\n" + header = [ + StructuredLogMessage.model_construct( + event="::group::Log message source details", sources=source_list + ), + StructuredLogMessage.model_construct(event="::endgroup::"), + ] end_of_log = ti.try_number != try_number or ti.state not in ( TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED, ) if metadata and "log_pos" in metadata: - previous_chars = metadata["log_pos"] - logs = logs[previous_chars:] # Cut off previously passed log test as new tail - out_message = logs if "log_pos" in (metadata or 
{}) else messages + logs - return out_message, {"end_of_log": end_of_log, "log_pos": log_pos} + previous_line = metadata["log_pos"] + logs = logs[previous_line:] # Cut off previously passed log test as new tail + else: + logs = header + logs + return logs, {"end_of_log": end_of_log, "log_pos": log_pos} @staticmethod def _get_pod_namespace(ti: TaskInstance): @@ -439,7 +461,9 @@ def _get_log_retrieval_url( log_relative_path, ) - def read(self, task_instance, try_number=None, metadata=None): + def read( + self, task_instance, try_number: int | None = None, metadata=None + ) -> tuple[list[StructuredLogMessage] | str, dict[str, Any]]: """ Read logs of given task instance from local machine. @@ -449,33 +473,17 @@ def read(self, task_instance, try_number=None, metadata=None): :param metadata: log metadata, can be used for steaming log reading and auto-tailing. :return: a list of listed tuples which order log string by host """ - # Task instance increments its try number when it starts to run. - # So the log for a particular task try will only show up when - # try number gets incremented in DB, i.e logs produced the time - # after cli run and before try_number + 1 in DB will not be displayed. if try_number is None: - next_try = task_instance.try_number + 1 - try_numbers = list(range(1, next_try)) - elif try_number < 1: + try_number = task_instance.try_number + if try_number is None or try_number < 1: logs = [ - [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")], + StructuredLogMessage.model_construct( + level="error", event=f"Error fetching the logs. Try number {try_number} is invalid." + ) ] - return logs, [{"end_of_log": True}] - else: - try_numbers = [try_number] + return logs, {"end_of_log": True} - logs = [""] * len(try_numbers) - metadata_array = [{}] * len(try_numbers) - - # subclasses implement _read and may not have log_type, which was added recently - for i, try_number_element in enumerate(try_numbers): - log, out_metadata = self._read(task_instance, try_number_element, metadata) - # es_task_handler return logs grouped by host. wrap other handler returning log string - # with default/ empty host so that UI can render the response in the same way - logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)] - metadata_array[i] = out_metadata - - return logs, metadata_array + return self._read(task_instance, try_number, metadata) @staticmethod def _prepare_log_folder(directory: Path, new_folder_permissions: int): @@ -542,23 +550,20 @@ def _init_file(self, ti, *, identifier: str | None = None): @staticmethod def _read_from_local(worker_log_path: Path) -> tuple[list[str], list[str]]: - messages = [] paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*")) - if paths: - messages.append("Found local files:") - messages.extend(f" * {x}" for x in paths) + sources = [os.fspath(x) for x in paths] logs = [file.read_text() for file in paths] - return messages, logs + return sources, logs def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], list[str]]: - messages = [] + sources = [] logs = [] try: log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type) response = _fetch_logs_from_service(url, rel_path) if response.status_code == 403: - messages.append( + sources.append( "!!!! Please make sure that all your Airflow components (e.g. 
" "schedulers, webservers, workers and triggerer) have " "the same 'secret_key' configured in 'webserver' section and " @@ -566,20 +571,21 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li "See more at https://airflow.apache.org/docs/apache-airflow/" "stable/configurations-ref.html#secret-key" ) - # Check if the resource was properly fetched - response.raise_for_status() - if response.text: - messages.append(f"Found logs served from host {url}") - logs.append(response.text) + else: + # Check if the resource was properly fetched + response.raise_for_status() + if response.text: + sources.append(url) + logs.append(response.text) except Exception as e: from requests.exceptions import InvalidSchema if isinstance(e, InvalidSchema) and ti.task.inherits_from_empty_operator is True: - messages.append(self.inherits_from_empty_operator_log_message) + sources.append(self.inherits_from_empty_operator_log_message) else: - messages.append(f"Could not read served logs: {e}") + sources.append(f"Could not read served logs: {e}") logger.exception("Could not read served logs") - return messages, logs + return sources, logs def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]: """ diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index cc60500532fb1..d82edf7756e95 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -20,10 +20,11 @@ import time from collections.abc import Iterator from functools import cached_property -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Union from airflow.configuration import conf from airflow.utils.helpers import render_log_filename +from airflow.utils.log.file_task_handler import StructuredLogMessage from airflow.utils.log.logging_mixin import ExternalLoggingMixin from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import TaskInstanceState @@ -32,6 +33,10 @@ from sqlalchemy.orm.session import Session from airflow.models.taskinstance import TaskInstance + from airflow.typing_compat import TypeAlias + +LogMessages: TypeAlias = Union[list[StructuredLogMessage], str] +LogMetadata: TypeAlias = dict[str, Any] class TaskLogReader: @@ -42,13 +47,12 @@ class TaskLogReader: def read_log_chunks( self, ti: TaskInstance, try_number: int | None, metadata - ) -> tuple[list[tuple[tuple[str, str]]], dict[str, str]]: + ) -> tuple[LogMessages, LogMetadata]: """ Read chunks of Task Instance logs. :param ti: The taskInstance - :param try_number: If provided, logs for the given try will be returned. - Otherwise, logs from all attempts are returned. + :param try_number: :param metadata: A dictionary containing information about how to read the task log The following is an example of how to use this method to read log: @@ -62,9 +66,7 @@ def read_log_chunks( contain information about the task log which can enable you read logs to the end. 
""" - logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata) - metadata = metadatas[0] - return logs, metadata + return self.log_handler.read(ti, try_number, metadata=metadata) def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: dict) -> Iterator[str]: """ @@ -75,29 +77,32 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di :param metadata: A dictionary containing information about how to read the task log """ if try_number is None: - next_try = ti.try_number + 1 - try_numbers = list(range(1, next_try)) - else: - try_numbers = [try_number] - for current_try_number in try_numbers: - metadata.pop("end_of_log", None) - metadata.pop("max_offset", None) - metadata.pop("offset", None) - metadata.pop("log_pos", None) - while True: - logs, metadata = self.read_log_chunks(ti, current_try_number, metadata) - for host, log in logs[0]: - yield "\n".join([host or "", log]) + "\n" - if "end_of_log" not in metadata or ( - not metadata["end_of_log"] - and ti.state not in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) - ): - if not logs[0]: - # we did not receive any logs in this loop - # sleeping to conserve resources / limit requests on external services - time.sleep(self.STREAM_LOOP_SLEEP_SECONDS) - else: - break + try_number = ti.try_number + metadata.pop("end_of_log", None) + metadata.pop("max_offset", None) + metadata.pop("offset", None) + metadata.pop("log_pos", None) + while True: + logs, out_metadata = self.read_log_chunks(ti, try_number, metadata) + # Update the metadata dict in place so caller can get new values/end-of-log etc. + + for log in logs: + # It's a bit wasteful here to parse the JSON then dump it back again. + # Optimize this so in stream mode we can just pass logs right through, or even better add + # support to 307 redirect to a signed URL etc. 
+ yield (log if isinstance(log, str) else log.model_dump_json()) + "\n" + if not out_metadata.get("end_of_log", False) and ti.state not in ( + TaskInstanceState.RUNNING, + TaskInstanceState.DEFERRED, + ): + if not logs[0]: + # we did not receive any logs in this loop + # sleeping to conserve resources / limit requests on external services + time.sleep(self.STREAM_LOOP_SLEEP_SECONDS) + else: + metadata.clear() + metadata.update(out_metadata) + break @cached_property def log_handler(self): diff --git a/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py b/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py index be37e13b14bd9..456f680de41fa 100644 --- a/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py +++ b/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py @@ -37,6 +37,7 @@ from airflow.utils.types import DagRunType from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test @@ -78,8 +79,14 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc fth._read_from_logs_server = mock.Mock() fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"] - actual = fth._read(ti=ti, try_number=1) + logs, metadata = fth._read(ti=ti, try_number=1) fth._read_from_logs_server.assert_called_once() - assert "*** this message\n" in actual[0] - assert actual[0].endswith("this\nlog\ncontent") - assert actual[1] == {"end_of_log": False, "log_pos": 16} + + if AIRFLOW_V_3_0_PLUS: + assert metadata == {"end_of_log": False, "log_pos": 3} + assert logs[0].sources == ["this message"] + assert [x.event for x in logs[-3:]] == ["this", "log", "content"] + else: + assert "*** this message\n" in logs + assert logs.endswith("this\nlog\ncontent") + assert metadata == {"end_of_log": False, "log_pos": 16} diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index fe0f13297c86f..7d953cb73268d 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -125,10 +125,9 @@ def test_test_read_log_chunks_should_read_one_try(self): task_log_reader = TaskLogReader() ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS - logs, metadatas = task_log_reader.read_log_chunks(ti=ti, try_number=1, metadata={}) - assert logs[0] == [ + logs, metadata = task_log_reader.read_log_chunks(ti=ti, try_number=1, metadata={}) + assert logs == [ ( - "localhost", " INFO - ::group::Log message source details\n" "*** Found local files:\n" f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n" @@ -136,7 +135,7 @@ def test_test_read_log_chunks_should_read_one_try(self): "try_number=1.", ) ] - assert metadatas == {"end_of_log": True, "log_pos": 13} + assert metadata == {"end_of_log": True, "log_pos": 13} def test_test_read_log_chunks_should_read_all_files(self): task_log_reader = TaskLogReader() diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index f7d1b092f13c4..d760f18a63ebe 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -17,10 +17,12 @@ # under the License. 
from __future__ import annotations +import itertools import logging import logging.config import os import re +from collections.abc import Iterable from http import HTTPStatus from importlib import reload from pathlib import Path @@ -28,7 +30,9 @@ from unittest.mock import patch import pendulum +import pendulum.tz import pytest +from pydantic import TypeAdapter from pydantic.v1.utils import deep_update from requests.adapters import Response @@ -43,9 +47,10 @@ from airflow.utils.log.file_task_handler import ( FileTaskHandler, LogType, + StructuredLogMessage, _fetch_logs_from_service, _interleave_logs, - _parse_timestamps_in_log_file, + _parse_log_lines, ) from airflow.utils.log.logging_mixin import set_context from airflow.utils.net import get_hostname @@ -63,6 +68,19 @@ FILE_TASK_HANDLER = "task" +def events(logs: Iterable[StructuredLogMessage], skip_source_info=True) -> list[str]: + """Helper function to return just the event (a.k.a message) from a list of StructuredLogMessage""" + logs = iter(logs) + if skip_source_info: + + def is_source_group(log: StructuredLogMessage): + return not hasattr(log, "timestamp") or log.event == "::endgroup" + + logs = itertools.dropwhile(is_source_group, logs) + + return [s.event for s in logs] + + class TestFileTaskLogHandler: def clean_up(self): with create_session() as session: @@ -111,6 +129,7 @@ def task_callable(ti): assert file_handler.handler is not None # We expect set_context generates a file locally. log_filename = file_handler.handler.baseFilename + assert os.path.isfile(log_filename) assert log_filename.endswith("0.log"), log_filename @@ -122,14 +141,9 @@ def task_callable(ti): assert hasattr(file_handler, "read") # Return value of read must be a tuple of list and list. # passing invalid `try_number` to read function - logs, metadatas = file_handler.read(ti, 0) - assert isinstance(logs, list) - assert isinstance(metadatas, list) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert isinstance(metadatas[0], dict) - assert logs[0][0][0] == "default_host" - assert logs[0][0][1] == "Error fetching the logs. Try number 0 is invalid." + log, metadata = file_handler.read(ti, 0) + assert isinstance(metadata, dict) + assert log[0].event == "Error fetching the logs. Try number 0 is invalid." # Remove the generated tmp log file. os.remove(log_filename) @@ -146,9 +160,8 @@ def task_callable(ti): dagrun = dag_maker.create_dagrun() - (ti,) = dagrun.get_task_instances() + (ti,) = dagrun.get_task_instances(session=session) ti.try_number += 1 - session.merge(ti) session.flush() logger = ti.log ti.log.disabled = False @@ -171,18 +184,14 @@ def task_callable(ti): file_handler.close() assert hasattr(file_handler, "read") - # Return value of read must be a tuple of list and list. - logs, metadatas = file_handler.read(ti) - assert isinstance(logs, list) - assert isinstance(metadatas, list) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert isinstance(metadatas[0], dict) - target_re = r"\n\[[^\]]+\] {test_log_handlers.py:\d+} INFO - test\n" + log, metadata = file_handler.read(ti, 1) + assert isinstance(metadata, dict) + target_re = re.compile(r"\A\[[^\]]+\] {test_log_handlers.py:\d+} INFO - test\Z") # We should expect our log line from the callable above to appear in # the logs we read back - assert re.search(target_re, logs[0][0][-1]), "Logs were " + str(logs) + + assert any(re.search(target_re, e) for e in events(log)), "Logs were " + str(log) # Remove the generated tmp log file. 
os.remove(log_filename) @@ -309,14 +318,10 @@ def task_callable(ti): logger.info("Test") # Return value of read must be a tuple of list and list. - logs, metadatas = file_handler.read(ti) + logs, metadata = file_handler.read(ti) assert isinstance(logs, list) # Logs for running tasks should show up too. - assert isinstance(logs, list) - assert isinstance(metadatas, list) - assert len(logs) == 2 - assert len(logs) == len(metadatas) - assert isinstance(metadatas[0], dict) + assert isinstance(metadata, dict) # Remove the generated tmp log file. os.remove(log_filename) @@ -377,7 +382,7 @@ def task_callable(ti): assert current_file_size < max_bytes_size # Return value of read must be a tuple of list and list. - logs, metadatas = file_handler.read(ti) + logs, metadata = file_handler.read(ti) # the log content should have the filename of both current log file and rotate log file. find_current_log = False @@ -390,12 +395,8 @@ def task_callable(ti): assert find_current_log is True assert find_rotate_log_1 is True - assert isinstance(logs, list) # Logs for running tasks should show up too. assert isinstance(logs, list) - assert isinstance(metadatas, list) - assert len(logs) == len(metadatas) - assert isinstance(metadatas[0], dict) # Remove the two generated tmp log files. os.remove(log_filename) @@ -418,11 +419,12 @@ def test__read_when_local(self, mock_read_local, create_task_instance): logical_date=DEFAULT_DATE, ) fth = FileTaskHandler("") - actual = fth._read(ti=local_log_file_read, try_number=1) + logs, metadata = fth._read(ti=local_log_file_read, try_number=1) mock_read_local.assert_called_with(path) - assert "*** the messages\n" in actual[0] - assert actual[0].endswith("the log") - assert actual[1] == {"end_of_log": True, "log_pos": 7} + as_text = events(logs) + assert logs[0].sources == ["the messages"] + assert as_text[-1] == "the log" + assert metadata == {"end_of_log": True, "log_pos": 1} def test__read_from_local(self, tmp_path): """Tests the behavior of method _read_from_local""" @@ -433,11 +435,7 @@ def test__read_from_local(self, tmp_path): path2.write_text("file2 content") fth = FileTaskHandler("") assert fth._read_from_local(path1) == ( - [ - "Found local files:", - f" * {path1}", - f" * {path2}", - ], + [str(path1), str(path2)], ["file1 content", "file2 content"], ) @@ -480,16 +478,15 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs( fth._read_from_local.return_value = ["found local logs"], ["local\nlog\ncontent"] fth._read_from_logs_server = mock.Mock() fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"] - actual = fth._read(ti=ti, try_number=1) + logs, metadata = fth._read(ti=ti, try_number=1) if served_logs_checked: fth._read_from_logs_server.assert_called_once() - assert "*** this message\n" in actual[0] - assert actual[0].endswith("this\nlog\ncontent") - assert actual[1] == {"end_of_log": True, "log_pos": 16} + assert events(logs) == ["this", "log", "content"] + assert metadata == {"end_of_log": True, "log_pos": 3} else: fth._read_from_logs_server.assert_not_called() - assert actual[0] - assert actual[1] + assert logs + assert metadata def test_add_triggerer_suffix(self): sample = "any/path/to/thing.txt" @@ -617,7 +614,7 @@ def test_log_retrieval_valid_trigger(self, create_task_instance): def test_parse_timestamps(): actual = [] - for timestamp, _, _ in _parse_timestamps_in_log_file(log_sample.splitlines()): + for timestamp, _, _ in _parse_log_lines(log_sample.splitlines()): actual.append(timestamp) assert actual == [ 
pendulum.parse("2022-11-16T00:05:54.278000-08:00"), @@ -671,85 +668,87 @@ def test_interleave_interleaves(): "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554", ] ) - expected = "\n".join( - [ - "[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1", - "[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing on 2022-11-16 08:05:52.324532+00:00", - "[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task", - "[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', 'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', '33648', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']", - "[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait", - "[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running on host daniels-mbp-2.lan", - "[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow", - "AIRFLOW_CTX_DAG_ID=simple_async_timedelta", - "AIRFLOW_CTX_TASK_ID=wait", - "AIRFLOW_CTX_LOGICAL_DATE=2022-11-16T08:05:52.324532+00:00", - "AIRFLOW_CTX_TRY_NUMBER=1", - "AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00", - "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554", - ] + + tz = pendulum.tz.FixedTimezone(-28800, name="-08:00") + DateTime = pendulum.DateTime + expected = [ + { + "event": "[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 278000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - " + "Executing on 2022-11-16 " + "08:05:52.324532+00:00", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 295000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - " + "Started process 52536 to run task", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 300000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - " + "Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', " + "'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', " + "'33648', '--raw', '--subdir', " + "'/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', " + "'--cfg-path', " + "'/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 306000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - " + "Job 33648: Subtask wait", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 309000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - " + "Running on host " + "daniels-mbp-2.lan", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 457000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - " + "Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 
592000, tzinfo=tz), + }, + { + "event": "AIRFLOW_CTX_DAG_ID=simple_async_timedelta", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "AIRFLOW_CTX_TASK_ID=wait", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "AIRFLOW_CTX_LOGICAL_DATE=2022-11-16T08:05:52.324532+00:00", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "AIRFLOW_CTX_TRY_NUMBER=1", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - " + "Pausing task as DEFERRED. dag_id=simple_async_timedelta, " + "task_id=wait, execution_date=20221116T080552, " + "start_date=20221116T080554", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 604000, tzinfo=tz), + }, + ] + # Use a type adapter to durn it in to dicts -- makes it easier to compare/test than a bunch of objects + results = TypeAdapter(list[StructuredLogMessage]).dump_python( + _interleave_logs(log_sample2, log_sample1, log_sample3) ) - assert "\n".join(_interleave_logs(log_sample2, log_sample1, log_sample3)) == expected - - -long_sample = """ -*** yoyoyoyo -[2023-01-15T22:36:46.474-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti= -[2023-01-15T22:36:46.482-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti= -[2023-01-15T22:36:46.483-0800] {taskinstance.py:1332} INFO - Starting attempt 1 of 1 -[2023-01-15T22:36:46.516-0800] {taskinstance.py:1351} INFO - Executing on 2023-01-16 06:36:43.044492+00:00 -[2023-01-15T22:36:46.522-0800] {standard_task_runner.py:56} INFO - Started process 38807 to run task -[2023-01-15T22:36:46.530-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '487', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmpiwyl54bn', '--no-shut-down-logging'] -[2023-01-15T22:36:46.536-0800] {standard_task_runner.py:84} INFO - Job 487: Subtask wait -[2023-01-15T22:36:46.624-0800] {task_command.py:417} INFO - Running on host daniels-mbp-2.lan -[2023-01-15T22:36:46.918-0800] {taskinstance.py:1558} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_time_delta_sensor_async' AIRFLOW_CTX_TASK_ID='wait' AIRFLOW_CTX_LOGICAL_DATE='2023-01-16T06:36:43.044492+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-01-16T06:36:43.044492+00:00' -[2023-01-15T22:36:46.929-0800] {taskinstance.py:1433} INFO - Pausing task as DEFERRED. 
dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646 -[2023-01-15T22:36:46.981-0800] {local_task_job.py:218} INFO - Task exited with return code 100 (task deferral) - -[2023-01-15T22:36:46.474-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti= -[2023-01-15T22:36:46.482-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti= -[2023-01-15T22:36:46.483-0800] {taskinstance.py:1332} INFO - Starting attempt 1 of 1 -[2023-01-15T22:36:46.516-0800] {taskinstance.py:1351} INFO - Executing on 2023-01-16 06:36:43.044492+00:00 -[2023-01-15T22:36:46.522-0800] {standard_task_runner.py:56} INFO - Started process 38807 to run task -[2023-01-15T22:36:46.530-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '487', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmpiwyl54bn', '--no-shut-down-logging'] -[2023-01-15T22:36:46.536-0800] {standard_task_runner.py:84} INFO - Job 487: Subtask wait -[2023-01-15T22:36:46.624-0800] {task_command.py:417} INFO - Running on host daniels-mbp-2.lan -[2023-01-15T22:36:46.918-0800] {taskinstance.py:1558} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_time_delta_sensor_async' AIRFLOW_CTX_TASK_ID='wait' AIRFLOW_CTX_LOGICAL_DATE='2023-01-16T06:36:43.044492+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-01-16T06:36:43.044492+00:00' -[2023-01-15T22:36:46.929-0800] {taskinstance.py:1433} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646 -[2023-01-15T22:36:46.981-0800] {local_task_job.py:218} INFO - Task exited with return code 100 (task deferral) -[2023-01-15T22:37:17.673-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti= -[2023-01-15T22:37:17.681-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti= -[2023-01-15T22:37:17.682-0800] {taskinstance.py:1330} INFO - resuming after deferral -[2023-01-15T22:37:17.693-0800] {taskinstance.py:1351} INFO - Executing on 2023-01-16 06:36:43.044492+00:00 -[2023-01-15T22:37:17.697-0800] {standard_task_runner.py:56} INFO - Started process 39090 to run task -[2023-01-15T22:37:17.703-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '488', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp_sa9sau4', '--no-shut-down-logging'] -[2023-01-15T22:37:17.707-0800] {standard_task_runner.py:84} INFO - Job 488: Subtask wait -[2023-01-15T22:37:17.771-0800] {task_command.py:417} INFO - Running on host daniels-mbp-2.lan -[2023-01-15T22:37:18.043-0800] {taskinstance.py:1369} INFO - Marking task as SUCCESS. 
dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646, end_date=20230116T063718 -[2023-01-15T22:37:18.117-0800] {local_task_job.py:220} INFO - Task exited with return code 0 -[2023-01-15T22:37:18.147-0800] {taskinstance.py:2648} INFO - 0 downstream tasks scheduled from follow-on schedule check -[2023-01-15T22:37:18.173-0800] {:0} Level None - end_of_log - -*** hihihi! -[2023-01-15T22:36:48.348-0800] {temporal.py:62} INFO - trigger starting -[2023-01-15T22:36:48.348-0800] {temporal.py:66} INFO - 24 seconds remaining; sleeping 10 seconds -[2023-01-15T22:36:58.349-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:36:59.349-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:00.349-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:01.350-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:02.350-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:03.351-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:04.351-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:05.353-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:06.354-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:07.355-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:08.356-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:09.357-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:10.358-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:11.359-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:12.359-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:13.360-0800] {temporal.py:74} INFO - yielding event with payload DateTime(2023, 1, 16, 6, 37, 13, 44492, tzinfo=Timezone('UTC')) -[2023-01-15T22:37:13.361-0800] {triggerer_job.py:540} INFO - Trigger (ID 106) fired: TriggerEvent -""" + # TypeAdapter gives us a generator out when it's generator is an input. 
Nice, but not useful for testing + results = list(results) + assert results == expected def test_interleave_logs_correct_ordering(): @@ -765,7 +764,8 @@ def test_interleave_logs_correct_ordering(): [2023-01-17T12:47:11.883-0800] {triggerer_job.py:540} INFO - Trigger (ID 1) fired: TriggerEvent """ - assert sample_with_dupe == "\n".join(_interleave_logs(sample_with_dupe, "", sample_with_dupe)) + logs = events(_interleave_logs(sample_with_dupe, "", sample_with_dupe)) + assert sample_with_dupe == "\n".join(logs) def test_interleave_logs_correct_dedupe(): @@ -780,7 +780,8 @@ def test_interleave_logs_correct_dedupe(): test, test""" - assert sample_without_dupe == "\n".join(_interleave_logs(",\n ".join(["test"] * 10))) + logs = events(_interleave_logs(",\n ".join(["test"] * 10))) + assert sample_without_dupe == "\n".join(logs) def test_permissions_for_new_directories(tmp_path): From 9da1231d8cadd13cfa307b0eaedd28175a7c87d5 Mon Sep 17 00:00:00 2001 From: Brent Bovenzi Date: Mon, 17 Feb 2025 17:49:53 -0500 Subject: [PATCH 02/19] Fix typescript useLogs --- airflow/ui/src/queries/useLogs.tsx | 48 ++++++++++++++++-------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/airflow/ui/src/queries/useLogs.tsx b/airflow/ui/src/queries/useLogs.tsx index c667d3c883688..799a01fe4a7e7 100644 --- a/airflow/ui/src/queries/useLogs.tsx +++ b/airflow/ui/src/queries/useLogs.tsx @@ -19,7 +19,11 @@ import dayjs from "dayjs"; import { useTaskInstanceServiceGetLog } from "openapi/queries"; -import type { TaskInstanceResponse } from "openapi/requests/types.gen"; +import type { + StructuredLogMessage, + TaskInstanceResponse, + TaskInstancesLogResponse, +} from "openapi/requests/types.gen"; import { isStatePending, useAutoRefresh } from "src/utils"; type Props = { @@ -29,32 +33,36 @@ type Props = { }; type ParseLogsProps = { - data: TaskInstanceResponse["content"]; + data: TaskInstancesLogResponse["content"]; }; -const renderStructuredLog = ({ event, level = undefined, timestamp = undefined, ...structured }, index) => { +const renderStructuredLog = (logMessage: string | StructuredLogMessage, index: number) => { + if (typeof logMessage === "string") { + return

{logMessage}

; + } + + const { event, level = undefined, timestamp, ...structured } = logMessage; const elements = []; if (Boolean(timestamp)) { - elements.push("[", , "] "); + elements.push("[", , "] "); } - if (Boolean(level)) { + if (typeof level === "string") { elements.push({level.toUpperCase()}, " - "); } elements.push({event}); for (const key in structured) { - if (!Object.hasOwn(structured, key)) { - continue; + if (Object.hasOwn(structured, key)) { + elements.push( + " ", + + {key}={JSON.stringify(structured[key])} + , + ); } - elements.push( - " ", - - {key}={JSON.stringify(structured[key])} - , - ); } return

{elements}

; @@ -62,20 +70,16 @@ const renderStructuredLog = ({ event, level = undefined, timestamp = undefined, // TODO: add support for log groups, colors, formats, filters const parseLogs = ({ data }: ParseLogsProps) => { - if (data === undefined) { - return {}; - } - let lines; - let warning; - let parsedLines; try { - parsedLines = data.map(renderStructuredLog); + parsedLines = data.map((datum, index) => renderStructuredLog(datum, index)); } catch (error) { + const errorMessage = error instanceof Error ? error.message : "An error occurred."; + // eslint-disable-next-line no-console - console.warn(`Error parsing logs: ${error}`); + console.warn(`Error parsing logs: ${errorMessage}`); warning = "Unable to show logs. There was an error parsing logs."; return { data, warning }; @@ -111,7 +115,7 @@ export const useLogs = ({ dagId, taskInstance, tryNumber = 1 }: Props) => { ); const parsedData = parseLogs({ - data: data?.content, + data: data?.content ?? [], }); return { data: parsedData, ...rest }; From 2303765e763ab7f215a49d1e15e0232b1e3effcb Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 19 Feb 2025 18:28:51 +0800 Subject: [PATCH 03/19] style: group metadata pop --- airflow/utils/log/log_reader.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index d82edf7756e95..d50b8fc126021 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -78,10 +78,10 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di """ if try_number is None: try_number = ti.try_number - metadata.pop("end_of_log", None) - metadata.pop("max_offset", None) - metadata.pop("offset", None) - metadata.pop("log_pos", None) + + for key in ("end_of_log", "max_offset", "offset", "log_pos"): + metadata.pop(key, None) + while True: logs, out_metadata = self.read_log_chunks(ti, try_number, metadata) # Update the metadata dict in place so caller can get new values/end-of-log etc. @@ -91,6 +91,7 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di # Optimize this so in stream mode we can just pass logs right through, or even better add # support to 307 redirect to a signed URL etc. 
yield (log if isinstance(log, str) else log.model_dump_json()) + "\n" + if not out_metadata.get("end_of_log", False) and ti.state not in ( TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED, From b79f72618f64b5752e3fdff1685ba519bd816489 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 19 Feb 2025 18:33:40 +0800 Subject: [PATCH 04/19] style: reduce if-else and directly use bool for assigning metadata["download_logs"] --- airflow/api_fastapi/core_api/routes/public/log.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/log.py b/airflow/api_fastapi/core_api/routes/public/log.py index 09ec4dc672961..1cb6bd9d66ef7 100644 --- a/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow/api_fastapi/core_api/routes/public/log.py @@ -92,10 +92,7 @@ def get_log( if metadata.get("download_logs") and metadata["download_logs"]: full_content = True - if full_content: - metadata["download_logs"] = True - else: - metadata["download_logs"] = False + metadata["download_logs"] = full_content task_log_reader = TaskLogReader() From 0e658278c6aff0cd947ca905976f1d46aa86e953 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 19 Feb 2025 19:52:14 +0800 Subject: [PATCH 05/19] style: improve type annotation --- airflow/utils/log/file_task_handler.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 9ba77278a447a..4cc938c4dc774 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -143,7 +143,7 @@ def _parse_log_lines(lines: Iterable[str]) -> Iterable[tuple[datetime | None, in yield timestamp, idx, log -def _interleave_logs(*logs) -> Iterable[StructuredLogMessage]: +def _interleave_logs(*logs: str) -> Iterable[StructuredLogMessage]: min_date = pendulum.datetime(2000, 1, 1) records = itertools.chain.from_iterable(_parse_log_lines(log.splitlines()) for log in logs) @@ -462,7 +462,10 @@ def _get_log_retrieval_url( ) def read( - self, task_instance, try_number: int | None = None, metadata=None + self, + task_instance: TaskInstance, + try_number: int | None = None, + metadata: dict[str, Any] | None = None, ) -> tuple[list[StructuredLogMessage] | str, dict[str, Any]]: """ Read logs of given task instance from local machine. From 67976c5cab65615dd35d48d7a7e6146d2f86846d Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 19 Feb 2025 22:51:27 +0800 Subject: [PATCH 06/19] test(test_log_reader): fix existing unit tests --- airflow/utils/log/file_task_handler.py | 2 +- tests/utils/log/test_log_reader.py | 97 ++++++++++++-------------- 2 files changed, 46 insertions(+), 53 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 4cc938c4dc774..3c8d7b32a9567 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -472,7 +472,7 @@ def read( :param task_instance: task instance object :param try_number: task instance try_number to read logs from. If None - it returns all logs separated by try_number + it returns the log of task_instance.try_number :param metadata: log metadata, can be used for steaming log reading and auto-tailing. 
:return: a list of listed tuples which order log string by host """ diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index 7d953cb73268d..ca888f4f113a1 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -126,101 +126,94 @@ def test_test_read_log_chunks_should_read_one_try(self): ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS logs, metadata = task_log_reader.read_log_chunks(ti=ti, try_number=1, metadata={}) - assert logs == [ - ( - " INFO - ::group::Log message source details\n" - "*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n" - " INFO - ::endgroup::\n" - "try_number=1.", - ) + + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == [ + f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log" ] - assert metadata == {"end_of_log": True, "log_pos": 13} + assert logs[1].event == "::endgroup::" + assert logs[2].event == "try_number=1." + assert metadata == {"end_of_log": True, "log_pos": 1} - def test_test_read_log_chunks_should_read_all_files(self): + def test_test_read_log_chunks_should_read_latest_files(self): task_log_reader = TaskLogReader() ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS - logs, metadatas = task_log_reader.read_log_chunks(ti=ti, try_number=None, metadata={}) + logs, metadata = task_log_reader.read_log_chunks(ti=ti, try_number=None, metadata={}) - for i in range(0, 3): - assert logs[i][0][0] == "localhost" - assert ( - "*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/{i + 1}.log\n" - ) in logs[i][0][1] - assert f"try_number={i + 1}." in logs[i][0][1] - assert metadatas == {"end_of_log": True, "log_pos": 13} + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == [ + f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log" + ] + assert logs[1].event == "::endgroup::" + assert logs[2].event == f"try_number={ti.try_number}." + assert metadata == {"end_of_log": True, "log_pos": 1} def test_test_test_read_log_stream_should_read_one_try(self): task_log_reader = TaskLogReader() ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS stream = task_log_reader.read_log_stream(ti=ti, try_number=1, metadata={}) + assert list(stream) == [ - "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n" - " INFO - ::endgroup::\ntry_number=1.\n" + "{" + '"event":"::group::Log message source details",' + f'"sources":["{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log"]' + "}\n", + '{"event":"::endgroup::"}\n', + '{"timestamp":null,"event":"try_number=1."}\n', ] - def test_test_test_read_log_stream_should_read_all_logs(self): + def test_test_test_read_log_stream_should_read_latest_logs(self): task_log_reader = TaskLogReader() self.ti.state = TaskInstanceState.SUCCESS # Ensure mocked instance is completed to return stream stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={}) + assert list(stream) == [ - "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n" - " INFO - ::endgroup::\ntry_number=1." 
- "\n", - "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n" - " INFO - ::endgroup::\ntry_number=2." - "\n", - "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n" - " INFO - ::endgroup::\ntry_number=3." - "\n", + "{" + '"event":"::group::Log message source details",' + f'"sources":["{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log"]' + "}\n", + '{"event":"::endgroup::"}\n', + '{"timestamp":null,"event":"try_number=3."}\n', ] @mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") def test_read_log_stream_should_support_multiple_chunks(self, mock_read): - first_return = ([[("", "1st line")]], [{}]) - second_return = ([[("", "2nd line")]], [{"end_of_log": False}]) - third_return = ([[("", "3rd line")]], [{"end_of_log": True}]) - fourth_return = ([[("", "should never be read")]], [{"end_of_log": True}]) + first_return = (["1st line"], {}) + second_return = (["2nd line"], {"end_of_log": False}) + third_return = (["3rd line"], {"end_of_log": True}) + fourth_return = (["should never be read"], {"end_of_log": True}) mock_read.side_effect = [first_return, second_return, third_return, fourth_return] task_log_reader = TaskLogReader() self.ti.state = TaskInstanceState.SUCCESS log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={}) - assert list(log_stream) == ["\n1st line\n", "\n2nd line\n", "\n3rd line\n"] + assert list(log_stream) == ["1st line\n", "2nd line\n", "3rd line\n"] + # as the metadata is now updated in place, when the latest run update metadata. 
+ # the metadata stored in the mock_read will also be updated + # https://github.com/python/cpython/issues/77848 mock_read.assert_has_calls( [ - mock.call(self.ti, 1, metadata={}), - mock.call(self.ti, 1, metadata={}), - mock.call(self.ti, 1, metadata={"end_of_log": False}), + mock.call(self.ti, 1, metadata={"end_of_log": True}), + mock.call(self.ti, 1, metadata={"end_of_log": True}), + mock.call(self.ti, 1, metadata={"end_of_log": True}), ], any_order=False, ) @mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") def test_read_log_stream_should_read_each_try_in_turn(self, mock_read): - first_return = ([[("", "try_number=1.")]], [{"end_of_log": True}]) - second_return = ([[("", "try_number=2.")]], [{"end_of_log": True}]) - third_return = ([[("", "try_number=3.")]], [{"end_of_log": True}]) - fourth_return = ([[("", "should never be read")]], [{"end_of_log": True}]) - mock_read.side_effect = [first_return, second_return, third_return, fourth_return] + mock_read.side_effect = [(["try_number=3."], {"end_of_log": True})] task_log_reader = TaskLogReader() log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={}) - assert list(log_stream) == ["\ntry_number=1.\n", "\ntry_number=2.\n", "\ntry_number=3.\n"] + assert list(log_stream) == ["try_number=3.\n"] mock_read.assert_has_calls( [ - mock.call(self.ti, 1, metadata={}), - mock.call(self.ti, 2, metadata={}), - mock.call(self.ti, 3, metadata={}), + mock.call(self.ti, 3, metadata={"end_of_log": True}), ], any_order=False, ) From c511c120e0b4a13b85623615faa76b978e48b049 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 20 Feb 2025 16:56:59 +0800 Subject: [PATCH 07/19] test(api_fastapi): fix existing test_log unit tests --- .../core_api/routes/public/test_log.py | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_log.py b/tests/api_fastapi/core_api/routes/public/test_log.py index 2792dd776cf97..d82644553933b 100644 --- a/tests/api_fastapi/core_api/routes/public/test_log.py +++ b/tests/api_fastapi/core_api/routes/public/test_log.py @@ -165,12 +165,12 @@ def test_should_respond_200_json(self, try_number): ) expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log" log_content = "Log for testing." if try_number == 1 else "Log for testing 2." - assert "[('localhost'," in response.json()["content"] - assert f"*** Found local files:\\n*** * {expected_filename}\\n" in response.json()["content"] - assert f"{log_content}')]" in response.json()["content"] - info = serializer.loads(response.json()["continuation_token"]) - assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1 else 18} + resp_contnt = response.json()["content"] + assert expected_filename in resp_contnt[0]["sources"] + assert log_content in resp_contnt[2]["event"] + + assert response.json()["continuation_token"] is None assert response.status_code == 200 @pytest.mark.parametrize( @@ -220,9 +220,10 @@ def test_should_respond_200_text_plain( assert response.status_code == 200 log_content = "Log for testing." if try_number == 1 else "Log for testing 2." 
- assert "localhost\n" in response.content.decode("utf-8") - assert f"*** Found local files:\n*** * {expected_filename}\n" in response.content.decode("utf-8") - assert f"{log_content}\n" in response.content.decode("utf-8") + resp_content = response.content.decode("utf-8") + + assert expected_filename in resp_content + assert log_content in resp_content @pytest.mark.parametrize( "request_url, expected_filename, extra_query_string, try_number", @@ -275,9 +276,9 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu assert response.status_code == 200 log_content = "Log for testing." if try_number == 1 else "Log for testing 2." - assert "localhost\n" in response.content.decode("utf-8") - assert f"*** Found local files:\n*** * {expected_filename}\n" in response.content.decode("utf-8") - assert f"{log_content}\n" in response.content.decode("utf-8") + resp_content = response.content.decode("utf-8") + assert expected_filename in resp_content + assert log_content in resp_content @pytest.mark.parametrize("try_number", [1, 2]) def test_get_logs_response_with_ti_equal_to_none(self, try_number): @@ -295,10 +296,10 @@ def test_get_logs_response_with_ti_equal_to_none(self, try_number): @pytest.mark.parametrize("try_number", [1, 2]) def test_get_logs_with_metadata_as_download_large_file(self, try_number): with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") as read_mock: - first_return = ([[("", "1st line")]], [{}]) - second_return = ([[("", "2nd line")]], [{"end_of_log": False}]) - third_return = ([[("", "3rd line")]], [{"end_of_log": True}]) - fourth_return = ([[("", "should never be read")]], [{"end_of_log": True}]) + first_return = (["", "1st line"], {}) + second_return = (["", "2nd line"], {"end_of_log": False}) + third_return = (["", "3rd line"], {"end_of_log": True}) + fourth_return = (["", "should never be read"], {"end_of_log": True}) read_mock.side_effect = [first_return, second_return, third_return, fourth_return] response = self.client.get( From eb11c6cf949fe23df771037b5bc1b9ca83b2201f Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 20 Feb 2025 17:35:14 +0800 Subject: [PATCH 08/19] feat(api_connexion/log): update v1 api to the latest log format --- .../api_connexion/endpoints/log_endpoint.py | 28 +++++++++++-------- airflow/api_connexion/openapi/v1.yaml | 5 +++- airflow/api_connexion/schemas/log_schema.py | 6 ++-- .../core_api/routes/public/test_log.py | 1 - 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/airflow/api_connexion/endpoints/log_endpoint.py b/airflow/api_connexion/endpoints/log_endpoint.py index 37138dc12be8e..29e96a20228ca 100644 --- a/airflow/api_connexion/endpoints/log_endpoint.py +++ b/airflow/api_connexion/endpoints/log_endpoint.py @@ -67,10 +67,7 @@ def get_log( if metadata.get("download_logs") and metadata["download_logs"]: full_content = True - if full_content: - metadata["download_logs"] = True - else: - metadata["download_logs"] = False + metadata["download_logs"] = full_content task_log_reader = TaskLogReader() @@ -116,11 +113,18 @@ def get_log( logs: Any if return_type == "application/json" or return_type is None: # default logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) - logs = logs[0] if task_try_number is not None else logs - # we must have token here, so we can safely ignore it - token = URLSafeSerializer(key).dumps(metadata) # type: ignore[assignment] - return logs_schema.dump(LogResponseObject(continuation_token=token, content=logs)) - # text/plain. 
Stream - logs = task_log_reader.read_log_stream(ti, task_try_number, metadata) - - return Response(logs, headers={"Content-Type": return_type}) + encoded_token = None + if not metadata.get("end_of_log", False): + encoded_token = URLSafeSerializer(key).dumps(metadata) + return logs_schema.dump(LogResponseObject(continuation_token=encoded_token, content=logs)) + + # text/plain, or something else we don't understand. Return raw log content + + # We need to exhaust the iterator before we can generate the continuation token. + # We could improve this by making it a streaming/async response, and by then setting the header using + # HTTP Trailers + logs = "".join(task_log_reader.read_log_stream(ti, task_try_number, metadata)) + headers = None + if not metadata.get("end_of_log", False): + headers = {"Airflow-Continuation-Token": URLSafeSerializer(key).dumps(metadata)} + return Response(mimetype="application/x-ndjson", response=logs, headers=headers) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index c63fb72f2a9a0..8b4d780d68a37 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -2145,8 +2145,11 @@ paths: properties: continuation_token: type: string + nullable: true content: - type: string + type: array + items: + type: string text/plain: schema: type: string diff --git a/airflow/api_connexion/schemas/log_schema.py b/airflow/api_connexion/schemas/log_schema.py index 6651057218e68..8416b03f145bc 100644 --- a/airflow/api_connexion/schemas/log_schema.py +++ b/airflow/api_connexion/schemas/log_schema.py @@ -24,14 +24,14 @@ class LogsSchema(Schema): """Schema for logs.""" - content = fields.Str(dump_only=True) - continuation_token = fields.Str(dump_only=True) + content = fields.List(fields.Str(dump_only=True)) + continuation_token = fields.Str(dump_only=True, allow_none=True) class LogResponseObject(NamedTuple): """Log Response Object.""" - content: str + content: list[str] continuation_token: str | None diff --git a/tests/api_fastapi/core_api/routes/public/test_log.py b/tests/api_fastapi/core_api/routes/public/test_log.py index d82644553933b..b9c43639539d6 100644 --- a/tests/api_fastapi/core_api/routes/public/test_log.py +++ b/tests/api_fastapi/core_api/routes/public/test_log.py @@ -165,7 +165,6 @@ def test_should_respond_200_json(self, try_number): ) expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log" log_content = "Log for testing." if try_number == 1 else "Log for testing 2." 
- resp_contnt = response.json()["content"] assert expected_filename in resp_contnt[0]["sources"] assert log_content in resp_contnt[2]["event"] From c3d29fa86b9caff57f832ab3b763ae65470fe8ba Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 20 Feb 2025 21:24:02 +0800 Subject: [PATCH 09/19] test(providers/elasticsearch): fix part of the existing unit test --- .../elasticsearch/log/test_es_task_handler.py | 87 +++++++++++++++---- 1 file changed, 70 insertions(+), 17 deletions(-) diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index dc9602ee03650..7d304eb5434b5 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -209,12 +209,22 @@ def test_read(self, ti): ) assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert len(logs[0]) == 1 - assert self.test_message == logs[0][0][-1] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "1" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + + if AIRFLOW_V_3_0_PLUS: + assert len(logs[0]) == 2 + assert self.test_message == logs[0][-1] + + metadata = metadatas + else: + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "1" + assert timezone.parse(metadata[0]["last_log_timestamp"]) > ts def test_read_with_patterns(self, ti): ts = pendulum.now() @@ -251,7 +261,9 @@ def test_read_with_missing_index(self, ti): with mock.patch.object(self.es_task_handler, "index_patterns", new="nonexistent,test_*"): with pytest.raises(elasticsearch.exceptions.NotFoundError, match=r"IndexMissingException.*"): self.es_task_handler.read( - ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} + ti, + 1, + {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}, ) @pytest.mark.parametrize("seconds", [3, 6]) @@ -293,12 +305,25 @@ def test_read_with_match_phrase_query(self, ti): ) another_test_message = "another message" - another_body = {"message": another_test_message, "log_id": similar_log_id, "offset": 1} + another_body = { + "message": another_test_message, + "log_id": similar_log_id, + "offset": 1, + } self.es.index(index=self.index_name, doc_type=self.doc_type, body=another_body, id=1) - + self.es.index( + index=self.index_name, doc_type=self.doc_type, body=another_body, id=1 + ) ts = pendulum.now() logs, metadatas = self.es_task_handler.read( - ti, 1, {"offset": "0", "last_log_timestamp": str(ts), "end_of_log": False, "max_offset": 2} + ti, + 1, + { + "offset": "0", + "last_log_timestamp": str(ts), + "end_of_log": False, + "max_offset": 2, + }, ) assert len(logs) == 1 assert len(logs) == len(metadatas) @@ -390,7 +415,12 @@ def test_read_as_download_logs(self, ti): logs, metadatas = self.es_task_handler.read( ti, 1, - {"offset": 0, "last_log_timestamp": str(ts), "download_logs": True, "end_of_log": False}, + { + "offset": 0, + "last_log_timestamp": str(ts), + "download_logs": True, + "end_of_log": False, + }, ) assert len(logs) == 1 assert len(logs) == len(metadatas) @@ -498,7 +528,10 @@ def test_read_with_custom_offset_and_host_fields(self, ti): logs, _ = self.es_task_handler.read( ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert self.test_message == logs[0][0][1] + if 
AIRFLOW_V_3_0_PLUS: + pass + else: + assert self.test_message == logs[0][0][1] def test_close(self, ti): formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") @@ -578,14 +611,34 @@ def test_clean_date(self): "json_format, es_frontend, expected_url", [ # Common cases - (True, "localhost:5601/{log_id}", "https://localhost:5601/" + quote(JSON_LOG_ID)), - (False, "localhost:5601/{log_id}", "https://localhost:5601/" + quote(LOG_ID)), + ( + True, + "localhost:5601/{log_id}", + "https://localhost:5601/" + quote(JSON_LOG_ID), + ), + ( + False, + "localhost:5601/{log_id}", + "https://localhost:5601/" + quote(LOG_ID), + ), # Ignore template if "{log_id}"" is missing in the URL (False, "localhost:5601", "https://localhost:5601"), # scheme handling - (False, "https://localhost:5601/path/{log_id}", "https://localhost:5601/path/" + quote(LOG_ID)), - (False, "http://localhost:5601/path/{log_id}", "http://localhost:5601/path/" + quote(LOG_ID)), - (False, "other://localhost:5601/path/{log_id}", "other://localhost:5601/path/" + quote(LOG_ID)), + ( + False, + "https://localhost:5601/path/{log_id}", + "https://localhost:5601/path/" + quote(LOG_ID), + ), + ( + False, + "http://localhost:5601/path/{log_id}", + "http://localhost:5601/path/" + quote(LOG_ID), + ), + ( + False, + "other://localhost:5601/path/{log_id}", + "other://localhost:5601/path/" + quote(LOG_ID), + ), ], ) def test_get_external_log_url(self, ti, json_format, es_frontend, expected_url): From 35832766ef27ca691be1c780c30075fbf63db005 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Feb 2025 14:10:08 +0800 Subject: [PATCH 10/19] test(providers/amazon): fix TestCloudwatchTaskHandler::test_read --- .../aws/log/test_cloudwatch_task_handler.py | 61 ++++++++++++++----- 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py index 839b644f998a2..48ea1a3cb48fe 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py @@ -30,7 +30,9 @@ from airflow.models import DAG, DagRun, TaskInstance from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook -from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudwatchTaskHandler +from airflow.providers.amazon.aws.log.cloudwatch_task_handler import ( + CloudwatchTaskHandler, +) from airflow.providers.amazon.aws.utils import datetime_to_epoch_utc_ms from airflow.providers.standard.operators.empty import EmptyOperator from airflow.utils.state import State @@ -74,9 +76,19 @@ def setup_tests(self, create_log_template, tmp_path_factory, session): self.dag = DAG(dag_id=dag_id, schedule=None, start_date=date) task = EmptyOperator(task_id=task_id, dag=self.dag) if AIRFLOW_V_3_0_PLUS: - dag_run = DagRun(dag_id=self.dag.dag_id, logical_date=date, run_id="test", run_type="scheduled") + dag_run = DagRun( + dag_id=self.dag.dag_id, + logical_date=date, + run_id="test", + run_type="scheduled", + ) else: - dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") + dag_run = DagRun( + dag_id=self.dag.dag_id, + execution_date=date, + run_id="test", + run_type="scheduled", + ) session.add(dag_run) session.commit() session.refresh(dag_run) @@ -124,8 +136,8 @@ def test_event_to_str(self): ] assert [handler._event_to_str(event) for event in events] == ( [ - 
f"[{get_time_str(current_time-2000)}] First", - f"[{get_time_str(current_time-1000)}] Second", + f"[{get_time_str(current_time - 2000)}] First", + f"[{get_time_str(current_time - 1000)}] Second", f"[{get_time_str(current_time)}] Third", ] ) @@ -150,21 +162,37 @@ def test_read(self): msg_template = "*** Reading remote log from Cloudwatch log_group: {} log_stream: {}.\n{}\n" events = "\n".join( [ - f"[{get_time_str(current_time-2000)}] First", - f"[{get_time_str(current_time-1000)}] Second", + f"[{get_time_str(current_time - 2000)}] First", + f"[{get_time_str(current_time - 1000)}] Second", f"[{get_time_str(current_time)}] Third", ] ) - assert self.cloudwatch_task_handler.read(self.ti) == ( - [[("", msg_template.format(self.remote_log_group, self.remote_log_stream, events))]], - [{"end_of_log": True}], - ) + if AIRFLOW_V_3_0_PLUS: + assert self.cloudwatch_task_handler.read(self.ti) == ( + msg_template.format(self.remote_log_group, self.remote_log_stream, events), + {"end_of_log": True}, + ) + else: + assert self.cloudwatch_task_handler.read(self.ti) == ( + [ + [ + ( + "", + msg_template.format(self.remote_log_group, self.remote_log_stream, events), + ) + ] + ], + [{"end_of_log": True}], + ) @pytest.mark.parametrize( "end_date, expected_end_time", [ (None, None), - (datetime(2020, 1, 2), datetime_to_epoch_utc_ms(datetime(2020, 1, 2) + timedelta(seconds=30))), + ( + datetime(2020, 1, 2), + datetime_to_epoch_utc_ms(datetime(2020, 1, 2) + timedelta(seconds=30)), + ), ], ) @mock.patch.object(AwsLogsHook, "get_log_events") @@ -191,7 +219,9 @@ def test_get_cloudwatch_logs(self, mock_get_log_events, end_date, expected_end_t id="json-serialize", ), pytest.param( - None, '{"datetime": "2023-01-01T00:00:00+00:00", "customObject": null}', id="not-set" + None, + '{"datetime": "2023-01-01T00:00:00+00:00", "customObject": null}', + id="not-set", ), ], ) @@ -219,7 +249,10 @@ def __repr__(self): "customObject": ToSerialize(), }, ) - with mock.patch("watchtower.threading.Thread"), mock.patch("watchtower.queue.Queue") as mq: + with ( + mock.patch("watchtower.threading.Thread"), + mock.patch("watchtower.queue.Queue") as mq, + ): mock_queue = Mock() mq.return_value = mock_queue handler.handle(message) From 43018ccdc54c4ae6faee787d1d7b77e68c09474f Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Feb 2025 14:33:37 +0800 Subject: [PATCH 11/19] feat(providers/amazon): add airflow 3 compat logic --- .../amazon/aws/log/s3_task_handler.py | 27 ++++++++--- .../amazon/aws/log/test_s3_task_handler.py | 48 ++++++++++++++----- 2 files changed, 57 insertions(+), 18 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py index a1ba2d57ef8ae..121e865b30b69 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -26,6 +26,7 @@ from airflow.configuration import conf from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin @@ -58,7 +59,8 @@ def __init__(self, base_log_folder: str, s3_log_folder: str, **kwargs): def hook(self): """Returns S3Hook.""" return S3Hook( - aws_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID"), transfer_config_args={"use_threads": False} + aws_conn_id=conf.get("logging", 
"REMOTE_LOG_CONN_ID"), + transfer_config_args={"use_threads": False}, ) def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: @@ -116,12 +118,16 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l keys = self.hook.list_keys(bucket_name=bucket, prefix=prefix) if keys: keys = sorted(f"s3://{bucket}/{key}" for key in keys) - messages.append("Found logs in s3:") - messages.extend(f" * {key}" for key in keys) + if AIRFLOW_V_3_0_PLUS: + messages = keys + else: + messages.append("Found logs in s3:") + messages.extend(f" * {key}" for key in keys) for key in keys: logs.append(self.s3_read(key, return_error=True)) else: - messages.append(f"No logs found on s3 for ti={ti}") + if not AIRFLOW_V_3_0_PLUS: + messages.append(f"No logs found on s3 for ti={ti}") return messages, logs def s3_log_exists(self, remote_log_location: str) -> bool: @@ -152,7 +158,13 @@ def s3_read(self, remote_log_location: str, return_error: bool = False) -> str: return msg return "" - def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1) -> bool: + def s3_write( + self, + log: str, + remote_log_location: str, + append: bool = True, + max_retry: int = 1, + ) -> bool: """ Write the log to the remote_log_location; return `True` or fails silently and return `False`. @@ -185,7 +197,10 @@ def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_ break except Exception: if try_num < max_retry: - self.log.warning("Failed attempt to write logs to %s, will retry", remote_log_location) + self.log.warning( + "Failed attempt to write logs to %s, will retry", + remote_log_location, + ) else: self.log.exception("Could not write logs to %s", remote_log_location) return False diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index 7bf4110e90dfd..e44991ffe3058 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -62,9 +62,19 @@ def setup_tests(self, create_log_template, tmp_path_factory, session): self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date) task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag) if AIRFLOW_V_3_0_PLUS: - dag_run = DagRun(dag_id=self.dag.dag_id, logical_date=date, run_id="test", run_type="manual") + dag_run = DagRun( + dag_id=self.dag.dag_id, + logical_date=date, + run_id="test", + run_type="manual", + ) else: - dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="manual") + dag_run = DagRun( + dag_id=self.dag.dag_id, + execution_date=date, + run_id="test", + run_type="manual", + ) session.add(dag_run) session.commit() session.refresh(dag_run) @@ -131,22 +141,36 @@ def test_read(self): ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS log, metadata = self.s3_task_handler.read(ti) - actual = log[0][0][-1] - assert "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\n" in actual - assert actual.endswith("Log line") - assert metadata == [{"end_of_log": True, "log_pos": 8}] + + expected_s3_uri = f"s3://bucket/{self.remote_log_key}" + + if AIRFLOW_V_3_0_PLUS: + assert log[0].event == "::group::Log message source details" + assert expected_s3_uri in log[0].sources + assert log[1].event == "::endgroup::" + assert log[2].event == "Log line" + assert metadata == {"end_of_log": True, "log_pos": 1} + 
else: + actual = log[0][0][-1] + assert f"*** Found logs in s3:\n*** * {expected_s3_uri}\n" in actual + assert actual.endswith("Log line") + assert metadata == [{"end_of_log": True, "log_pos": 8}] def test_read_when_s3_log_missing(self): ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS self.s3_task_handler._read_from_logs_server = mock.Mock(return_value=([], [])) log, metadata = self.s3_task_handler.read(ti) - assert len(log) == 1 - assert len(log) == len(metadata) - actual = log[0][0][-1] - expected = "*** No logs found on s3 for ti=\n" - assert expected in actual - assert metadata[0] == {"end_of_log": True, "log_pos": 0} + if AIRFLOW_V_3_0_PLUS: + assert len(log) == 2 + assert metadata == {"end_of_log": True, "log_pos": 0} + else: + assert len(log) == 1 + assert len(log) == len(metadata) + actual = log[0][0][-1] + expected = "*** No logs found on s3 for ti=\n" + assert expected in actual + assert metadata[0] == {"end_of_log": True, "log_pos": 0} def test_s3_read_when_log_missing(self): handler = self.s3_task_handler From 666564f58a84337b10ef41c3e08f1a5fc26291bc Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Feb 2025 15:58:05 +0800 Subject: [PATCH 12/19] feat(providers/google): add airflow 3 task handler log handling logic --- .../google/cloud/log/gcs_task_handler.py | 16 +++-- .../google/cloud/log/test_gcs_task_handler.py | 59 +++++++++++++------ 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py index ba2cf4db27d97..45e45b3942b6f 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -31,9 +31,12 @@ from airflow.configuration import conf from airflow.exceptions import AirflowNotFoundException from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url -from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id +from airflow.providers.google.cloud.utils.credentials_provider import ( + get_credentials_and_project_id, +) from airflow.providers.google.common.consts import CLIENT_INFO from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin @@ -188,9 +191,13 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l if blobs: uris = [f"gs://{bucket}/{b.name}" for b in blobs] - messages.extend(["Found remote logs:", *[f" * {x}" for x in sorted(uris)]]) + if AIRFLOW_V_3_0_PLUS: + messages = uris + else: + messages.extend(["Found remote logs:", *[f" * {x}" for x in sorted(uris)]]) else: - messages.append(f"No logs found in GCS; ti=%s {ti}") + if not AIRFLOW_V_3_0_PLUS: + messages.append(f"No logs found in GCS; ti=%s {ti}") try: for key in sorted(uris): blob = storage.Blob.from_string(key, self.client) @@ -198,7 +205,8 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l if remote_log: logs.append(remote_log) except Exception as e: - messages.append(f"Unable to read remote log {e}") + if not AIRFLOW_V_3_0_PLUS: + messages.append(f"Unable to read remote log {e}") return messages, logs def gcs_write(self, log, remote_log_location) -> bool: diff --git 
a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py index 3d179e7de1b3f..b86a6fa2c2745 100644 --- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py @@ -30,6 +30,7 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS @pytest.mark.db_test @@ -67,11 +68,15 @@ def gcs_task_handler(self, create_log_template, local_log_location): @mock.patch("google.cloud.storage.Client") @mock.patch("airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id") @pytest.mark.parametrize( - "conn_id", [pytest.param("", id="no-conn"), pytest.param("my_gcs_conn", id="with-conn")] + "conn_id", + [pytest.param("", id="no-conn"), pytest.param("my_gcs_conn", id="with-conn")], ) def test_client_conn_id_behavior(self, mock_get_cred, mock_client, mock_hook, conn_id): """When remote log conn id configured, hook will be used""" - mock_hook.return_value.get_credentials_and_project_id.return_value = ("test_cred", "test_proj") + mock_hook.return_value.get_credentials_and_project_id.return_value = ( + "test_cred", + "test_proj", + ) mock_get_cred.return_value = ("test_cred", "test_proj") with conf_vars({("logging", "remote_log_conn_id"): conn_id}): return_value = self.gcs_task_handler.client @@ -104,12 +109,20 @@ def test_should_read_logs_from_remote(self, mock_blob, mock_client, mock_creds, session.add(ti) session.commit() logs, metadata = self.gcs_task_handler._read(ti, self.ti.try_number) - mock_blob.from_string.assert_called_once_with( - "gs://bucket/remote/log/location/1.log", mock_client.return_value - ) - assert "*** Found remote logs:\n*** * gs://bucket/remote/log/location/1.log\n" in logs - assert logs.endswith("CONTENT") - assert metadata == {"end_of_log": True, "log_pos": 7} + expected_gs_uri = f"gs://bucket/{mock_obj.name}" + + mock_blob.from_string.assert_called_once_with(expected_gs_uri, mock_client.return_value) + + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == [expected_gs_uri] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "CONTENT" + assert metadata == {"end_of_log": True, "log_pos": 1} + else: + assert f"*** Found remote logs:\n*** * {expected_gs_uri}\n" in logs + assert logs.endswith("CONTENT") + assert metadata == {"end_of_log": True, "log_pos": 7} @mock.patch( "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id", @@ -127,18 +140,26 @@ def test_should_read_from_local_on_logs_read_error(self, mock_blob, mock_client, ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS log, metadata = self.gcs_task_handler._read(ti, self.ti.try_number) + expected_gs_uri = f"gs://bucket/{mock_obj.name}" - assert ( - "*** Found remote logs:\n" - "*** * gs://bucket/remote/log/location/1.log\n" - "*** Unable to read remote log Failed to connect\n" - "*** Found local files:\n" - f"*** * {self.gcs_task_handler.local_base}/1.log\n" - ) in log - assert metadata == {"end_of_log": True, "log_pos": 0} - mock_blob.from_string.assert_called_once_with( - "gs://bucket/remote/log/location/1.log", mock_client.return_value - ) + if AIRFLOW_V_3_0_PLUS: + assert log[0].event == "::group::Log message source details" + assert log[0].sources == [ + expected_gs_uri, + 
f"{self.gcs_task_handler.local_base}/1.log", + ] + assert log[1].event == "::endgroup::" + assert metadata == {"end_of_log": True, "log_pos": 0} + else: + assert ( + "*** Found remote logs:\n" + "*** * gs://bucket/remote/log/location/1.log\n" + "*** Unable to read remote log Failed to connect\n" + "*** Found local files:\n" + f"*** * {self.gcs_task_handler.local_base}/1.log\n" + ) in log + assert metadata == {"end_of_log": True, "log_pos": 0} + mock_blob.from_string.assert_called_once_with(expected_gs_uri, mock_client.return_value) @mock.patch( "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id", From 4d1bd54bcab11a829c81a0b3d980a3b267d052c9 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Feb 2025 17:19:28 +0800 Subject: [PATCH 13/19] feat(providers/elasticsearch): add airflow 3 task handler log handling logic --- .../elasticsearch/log/es_task_handler.py | 42 ++- .../elasticsearch/log/test_es_task_handler.py | 293 ++++++++++++------ .../opensearch/log/os_task_handler.py | 12 + 3 files changed, 253 insertions(+), 94 deletions(-) diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index 15904e7ebf3b4..63b1426c116c3 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -40,7 +40,9 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.models.dagrun import DagRun -from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter +from airflow.providers.elasticsearch.log.es_json_formatter import ( + ElasticsearchJSONFormatter, +) from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils import timezone @@ -54,10 +56,18 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceKey +if AIRFLOW_V_3_0_PLUS: + from typing import Union + + from airflow.utils.log.file_task_handler import StructuredLogMessage + + EsLogMsgType = Union[list[StructuredLogMessage], str] +else: + EsLogMsgType = list[tuple[str, str]] # type: ignore[misc] + LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""} # Elasticsearch hosted log type -EsLogMsgType = list[tuple[str, str]] # Compatibility: Airflow 2.3.3 and up uses this method, which accesses the # LogTemplate model to record the log ID template used. If this function does @@ -344,7 +354,10 @@ def _read( "If your task started recently, please wait a moment and reload this page. " "Otherwise, the logs for this task instance may have been removed." ) - return [("", missing_log_message)], metadata + if AIRFLOW_V_3_0_PLUS: + return missing_log_message, metadata + else: + return [("", missing_log_message)], metadata # type: ignore[list-item] if ( # Assume end of log after not receiving new log for N min, cur_ts.diff(last_log_ts).in_minutes() >= 5 @@ -358,12 +371,31 @@ def _read( # If we hit the end of the log, remove the actual end_of_log message # to prevent it from showing in the UI. 
- def concat_logs(hits: list[Hit]): + def concat_logs(hits: list[Hit]) -> str: log_range = (len(hits) - 1) if hits[-1].message == self.end_of_log_mark else len(hits) return "\n".join(self._format_msg(hits[i]) for i in range(log_range)) if logs_by_host: - message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] + if AIRFLOW_V_3_0_PLUS: + from airflow.utils.log.file_task_handler import StructuredLogMessage + + header = [ + StructuredLogMessage.model_construct( + event="::group::Log message source details", + sources=[host for host in logs_by_host.keys()], + ), + StructuredLogMessage.model_construct(event="::endgroup::"), + ] # type: ignore[misc] + + message = header + [ + StructuredLogMessage.model_construct(event=concat_logs(hits)) + for hits in logs_by_host.values() + ] # type: ignore[misc] + else: + message = [ + (host, concat_logs(hits)) # type: ignore[misc] + for host, hits in logs_by_host.items() + ] else: message = [] return message, metadata diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index 7d304eb5434b5..43f63f86d888a 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -208,14 +208,15 @@ def test_read(self, ti): ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - if AIRFLOW_V_3_0_PLUS: - assert len(logs[0]) == 2 - assert self.test_message == logs[0][-1] + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" metadata = metadatas else: + assert len(logs) == 1 assert len(logs) == len(metadatas) assert len(logs[0]) == 1 assert self.test_message == logs[0][0][-1] @@ -224,7 +225,7 @@ def test_read(self, ti): assert not metadata["end_of_log"] assert metadata["offset"] == "1" - assert timezone.parse(metadata[0]["last_log_timestamp"]) > ts + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_with_patterns(self, ti): ts = pendulum.now() @@ -233,13 +234,24 @@ def test_read_with_patterns(self, ti): ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert len(logs[0]) == 1 - assert self.test_message == logs[0][0][-1] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "1" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "1" + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_with_patterns_no_match(self, ti): ts = pendulum.now() @@ -248,13 +260,21 @@ def test_read_with_patterns_no_match(self, ti): ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert not metadatas[0]["end_of_log"] - assert 
metadatas[0]["offset"] == "0" + if AIRFLOW_V_3_0_PLUS: + assert logs == [] + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert metadata["offset"] == "0" + assert not metadata["end_of_log"] # last_log_timestamp won't change if no log lines read. - assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + assert timezone.parse(metadata["last_log_timestamp"]) == ts def test_read_with_missing_index(self, ti): ts = pendulum.now() @@ -280,23 +300,35 @@ def test_read_missing_logs(self, seconds, create_task_instance): ) ts = pendulum.now().add(seconds=-seconds) logs, metadatas = self.es_task_handler.read(ti, 1, {"offset": 0, "last_log_timestamp": str(ts)}) - - assert len(logs) == 1 - if seconds > 5: - # we expect a log not found message when checking began more than 5 seconds ago - assert len(logs[0]) == 1 - actual_message = logs[0][0][1] - expected_pattern = r"^\*\*\* Log .* not found in Elasticsearch.*" - assert re.match(expected_pattern, actual_message) is not None - assert metadatas[0]["end_of_log"] is True + if AIRFLOW_V_3_0_PLUS: + if seconds > 5: + # we expect a log not found message when checking began more than 5 seconds ago + expected_pattern = r"^\*\*\* Log .* not found in Elasticsearch.*" + assert re.match(expected_pattern, logs) is not None + assert metadatas["end_of_log"] is True + else: + # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message + assert logs == [] + assert metadatas["end_of_log"] is False + assert metadatas["offset"] == "0" + assert timezone.parse(metadatas["last_log_timestamp"]) == ts else: - # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message - assert len(logs[0]) == 0 - assert logs == [[]] - assert metadatas[0]["end_of_log"] is False - assert len(logs) == len(metadatas) - assert metadatas[0]["offset"] == "0" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + assert len(logs) == 1 + if seconds > 5: + # we expect a log not found message when checking began more than 5 seconds ago + assert len(logs[0]) == 1 + actual_message = logs[0][0][1] + expected_pattern = r"^\*\*\* Log .* not found in Elasticsearch.*" + assert re.match(expected_pattern, actual_message) is not None + assert metadatas[0]["end_of_log"] is True + else: + # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message + assert len(logs[0]) == 0 + assert logs == [[]] + assert metadatas[0]["end_of_log"] is False + assert len(logs) == len(metadatas) + assert metadatas[0]["offset"] == "0" + assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts def test_read_with_match_phrase_query(self, ti): similar_log_id = ( @@ -311,9 +343,7 @@ def test_read_with_match_phrase_query(self, ti): "offset": 1, } self.es.index(index=self.index_name, doc_type=self.doc_type, body=another_body, id=1) - self.es.index( - index=self.index_name, doc_type=self.doc_type, body=another_body, id=1 - ) + ts = pendulum.now() logs, metadatas = self.es_task_handler.read( ti, @@ -325,23 +355,45 @@ def test_read_with_match_phrase_query(self, ti): "max_offset": 2, }, ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert self.test_message == logs[0][0][-1] - assert another_test_message != logs[0] + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" 
+ assert logs[2].event == "some random stuff" - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "1" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "1" + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_with_none_metadata(self, ti): logs, metadatas = self.es_task_handler.read(ti, 1) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert self.test_message == logs[0][0][-1] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "1" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) < pendulum.now() + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "1" + assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now() def test_read_nonexistent_log(self, ti): ts = pendulum.now() @@ -352,39 +404,67 @@ def test_read_nonexistent_log(self, ti): logs, metadatas = self.es_task_handler.read( ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "0" + if AIRFLOW_V_3_0_PLUS: + assert logs == [] + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert metadata["offset"] == "0" + assert not metadata["end_of_log"] # last_log_timestamp won't change if no log lines read. - assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + assert timezone.parse(metadata["last_log_timestamp"]) == ts def test_read_with_empty_metadata(self, ti): ts = pendulum.now() logs, metadatas = self.es_task_handler.read(ti, 1, {}) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert self.test_message == logs[0][0][-1] - assert not metadatas[0]["end_of_log"] + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] # offset should be initialized to 0 if not provided. - assert metadatas[0]["offset"] == "1" + assert metadata["offset"] == "1" # last_log_timestamp will be initialized using log reading time # if not last_log_timestamp is provided. - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + assert timezone.parse(metadata["last_log_timestamp"]) > ts # case where offset is missing but metadata not empty. 
self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1) logs, metadatas = self.es_task_handler.read(ti, 1, {"end_of_log": False}) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert not metadatas[0]["end_of_log"] + if AIRFLOW_V_3_0_PLUS: + assert logs == [] + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] # offset should be initialized to 0 if not provided. - assert metadatas[0]["offset"] == "0" + assert metadata["offset"] == "0" # last_log_timestamp will be initialized using log reading time # if not last_log_timestamp is provided. - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_timeout(self, ti): ts = pendulum.now().subtract(minutes=5) @@ -403,12 +483,20 @@ def test_read_timeout(self, ti): "end_of_log": False, }, ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert metadatas[0]["end_of_log"] - assert str(offset) == metadatas[0]["offset"] - assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + if AIRFLOW_V_3_0_PLUS: + assert logs == [] + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert metadata["end_of_log"] + assert str(offset) == metadata["offset"] + assert timezone.parse(metadata["last_log_timestamp"]) == ts def test_read_as_download_logs(self, ti): ts = pendulum.now() @@ -422,14 +510,24 @@ def test_read_as_download_logs(self, ti): "end_of_log": False, }, ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert len(logs[0]) == 1 - assert self.test_message == logs[0][0][-1] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["download_logs"] - assert metadatas[0]["offset"] == "1" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "1" + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_raises(self, ti): with mock.patch.object(self.es_task_handler.log, "exception") as mock_exception: @@ -439,11 +537,20 @@ def test_read_raises(self, ti): assert mock_exception.call_count == 1 args, kwargs = mock_exception.call_args assert "Could not read log with log_id:" in args[0] - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "0" + + if AIRFLOW_V_3_0_PLUS: + assert logs == [] + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert metadata["offset"] == "0" + assert not metadata["end_of_log"] def test_set_context(self, ti): self.es_task_handler.set_context(ti) @@ -479,7 +586,11 @@ def test_read_with_json_format(self, ti): logs, _ = self.es_task_handler.read( ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - 
assert logs[0][0][1] == "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " + expected_message = "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " + if AIRFLOW_V_3_0_PLUS: + assert logs[2].event == expected_message + else: + assert logs[0][0][1] == expected_message def test_read_with_json_format_with_custom_offset_and_host_fields(self, ti): ts = pendulum.now() @@ -507,7 +618,11 @@ def test_read_with_json_format_with_custom_offset_and_host_fields(self, ti): logs, _ = self.es_task_handler.read( ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert logs[0][0][1] == "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " + expected_message = "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " + if AIRFLOW_V_3_0_PLUS: + assert logs[2].event == expected_message + else: + assert logs[0][0][1] == expected_message def test_read_with_custom_offset_and_host_fields(self, ti): ts = pendulum.now() diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index e1a0a083e7291..6eb7704cd9b63 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -44,6 +44,18 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance, TaskInstanceKey + + +if AIRFLOW_V_3_0_PLUS: + from typing import Union + + from airflow.utils.log.file_task_handler import StructuredLogMessage + + OsLogMsgType = Union[list[StructuredLogMessage], str] +else: + OsLogMsgType = list[tuple[str, str]] # type: ignore[misc] + + USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template") OsLogMsgType = list[tuple[str, str]] LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""} From b6bec70615b0cf1da7d5a99f7b6b94f8e19bfce7 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Feb 2025 18:20:57 +0800 Subject: [PATCH 14/19] feat(providers/microsoft): add airflow 3 task handler log handling logic --- .../microsoft/azure/log/wasb_task_handler.py | 9 +++-- .../microsoft/azure/version_compat.py | 36 +++++++++++++++++++ .../azure/log/test_wasb_task_handler.py | 36 ++++++++++++++----- 3 files changed, 71 insertions(+), 10 deletions(-) create mode 100644 providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py index d32a3463d74cd..8dc5e0ac174d6 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -26,6 +26,7 @@ from azure.core.exceptions import HttpResponseError from airflow.configuration import conf +from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin @@ -137,9 +138,13 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l if blob_names: uris = [f"https://{self.wasb_container}.blob.core.windows.net/{b}" for b in blob_names] - messages.extend(["Found remote logs:", *[f" * {x}" for x in sorted(uris)]]) + if AIRFLOW_V_3_0_PLUS: + messages = uris + else: + 
messages.extend(["Found remote logs:", *[f" * {x}" for x in sorted(uris)]]) else: - messages.append(f"No logs found in WASB; ti=%s {ti}") + if not AIRFLOW_V_3_0_PLUS: + messages.append(f"No logs found in WASB; ti=%s {ti}") for name in sorted(blob_names): remote_log = "" diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py new file mode 100644 index 0000000000000..21e7170194e36 --- /dev/null +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py index 73084f2a9a7ba..8f48c96002870 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py @@ -32,6 +32,7 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test @@ -112,13 +113,29 @@ def test_wasb_read(self, mock_hook_cls, ti): assert self.wasb_task_handler.wasb_read(self.remote_log_location) == "Log line" ti = copy.copy(ti) ti.state = TaskInstanceState.SUCCESS - assert self.wasb_task_handler.read(ti)[0][0][0][0] == "localhost" - assert ( - "*** Found remote logs:\n*** * https://wasb-container.blob.core.windows.net/abc/hello.log\n" - in self.wasb_task_handler.read(ti)[0][0][0][1] - ) - assert "Log line" in self.wasb_task_handler.read(ti)[0][0][0][1] - assert self.wasb_task_handler.read(ti)[1][0] == {"end_of_log": True, "log_pos": 8} + + logs, metadata = self.wasb_task_handler.read(ti) + + if AIRFLOW_V_3_0_PLUS: 
+ assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["https://wasb-container.blob.core.windows.net/abc/hello.log"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "Log line" + assert metadata == { + "end_of_log": True, + "log_pos": 1, + } + else: + assert logs[0][0][0] == "localhost" + assert ( + "*** Found remote logs:\n*** * https://wasb-container.blob.core.windows.net/abc/hello.log\n" + in logs[0][0][1] + ) + assert "Log line" in logs[0][0][1] + assert metadata[0] == { + "end_of_log": True, + "log_pos": 8, + } @mock.patch( "airflow.providers.microsoft.azure.hooks.wasb.WasbHook", @@ -152,7 +169,10 @@ def test_write_on_existing_log(self, mock_log_exists, mock_wasb_read, mock_hook) mock_wasb_read.return_value = "old log" self.wasb_task_handler.wasb_write("text", self.remote_log_location) mock_hook.return_value.load_string.assert_called_once_with( - "old log\ntext", self.container_name, self.remote_log_location, overwrite=True + "old log\ntext", + self.container_name, + self.remote_log_location, + overwrite=True, ) @mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") From 79159a69a581b3a70728e45c484e3a5053b01047 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Feb 2025 18:27:49 +0800 Subject: [PATCH 15/19] feat(providers/redis): add airflow 3 task handler log handling logic --- .../providers/redis/log/redis_task_handler.py | 3 ++ .../airflow/providers/redis/version_compat.py | 36 +++++++++++++++++++ .../unit/redis/log/test_redis_task_handler.py | 12 +++++-- 3 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 providers/redis/src/airflow/providers/redis/version_compat.py diff --git a/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py b/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py index 7107b2ab3a421..d1fd1cc8de0d6 100644 --- a/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py +++ b/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py @@ -23,6 +23,7 @@ from airflow.configuration import conf from airflow.providers.redis.hooks.redis import RedisHook +from airflow.providers.redis.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin @@ -79,6 +80,8 @@ def _read( log_str = b"\n".join( self.conn.lrange(self._render_filename(ti, try_number), start=0, end=-1) ).decode() + if AIRFLOW_V_3_0_PLUS: + log_str = [log_str] # type: ignore[assignment] return log_str, {"end_of_log": True} def set_context(self, ti: TaskInstance, **kwargs) -> None: diff --git a/providers/redis/src/airflow/providers/redis/version_compat.py b/providers/redis/src/airflow/providers/redis/version_compat.py new file mode 100644 index 0000000000000..21e7170194e36 --- /dev/null +++ b/providers/redis/src/airflow/providers/redis/version_compat.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/redis/tests/unit/redis/log/test_redis_task_handler.py b/providers/redis/tests/unit/redis/log/test_redis_task_handler.py index 8d3081b028057..ac3c3a4078ea9 100644 --- a/providers/redis/tests/unit/redis/log/test_redis_task_handler.py +++ b/providers/redis/tests/unit/redis/log/test_redis_task_handler.py @@ -51,7 +51,12 @@ def ti(self): run_type="scheduled", ) else: - dag_run = DagRun(dag_id=dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") + dag_run = DagRun( + dag_id=dag.dag_id, + execution_date=date, + run_id="test", + run_type="scheduled", + ) dag_run.set_state(State.RUNNING) with create_session() as session: @@ -105,5 +110,8 @@ def test_read(self, ti): lrange.return_value = [b"Line 1", b"Line 2"] logs = handler.read(ti) - assert logs == ([[("", "Line 1\nLine 2")]], [{"end_of_log": True}]) + if AIRFLOW_V_3_0_PLUS: + assert logs == (["Line 1\nLine 2"], {"end_of_log": True}) + else: + assert logs == ([[("", "Line 1\nLine 2")]], [{"end_of_log": True}]) lrange.assert_called_once_with(key, start=0, end=-1) From 1335623d72e06f6e96d80bd6c268089038b0d4bb Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Feb 2025 18:40:29 +0800 Subject: [PATCH 16/19] feat(providers/opensearch): add airflow 3 task handler log handling logic --- .../opensearch/log/os_task_handler.py | 21 ++- .../opensearch/log/test_os_task_handler.py | 162 +++++++++++++----- 2 files changed, 134 insertions(+), 49 deletions(-) diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index 6eb7704cd9b63..159f4f1f2bf14 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -57,7 +57,6 @@ USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template") -OsLogMsgType = list[tuple[str, str]] LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""} @@ -387,7 +386,7 @@ def _read( "If your task started recently, please wait a moment and reload this page. " "Otherwise, the logs for this task instance may have been removed." 
) - return [("", missing_log_message)], metadata + return [("", missing_log_message)], metadata # type: ignore[list-item] if ( # Assume end of log after not receiving new log for N min, cur_ts.diff(last_log_ts).in_minutes() >= 5 @@ -406,7 +405,23 @@ def concat_logs(hits: list[Hit]): return "\n".join(self._format_msg(hits[i]) for i in range(log_range)) if logs_by_host: - message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] + if AIRFLOW_V_3_0_PLUS: + from airflow.utils.log.file_task_handler import StructuredLogMessage + + header = [ + StructuredLogMessage.model_construct( + event="::group::Log message source details", + sources=[host for host in logs_by_host.keys()], + ), + StructuredLogMessage.model_construct(event="::endgroup::"), + ] + + message = header + [ + StructuredLogMessage.model_construct(event=concat_logs(hits)) + for hits in logs_by_host.values() + ] + else: + message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] # type: ignore[misc] else: message = [] return message, metadata diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py index 475a325a242e3..6e7db1527189a 100644 --- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py +++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py @@ -80,7 +80,10 @@ def ti(self, create_task_instance, create_log_template): if AIRFLOW_V_3_0_PLUS: create_log_template(self.FILENAME_TEMPLATE, "{dag_id}-{task_id}-{logical_date}-{try_number}") else: - create_log_template(self.FILENAME_TEMPLATE, "{dag_id}-{task_id}-{execution_date}-{try_number}") + create_log_template( + self.FILENAME_TEMPLATE, + "{dag_id}-{task_id}-{execution_date}-{try_number}", + ) yield get_ti( dag_id=self.DAG_ID, task_id=self.TASK_ID, @@ -195,17 +198,28 @@ def test_read(self, ti): ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert len(logs[0]) == 1 - assert ( - logs[0][0][-1] == "Dependencies all met for dep_context=non-requeueable" + expected_msg = ( + "Dependencies all met for dep_context=non-requeueable" " deps ti=\n" "Starting attempt 1 of 1\nExecuting " "on 2023-07-09 07:47:32+00:00" ) - assert not metadatas[0]["end_of_log"] - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == expected_msg + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert logs[0][0][-1] == expected_msg + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_with_patterns(self, ti): ts = pendulum.now() @@ -214,17 +228,28 @@ def test_read_with_patterns(self, ti): ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert len(logs[0]) == 1 - assert ( - logs[0][0][-1] == "Dependencies all met for dep_context=non-requeueable" + expected_msg = ( + "Dependencies all met for dep_context=non-requeueable" " deps ti=\n" "Starting attempt 1 of 1\nExecuting " "on 2023-07-09 07:47:32+00:00" ) - assert not metadatas[0]["end_of_log"] - assert timezone.parse(metadatas[0]["last_log_timestamp"]) 
> ts + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == expected_msg + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert logs[0][0][-1] == expected_msg + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_with_patterns_no_match(self, ti): ts = pendulum.now() @@ -240,26 +265,43 @@ def test_read_with_patterns_no_match(self, ti): }, ): logs, metadatas = self.os_task_handler.read( - ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} + ti, + 1, + {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}, ) + if AIRFLOW_V_3_0_PLUS: + assert logs == [] - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "0" + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "0" # last_log_timestamp won't change if no log lines read. - assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + assert timezone.parse(metadata["last_log_timestamp"]) == ts def test_read_with_missing_index(self, ti): ts = pendulum.now() with mock.patch.object(self.os_task_handler, "index_patterns", new="nonexistent,test_*"): with mock.patch.object( - self.os_task_handler.client, "count", side_effect=NotFoundError(404, "IndexNotFoundError") + self.os_task_handler.client, + "count", + side_effect=NotFoundError(404, "IndexNotFoundError"), ): with pytest.raises(NotFoundError, match=r"IndexNotFoundError"): self.os_task_handler.read( - ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} + ti, + 1, + { + "offset": 0, + "last_log_timestamp": str(ts), + "end_of_log": False, + }, ) @pytest.mark.parametrize("seconds", [3, 6]) @@ -286,36 +328,64 @@ def test_read_missing_logs(self, seconds, create_task_instance): }, ): logs, metadatas = self.os_task_handler.read(ti, 1, {"offset": 0, "last_log_timestamp": str(ts)}) - - assert len(logs) == 1 - if seconds > 5: - # we expect a log not found message when checking began more than 5 seconds ago - assert len(logs[0]) == 1 - actual_message = logs[0][0][1] - expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*" - assert re.match(expected_pattern, actual_message) is not None - assert metadatas[0]["end_of_log"] is True + if AIRFLOW_V_3_0_PLUS: + if seconds > 5: + # we expect a log not found message when checking began more than 5 seconds ago + assert len(logs[0]) == 2 + actual_message = logs[0][1] + expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*" + assert re.match(expected_pattern, actual_message) is not None + assert metadatas["end_of_log"] is True + else: + # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message + assert logs == [] + assert metadatas["end_of_log"] is False + assert metadatas["offset"] == "0" + assert timezone.parse(metadatas["last_log_timestamp"]) == ts else: - # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message - assert len(logs[0]) == 0 - assert logs == [[]] - assert metadatas[0]["end_of_log"] is False - assert len(logs) == len(metadatas) - assert 
metadatas[0]["offset"] == "0" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + assert len(logs) == 1 + if seconds > 5: + # we expect a log not found message when checking began more than 5 seconds ago + assert len(logs[0]) == 1 + actual_message = logs[0][0][1] + expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*" + assert re.match(expected_pattern, actual_message) is not None + assert metadatas[0]["end_of_log"] is True + else: + # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message + assert len(logs[0]) == 0 + assert logs == [[]] + assert metadatas[0]["end_of_log"] is False + assert len(logs) == len(metadatas) + assert metadatas[0]["offset"] == "0" + assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts def test_read_with_none_metadata(self, ti): logs, metadatas = self.os_task_handler.read(ti, 1) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert ( - logs[0][0][-1] == "Dependencies all met for dep_context=non-requeueable" + + expected_message = ( + "Dependencies all met for dep_context=non-requeueable" " deps ti=\n" "Starting attempt 1 of 1\nExecuting " "on 2023-07-09 07:47:32+00:00" ) - assert not metadatas[0]["end_of_log"] - assert timezone.parse(metadatas[0]["last_log_timestamp"]) < pendulum.now() + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == expected_message + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert logs[0][0][-1] == expected_message + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now() def test_set_context(self, ti): self.os_task_handler.set_context(ti) From d490b9a2c63031b376edc4c13ef628d4be582089 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Feb 2025 19:00:36 +0800 Subject: [PATCH 17/19] test: ignore unneeded tests --- tests/always/test_project_structure.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 2aeb00f83491d..548679757104d 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -183,9 +183,11 @@ def test_providers_modules_should_have_tests(self): "providers/google/tests/unit/google/test_version_compat.py", "providers/http/tests/unit/http/test_exceptions.py", "providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls.py", + "providers/microsoft/azure/tests/unit/microsoft/azure/test_version_compat.py", "providers/openlineage/tests/unit/openlineage/test_version_compat.py", "providers/opensearch/tests/unit/opensearch/test_version_compat.py", "providers/presto/tests/unit/presto/test_version_compat.py", + "providers/redis/tests/unit/redis/test_version_compat.py", "providers/snowflake/tests/unit/snowflake/triggers/test_snowflake_trigger.py", "providers/standard/tests/unit/standard/operators/test_empty.py", "providers/standard/tests/unit/standard/operators/test_latest_only.py", @@ -558,8 +560,7 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest "GoogleCampaignManagerReportSensor", "airflow.providers.google.marketing_platform.sensors.display_video." 
"GoogleDisplayVideo360GetSDFDownloadOperationSensor", - "airflow.providers.google.marketing_platform.sensors.display_video." - "GoogleDisplayVideo360ReportSensor", + "airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360ReportSensor", } @pytest.mark.xfail(reason="We did not reach full coverage yet") From 9a5aafa5092c80fd55b106a8bb5da6e1a0fb9930 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 21 Feb 2025 23:01:32 +0800 Subject: [PATCH 18/19] test(log_handlers): fix pendulum.tz version imcompat --- tests/utils/test_log_handlers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index d760f18a63ebe..92200faced365 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -669,7 +669,8 @@ def test_interleave_interleaves(): ] ) - tz = pendulum.tz.FixedTimezone(-28800, name="-08:00") + # -08:00 + tz = pendulum.tz.fixed_timezone(-28800) DateTime = pendulum.DateTime expected = [ { From d3e2dff3716f610e2b63e924ebfae54493d4b95b Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 26 Feb 2025 18:29:04 +0800 Subject: [PATCH 19/19] feat: force StructuredLogMessage check when initialing --- airflow/utils/log/file_task_handler.py | 12 +++++------- .../providers/elasticsearch/log/es_task_handler.py | 9 ++++----- .../providers/opensearch/log/os_task_handler.py | 9 ++++----- tests/utils/log/test_log_reader.py | 8 ++++---- tests/utils/test_log_handlers.py | 8 +++++++- 5 files changed, 24 insertions(+), 22 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 3c8d7b32a9567..c5798c1e49599 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -53,7 +53,7 @@ class StructuredLogMessage(BaseModel): """An individual log message.""" - timestamp: datetime | None + timestamp: datetime | None = None event: str # We don't need to cache string when parsing in to this, as almost every line will have a different @@ -136,7 +136,7 @@ def _parse_log_lines(lines: Iterable[str]) -> Iterable[tuple[datetime | None, in with suppress(Exception): # If we can't parse the timestamp, don't attach one to the row next_timestamp = _parse_timestamp(line) - log = StructuredLogMessage.model_construct(event=line, timestamp=next_timestamp) + log = StructuredLogMessage(event=line, timestamp=next_timestamp) if log.timestamp: log.timestamp = coerce_datetime(log.timestamp) timestamp = log.timestamp @@ -414,10 +414,8 @@ def _read( # Log message source details are grouped: they are not relevant for most users and can # distract them from finding the root cause of their errors header = [ - StructuredLogMessage.model_construct( - event="::group::Log message source details", sources=source_list - ), - StructuredLogMessage.model_construct(event="::endgroup::"), + StructuredLogMessage(event="::group::Log message source details", sources=source_list), # type: ignore[call-arg] + StructuredLogMessage(event="::endgroup::"), ] end_of_log = ti.try_number != try_number or ti.state not in ( TaskInstanceState.RUNNING, @@ -480,7 +478,7 @@ def read( try_number = task_instance.try_number if try_number is None or try_number < 1: logs = [ - StructuredLogMessage.model_construct( + StructuredLogMessage( # type: ignore[call-arg] level="error", event=f"Error fetching the logs. Try number {try_number} is invalid." 
) ] diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index 63b1426c116c3..5bf3d2308c294 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -380,16 +380,15 @@ def concat_logs(hits: list[Hit]) -> str: from airflow.utils.log.file_task_handler import StructuredLogMessage header = [ - StructuredLogMessage.model_construct( + StructuredLogMessage( event="::group::Log message source details", sources=[host for host in logs_by_host.keys()], - ), - StructuredLogMessage.model_construct(event="::endgroup::"), + ), # type: ignore[call-arg] + StructuredLogMessage(event="::endgroup::"), ] # type: ignore[misc] message = header + [ - StructuredLogMessage.model_construct(event=concat_logs(hits)) - for hits in logs_by_host.values() + StructuredLogMessage(event=concat_logs(hits)) for hits in logs_by_host.values() ] # type: ignore[misc] else: message = [ diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index 159f4f1f2bf14..58561acfb0628 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -409,16 +409,15 @@ def concat_logs(hits: list[Hit]): from airflow.utils.log.file_task_handler import StructuredLogMessage header = [ - StructuredLogMessage.model_construct( + StructuredLogMessage( event="::group::Log message source details", sources=[host for host in logs_by_host.keys()], - ), - StructuredLogMessage.model_construct(event="::endgroup::"), + ), # type: ignore[call-arg] + StructuredLogMessage(event="::endgroup::"), ] message = header + [ - StructuredLogMessage.model_construct(event=concat_logs(hits)) - for hits in logs_by_host.values() + StructuredLogMessage(event=concat_logs(hits)) for hits in logs_by_host.values() ] else: message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] # type: ignore[misc] diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index ca888f4f113a1..3f9e3c67e5552 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -156,11 +156,11 @@ def test_test_test_read_log_stream_should_read_one_try(self): stream = task_log_reader.read_log_stream(ti=ti, try_number=1, metadata={}) assert list(stream) == [ - "{" + '{"timestamp":null,' '"event":"::group::Log message source details",' f'"sources":["{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log"]' "}\n", - '{"event":"::endgroup::"}\n', + '{"timestamp":null,"event":"::endgroup::"}\n', '{"timestamp":null,"event":"try_number=1."}\n', ] @@ -170,11 +170,11 @@ def test_test_test_read_log_stream_should_read_latest_logs(self): stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={}) assert list(stream) == [ - "{" + '{"timestamp":null,' '"event":"::group::Log message source details",' f'"sources":["{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log"]' "}\n", - '{"event":"::endgroup::"}\n', + '{"timestamp":null,"event":"::endgroup::"}\n', '{"timestamp":null,"event":"try_number=3."}\n', ] diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 
92200faced365..f1245b863636b 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -481,7 +481,13 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs( logs, metadata = fth._read(ti=ti, try_number=1) if served_logs_checked: fth._read_from_logs_server.assert_called_once() - assert events(logs) == ["this", "log", "content"] + assert events(logs) == [ + "::group::Log message source details", + "::endgroup::", + "this", + "log", + "content", + ] assert metadata == {"end_of_log": True, "log_pos": 3} else: fth._read_from_logs_server.assert_not_called()
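
The provider changes above (WASB, Redis, Elasticsearch, OpenSearch) all converge on the same dual-format contract: on Airflow 3 a task handler returns a flat list of `StructuredLogMessage` objects, with the log-source details wrapped in a `::group::`/`::endgroup::` pair so the UI can collapse them, while on Airflow 2 it keeps returning the legacy list of `(host, message)` tuples. The sketch below only illustrates that branching and is not code from the patch series: `build_messages` and the `logs_by_host` argument are hypothetical names, whereas `StructuredLogMessage`, the `::group::` header convention, and the version check mirrored from the copied `version_compat.py` modules come from the diffs above.

```python
from __future__ import annotations

from packaging.version import Version

from airflow import __version__

# Same check the copied version_compat.py modules perform.
_v = Version(__version__)
AIRFLOW_V_3_0_PLUS = (_v.major, _v.minor, _v.micro) >= (3, 0, 0)


def build_messages(logs_by_host: dict[str, list[str]]):
    """Hypothetical helper: shape remote log lines for the running Airflow version."""
    if AIRFLOW_V_3_0_PLUS:
        # Airflow 3: validated StructuredLogMessage objects (PATCH 19 drops model_construct).
        # "sources" is an extra field; the model is assumed to accept extras at runtime,
        # as the "# type: ignore[call-arg]" call sites in the patches suggest.
        from airflow.utils.log.file_task_handler import StructuredLogMessage

        header = [
            StructuredLogMessage(
                event="::group::Log message source details",
                sources=list(logs_by_host),
            ),
            StructuredLogMessage(event="::endgroup::"),
        ]
        return header + [
            StructuredLogMessage(event="\n".join(lines)) for lines in logs_by_host.values()
        ]
    # Airflow 2: legacy list of (host, concatenated message) tuples.
    return [(host, "\n".join(lines)) for host, lines in logs_by_host.items()]
```

A handler's `_read` would then return something like `build_messages(logs_by_host), metadata`: the Airflow 3 branch produces the header-plus-events shape the updated Elasticsearch/OpenSearch handlers and tests assert on, while the Airflow 2 branch preserves the tuple format the pre-3.0 tests still expect.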