diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 7478465911042..330861ec1d22e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -6323,7 +6323,7 @@ paths: required: true schema: type: integer - exclusiveMinimum: 0 + minimum: 0 title: Try Number - name: full_content in: query diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py index de7b327b6a4f1..1a7242c9936e3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py @@ -22,7 +22,7 @@ from fastapi import Depends, HTTPException, Request, Response, status from itsdangerous import BadSignature, URLSafeSerializer -from pydantic import PositiveInt +from pydantic import NonNegativeInt from sqlalchemy.orm import joinedload from sqlalchemy.sql import select @@ -75,7 +75,7 @@ def get_log( dag_id: str, dag_run_id: str, task_id: str, - try_number: PositiveInt, + try_number: NonNegativeInt, accept: HeaderAcceptJsonOrNdjson, request: Request, dag_bag: DagBagDep, diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx index 3b9864a6db3ad..5b84591a605c5 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx @@ -84,7 +84,7 @@ export const Logs = () => { logLevelFilters, sourceFilters, taskInstance, - tryNumber: tryNumber === 0 ? 1 : tryNumber, + tryNumber, }); return ( diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index db18e47f13f93..89bdf385ee80f 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -482,6 +482,15 @@ def read( """ if try_number is None: try_number = task_instance.try_number + + if task_instance.state == TaskInstanceState.SKIPPED: + logs = [ + StructuredLogMessage( # type: ignore[call-arg] + event="Task was skipped, no logs available." + ) + ] + return logs, {"end_of_log": True} + if try_number is None or try_number < 1: logs = [ StructuredLogMessage( # type: ignore[call-arg] diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index 5813c22819eec..286eae5c84255 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -151,6 +151,43 @@ def task_callable(ti): # Remove the generated tmp log file. os.remove(log_filename) + def test_file_task_handler_when_ti_is_skipped(self, dag_maker): + def task_callable(ti): + ti.log.info("test") + + with dag_maker("dag_for_testing_file_task_handler", schedule=None): + task = PythonOperator( + task_id="task_for_testing_file_log_handler", + python_callable=task_callable, + ) + dagrun = dag_maker.create_dagrun() + ti = TaskInstance(task=task, run_id=dagrun.run_id) + + ti.try_number = 0 + ti.state = State.SKIPPED + + logger = ti.log + ti.log.disabled = False + + file_handler = next( + (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None + ) + assert file_handler is not None + + set_context(logger, ti) + assert file_handler.handler is not None + # We expect set_context generates a file locally. + log_filename = file_handler.handler.baseFilename + assert os.path.isfile(log_filename) + assert log_filename.endswith("0.log"), log_filename + + # Return value of read must be a tuple of list and list. + logs, metadata = file_handler.read(ti) + assert logs[0].event == "Task was skipped, no logs available." + + # Remove the generated tmp log file. + os.remove(log_filename) + @pytest.mark.xfail(reason="TODO: Needs to be ported over to the new structlog based logging") def test_file_task_handler(self, dag_maker, session): def task_callable(ti):