Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 0 additions & 57 deletions airflow-core/src/airflow/api_fastapi/common/db/task_instance.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@

from airflow.api_fastapi.common.dagbag import DagBagDep
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.db.task_instance import get_task_instance_or_history_for_try_number
from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrNdjson
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.types import Mimetype
from airflow.api_fastapi.core_api.datamodels.log import ExternalLogUrlResponse, TaskInstancesLogResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import DagAccessEntity, requires_access_dag
from airflow.exceptions import TaskNotFound
from airflow.models import TaskInstance
from airflow.models import TaskInstance, Trigger
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.utils.log.log_reader import TaskLogReader

task_instances_log_router = AirflowRouter(
Expand Down Expand Up @@ -105,14 +105,28 @@ def get_log(
if not task_log_reader.supports_read:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.")

ti = get_task_instance_or_history_for_try_number(
dag_id=dag_id,
dag_run_id=dag_run_id,
task_id=task_id,
try_number=try_number,
session=session,
map_index=map_index,
query = (
select(TaskInstance)
.where(
TaskInstance.task_id == task_id,
TaskInstance.dag_id == dag_id,
TaskInstance.run_id == dag_run_id,
TaskInstance.map_index == map_index,
)
.join(TaskInstance.dag_run)
.options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
.options(joinedload(TaskInstance.dag_model))
)
ti = session.scalar(query)
if ti is None:
query = select(TaskInstanceHistory).where(
TaskInstanceHistory.task_id == task_id,
TaskInstanceHistory.dag_id == dag_id,
TaskInstanceHistory.run_id == dag_run_id,
TaskInstanceHistory.map_index == map_index,
TaskInstanceHistory.try_number == try_number,
)
ti = session.scalar(query)

if ti is None:
metadata["end_of_log"] = True
Expand Down
4 changes: 1 addition & 3 deletions airflow-core/src/airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,9 +586,7 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[LogSourceInfo
sources = []
logs = []
try:
log_type = (
LogType.TRIGGER if hasattr(ti, "triggerer_job") and ti.triggerer_job else LogType.WORKER
)
log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER
url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
response = _fetch_logs_from_service(url, rel_path)
if response.status_code == 403:
Expand Down
16 changes: 0 additions & 16 deletions airflow-core/tests/unit/api_fastapi/common/db/__init__.py

This file was deleted.

This file was deleted.

Loading