diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 6dc989b00d56d..37e80e9a28aab 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -6608,7 +6608,7 @@ paths:
         required: true
         schema:
           type: integer
-          exclusiveMinimum: 0
+          minimum: 0
           title: Try Number
       - name: full_content
        in: query
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
index 01cf859f05efd..277e705c18d2f 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py
@@ -22,7 +22,7 @@
 
 from fastapi import Depends, HTTPException, Request, Response, status
 from itsdangerous import BadSignature, URLSafeSerializer
-from pydantic import PositiveInt
+from pydantic import NonNegativeInt, PositiveInt
 from sqlalchemy.orm import joinedload
 from sqlalchemy.sql import select
 
@@ -75,7 +75,7 @@ def get_log(
     dag_id: str,
     dag_run_id: str,
     task_id: str,
-    try_number: PositiveInt,
+    try_number: NonNegativeInt,
     accept: HeaderAcceptJsonOrNdjson,
     request: Request,
     dag_bag: DagBagDep,
diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx
index c6259979950b6..a96f0c969c93a 100644
--- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx
@@ -91,7 +91,7 @@ export const Logs = () => {
     logLevelFilters,
     sourceFilters,
     taskInstance,
-    tryNumber: tryNumber === 0 ? 1 : tryNumber,
+    tryNumber,
   });
 
   const externalLogName = useConfig("external_log_name") as string;
diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 84987d1a20110..9b086457f604d 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -482,6 +482,15 @@ def read(
         """
         if try_number is None:
             try_number = task_instance.try_number
+
+        if task_instance.state == TaskInstanceState.SKIPPED:
+            logs = [
+                StructuredLogMessage(  # type: ignore[call-arg]
+                    event="Task was skipped, no logs available."
+                )
+            ]
+            return logs, {"end_of_log": True}
+
         if try_number is None or try_number < 1:
             logs = [
                 StructuredLogMessage(  # type: ignore[call-arg]
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py
index 5813c22819eec..286eae5c84255 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -151,6 +151,43 @@ def task_callable(ti):
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
+    def test_file_task_handler_when_ti_is_skipped(self, dag_maker):
+        def task_callable(ti):
+            ti.log.info("test")
+
+        with dag_maker("dag_for_testing_file_task_handler", schedule=None):
+            task = PythonOperator(
+                task_id="task_for_testing_file_log_handler",
+                python_callable=task_callable,
+            )
+        dagrun = dag_maker.create_dagrun()
+        ti = TaskInstance(task=task, run_id=dagrun.run_id)
+
+        ti.try_number = 0
+        ti.state = State.SKIPPED
+
+        logger = ti.log
+        ti.log.disabled = False
+
+        file_handler = next(
+            (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
+        )
+        assert file_handler is not None
+
+        set_context(logger, ti)
+        assert file_handler.handler is not None
+        # We expect set_context generates a file locally.
+        log_filename = file_handler.handler.baseFilename
+        assert os.path.isfile(log_filename)
+        assert log_filename.endswith("0.log"), log_filename
+
+        # Return value of read must be a tuple of list and list.
+        logs, metadata = file_handler.read(ti)
+        assert logs[0].event == "Task was skipped, no logs available."
+
+        # Remove the generated tmp log file.
+        os.remove(log_filename)
+
     @pytest.mark.xfail(reason="TODO: Needs to be ported over to the new structlog based logging")
     def test_file_task_handler(self, dag_maker, session):
         def task_callable(ti):
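
A minimal, self-contained sketch of the control flow the file_task_handler.py hunk introduces: a SKIPPED task instance now short-circuits with a single placeholder log message and end-of-log metadata before the `try_number < 1` check runs, which is why the API route and the UI can pass `try_number=0` through unchanged. The `Message` and `read_logs` names below are hypothetical stand-ins rather than the real Airflow classes, and the "invalid try_number" text is a placeholder, not Airflow's actual message.

```python
# Illustrative stand-in for the new early-return branch in FileTaskHandler.read();
# not the real Airflow implementation.
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Message:
    event: str


def read_logs(state: str, try_number: int | None) -> tuple[list[Message], dict]:
    # Skipped tasks return immediately, regardless of try_number.
    if state == "skipped":
        return [Message("Task was skipped, no logs available.")], {"end_of_log": True}
    # Only afterwards is an invalid try_number reported (placeholder wording).
    if try_number is None or try_number < 1:
        return [Message("Invalid try_number, no logs available.")], {"end_of_log": True}
    return [Message("...log lines would be streamed here...")], {"end_of_log": False}


logs, metadata = read_logs(state="skipped", try_number=0)
assert logs[0].event == "Task was skipped, no logs available."
assert metadata == {"end_of_log": True}
```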