Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84: Migrate get_log endpoint #44238

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from pydantic import BaseModel


class TaskInstancesLogResponse(BaseModel):
"""Log serializer for responses."""

content: str
continuation_token: str | None
112 changes: 112 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4633,6 +4633,102 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}:
get:
tags:
- Task Instance
summary: Get Log
description: Get logs for a specific task instance.
operationId: get_log
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: task_try_number
in: path
required: true
schema:
type: integer
title: Task Try Number
- name: full_content
in: query
required: false
schema:
type: boolean
default: false
title: Full Content
- name: map_index
in: query
required: false
schema:
type: integer
default: -1
title: Map Index
- name: token
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Token
- name: accept
in: header
required: false
schema:
type: string
enum:
- application/json
- text/plain
- '*/*'
default: '*/*'
title: Accept
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstancesLogResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down Expand Up @@ -7237,6 +7333,22 @@ components:
type: object
title: TaskInstancesBatchBody
description: Task Instance body for get batch.
TaskInstancesLogResponse:
properties:
content:
type: string
title: Content
continuation_token:
anyOf:
- type: string
- type: 'null'
title: Continuation Token
type: object
required:
- content
- continuation_token
title: TaskInstancesLogResponse
description: Log serializer for responses.
TaskOutletAssetReference:
properties:
dag_id:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router
from airflow.api_fastapi.core_api.routes.public.import_error import import_error_router
from airflow.api_fastapi.core_api.routes.public.log import task_instances_log_router
from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router
from airflow.api_fastapi.core_api.routes.public.plugins import plugins_router
from airflow.api_fastapi.core_api.routes.public.pools import pools_router
Expand Down Expand Up @@ -67,6 +68,7 @@
authenticated_router.include_router(tasks_router)
authenticated_router.include_router(variables_router)
authenticated_router.include_router(xcom_router)
authenticated_router.include_router(task_instances_log_router)

# Include authenticated router in public router
public_router.include_router(authenticated_router)
Expand Down
128 changes: 128 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import Annotated, Any

from fastapi import Depends, HTTPException, Request, Response, status
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.sql import select

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from pydantic import PositiveInt

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.types import Mimetype
from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.exceptions import TaskNotFound
from airflow.models import TaskInstance, Trigger
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.utils.log.log_reader import TaskLogReader

task_instances_log_router = AirflowRouter(
tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
)


@task_instances_log_router.get(
"/{task_id}/logs/{task_try_number}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are missing the documentation for text/plain mimetype.

response_model=TaskInstancesLogResponse,
)
def get_log(
dag_id: str,
dag_run_id: str,
task_id: str,
task_try_number: int,
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

@rawwar rawwar Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
task_try_number: int,
task_try_number: PositiveInt,

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can import this from pydantic
from pydantic import NonNegativeInt

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will automatically raise 422. We don't need to separately check if its less than 0

Copy link
Contributor

@bbovenzi bbovenzi Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try_number: int = 1

Let's add a default try number value. and just try_number is sufficient. There is no need for the task_ prefix

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot that try_number can be >0. In that case, we should use PositiveInt

accept: HeaderAcceptJsonOrText,
request: Request,
session: Annotated[Session, Depends(get_session)],
full_content: bool = False,
map_index: int = -1,
token: str | None = None,
):
"""Get logs for a specific task instance."""
if not token:
metadata = {}
else:
try:
metadata = URLSafeSerializer(request.app.state.secret_key).loads(token)
except BadSignature:
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API."
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
)

if task_try_number <= 0:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "task_try_number must be a positive integer")
Comment on lines +71 to +72
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if task_try_number <= 0:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "task_try_number must be a positive integer")


if metadata.get("download_logs") and metadata["download_logs"]:
full_content = True

if full_content:
metadata["download_logs"] = True
else:
metadata["download_logs"] = False
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved

task_log_reader = TaskLogReader()

if not task_log_reader.supports_read:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.")

query = (
select(TaskInstance)
.where(
TaskInstance.task_id == task_id,
TaskInstance.dag_id == dag_id,
TaskInstance.run_id == dag_run_id,
TaskInstance.map_index == map_index,
)
.join(TaskInstance.dag_run)
.options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
)
ti = session.scalar(query)
if ti is None:
query = select(TaskInstanceHistory).where(
TaskInstanceHistory.task_id == task_id,
TaskInstanceHistory.dag_id == dag_id,
TaskInstanceHistory.run_id == dag_run_id,
TaskInstanceHistory.map_index == map_index,
TaskInstanceHistory.try_number == task_try_number,
)
ti = session.scalar(query)

if ti is None:
metadata["end_of_log"] = True
raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found")

dag = request.app.state.dag_bag.get_dag(dag_id)
if dag:
try:
ti.task = dag.get_task(ti.task_id)
except TaskNotFound:
pass

logs: Any
if accept == Mimetype.JSON or accept == Mimetype.ANY: # default
logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
# we must have token here, so we can safely ignore it
token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment]
return TaskInstancesLogResponse(continuation_token=token, content=str(logs[0])).model_dump()
# text/plain. Stream
logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
return Response(media_type=accept, content="".join(list(logs)))
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,50 @@ export const UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn = (
useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]),
];
export type TaskInstanceServiceGetLogDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getLog>
>;
export type TaskInstanceServiceGetLogQueryResult<
TData = TaskInstanceServiceGetLogDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetLogKey = "TaskInstanceServiceGetLog";
export const UseTaskInstanceServiceGetLogKeyFn = (
{
accept,
dagId,
dagRunId,
fullContent,
mapIndex,
taskId,
taskTryNumber,
token,
}: {
accept?: "application/json" | "text/plain" | "*/*";
dagId: string;
dagRunId: string;
fullContent?: boolean;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
token?: string;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetLogKey,
...(queryKey ?? [
{
accept,
dagId,
dagRunId,
fullContent,
mapIndex,
taskId,
taskTryNumber,
token,
},
]),
];
export type TaskServiceGetTasksDefaultResponse = Awaited<
ReturnType<typeof TaskService.getTasks>
>;
Expand Down
Loading