Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84: Migrate get_log endpoint #44238

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,11 @@ class TaskInstanceHistoryCollectionResponse(BaseModel):

task_instances: list[TaskInstanceHistoryResponse]
total_entries: int


# Response Models
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved
class TaskInstancesLogResponseObject(BaseModel):
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved
"""Log serializer for responses."""

content: str
continuation_token: str | None
84 changes: 84 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4181,6 +4181,90 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}:
get:
tags:
- Task Instance
summary: Get Log
description: Get logs for specific task instance.
operationId: get_log
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: task_try_number
in: path
required: true
schema:
type: integer
title: Task Try Number
- name: full_content
in: query
required: false
schema:
type: boolean
default: false
title: Full Content
- name: map_index
in: query
required: false
schema:
type: integer
default: -1
title: Map Index
- name: token
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Token
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/tasks:
get:
tags:
Expand Down
100 changes: 96 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/task_instances.py
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

from __future__ import annotations

from typing import Annotated, Literal
from typing import Annotated, Any, Literal

from fastapi import Depends, HTTPException, Request, status
# from flask import Response
from fastapi import Depends, HTTPException, Request, Response, status
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.sql import select

Expand Down Expand Up @@ -53,15 +55,17 @@
TaskInstanceHistoryResponse,
TaskInstanceResponse,
TaskInstancesBatchBody,
TaskInstancesLogResponseObject,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.exceptions import TaskNotFound
from airflow.models import Base, DagRun
from airflow.models import Base, DagRun, TaskInstance, Trigger
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.models.taskinstancehistory import TaskInstanceHistory, TaskInstanceHistory as TIH
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.utils.db import get_query_count
from airflow.utils.log.log_reader import TaskLogReader
from airflow.utils.state import TaskInstanceState

task_instances_router = AirflowRouter(
Expand Down Expand Up @@ -482,3 +486,91 @@ def get_mapped_task_instance_try_details(
map_index=map_index,
session=session,
)


@task_instances_router.get(
"/{task_id}/logs/{task_try_number}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
response_model=None,
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved
)
def get_log(
*,
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved
dag_id: str,
dag_run_id: str,
task_id: str,
task_try_number: int,
request: Request,
session: Annotated[Session, Depends(get_session)],
full_content: bool = False,
map_index: int = -1,
token: str | None = None,
) -> Response | dict:
"""Get logs for specific task instance."""
if not token:
metadata = {}
else:
try:
metadata = URLSafeSerializer(request.app.state.secret_key).loads(token)
except BadSignature:
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API."
)

if metadata.get("download_logs") and metadata["download_logs"]:
full_content = True

if full_content:
metadata["download_logs"] = True
else:
metadata["download_logs"] = False

task_log_reader = TaskLogReader()

if not task_log_reader.supports_read:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.")

query = (
select(TaskInstance)
.where(
TaskInstance.task_id == task_id,
TaskInstance.dag_id == dag_id,
TaskInstance.run_id == dag_run_id,
TaskInstance.map_index == map_index,
)
.join(TaskInstance.dag_run)
.options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
)
ti = session.scalar(query)
if ti is None:
query = select(TaskInstanceHistory).where(
TaskInstanceHistory.task_id == task_id,
TaskInstanceHistory.dag_id == dag_id,
TaskInstanceHistory.run_id == dag_run_id,
TaskInstanceHistory.map_index == map_index,
TaskInstanceHistory.try_number == task_try_number,
)
ti = session.scalar(query)

if ti is None:
metadata["end_of_log"] = True
raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found")

dag = request.app.state.dag_bag.get_dag(dag_id)
if dag:
try:
ti.task = dag.get_task(ti.task_id)
except TaskNotFound:
pass

return_type = request.headers["accept"]
# return_type would be either the above two or None
logs: Any
if return_type == "application/json" or return_type is None: # default
logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
logs = logs[0] if task_try_number is not None else logs
# we must have token here, so we can safely ignore it
token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment]
return TaskInstancesLogResponseObject(continuation_token=token, content=str(logs)).model_dump()
# text/plain. Stream
logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
return Response(media_type="text/plain", content="".join(list(logs)))
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved
33 changes: 33 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,39 @@ export const UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn = (
useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]),
];
export type TaskInstanceServiceGetLogDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getLog>
>;
export type TaskInstanceServiceGetLogQueryResult<
TData = TaskInstanceServiceGetLogDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetLogKey = "TaskInstanceServiceGetLog";
export const UseTaskInstanceServiceGetLogKeyFn = (
{
dagId,
dagRunId,
fullContent,
mapIndex,
taskId,
taskTryNumber,
token,
}: {
dagId: string;
dagRunId: string;
fullContent?: boolean;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
token?: string;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetLogKey,
...(queryKey ?? [
{ dagId, dagRunId, fullContent, mapIndex, taskId, taskTryNumber, token },
]),
];
export type TaskServiceGetTasksDefaultResponse = Awaited<
ReturnType<typeof TaskService.getTasks>
>;
Expand Down
55 changes: 55 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,61 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstanceTryDetails = (
taskTryNumber,
}),
});
/**
* Get Log
* Get logs for specific task instance.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.taskTryNumber
* @param data.fullContent
* @param data.mapIndex
* @param data.token
* @returns unknown Successful Response
* @throws ApiError
*/
export const prefetchUseTaskInstanceServiceGetLog = (
queryClient: QueryClient,
{
dagId,
dagRunId,
fullContent,
mapIndex,
taskId,
taskTryNumber,
token,
}: {
dagId: string;
dagRunId: string;
fullContent?: boolean;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
token?: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseTaskInstanceServiceGetLogKeyFn({
dagId,
dagRunId,
fullContent,
mapIndex,
taskId,
taskTryNumber,
token,
}),
queryFn: () =>
TaskInstanceService.getLog({
dagId,
dagRunId,
fullContent,
mapIndex,
taskId,
taskTryNumber,
token,
}),
});
/**
* Get Tasks
* Get tasks for DAG.
Expand Down
56 changes: 56 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1905,6 +1905,62 @@ export const useTaskInstanceServiceGetMappedTaskInstanceTryDetails = <
}) as TData,
...options,
});
/**
* Get Log
* Get logs for specific task instance.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.taskTryNumber
* @param data.fullContent
* @param data.mapIndex
* @param data.token
* @returns unknown Successful Response
* @throws ApiError
*/
export const useTaskInstanceServiceGetLog = <
TData = Common.TaskInstanceServiceGetLogDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
dagRunId,
fullContent,
mapIndex,
taskId,
taskTryNumber,
token,
}: {
dagId: string;
dagRunId: string;
fullContent?: boolean;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
token?: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetLogKeyFn(
{ dagId, dagRunId, fullContent, mapIndex, taskId, taskTryNumber, token },
queryKey,
),
queryFn: () =>
TaskInstanceService.getLog({
dagId,
dagRunId,
fullContent,
mapIndex,
taskId,
taskTryNumber,
token,
}) as TData,
...options,
});
/**
* Get Tasks
* Get tasks for DAG.
Expand Down
Loading