Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions airflow-core/src/airflow/api_fastapi/common/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,30 @@ def header_accept_json_or_text_depends(


HeaderAcceptJsonOrText = Annotated[Mimetype, Depends(header_accept_json_or_text_depends)]


def header_accept_json_or_ndjson_depends(
accept: Annotated[
str,
Header(
json_schema_extra={
"type": "string",
"enum": [Mimetype.JSON, Mimetype.NDJSON, Mimetype.ANY],
}
),
] = Mimetype.ANY,
) -> Mimetype:
if accept.startswith(Mimetype.ANY):
return Mimetype.ANY
if accept.startswith(Mimetype.JSON):
return Mimetype.JSON
if accept.startswith(Mimetype.NDJSON) or accept.startswith(Mimetype.ANY):
return Mimetype.NDJSON

raise HTTPException(
status_code=status.HTTP_406_NOT_ACCEPTABLE,
detail="Only application/json or application/x-ndjson is supported",
)


HeaderAcceptJsonOrNdjson = Annotated[Mimetype, Depends(header_accept_json_or_ndjson_depends)]
1 change: 1 addition & 0 deletions airflow-core/src/airflow/api_fastapi/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class Mimetype(str, Enum):

TEXT = "text/plain"
JSON = "application/json"
NDJSON = "application/x-ndjson"
ANY = "*/*"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6354,7 +6354,7 @@ paths:
type: string
enum:
- application/json
- text/plain
- application/x-ndjson
- '*/*'
default: '*/*'
title: Accept
Expand All @@ -6365,10 +6365,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/TaskInstancesLogResponse'
text/plain:
application/x-ndjson:
schema:
type: string
example: 'content
example: '{"content": "content"}

{"content": "content"}

'
'401':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from airflow.api_fastapi.common.dagbag import DagBagDep
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrNdjson
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.types import Mimetype
from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
Expand All @@ -43,13 +43,14 @@
tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
)

text_example_response_for_get_log = {
Mimetype.TEXT: {
ndjson_example_response_for_get_log = {
Mimetype.NDJSON: {
"schema": {
"type": "string",
"example": textwrap.dedent(
"""\
content
{"content": "content"}
{"content": "content"}
"""
),
}
Expand All @@ -63,7 +64,7 @@
**create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
status.HTTP_200_OK: {
"description": "Successful Response",
"content": text_example_response_for_get_log,
"content": ndjson_example_response_for_get_log,
},
},
dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.TASK_LOGS))],
Expand All @@ -75,7 +76,7 @@ def get_log(
dag_run_id: str,
task_id: str,
try_number: PositiveInt,
accept: HeaderAcceptJsonOrText,
accept: HeaderAcceptJsonOrNdjson,
request: Request,
dag_bag: DagBagDep,
session: SessionDep,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,7 @@ export const UseTaskInstanceServiceGetLogKeyFn = (
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "*/*" | "application/x-ndjson";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,7 @@ export const ensureUseTaskInstanceServiceGetLogData = (
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "*/*" | "application/x-ndjson";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,7 @@ export const prefetchUseTaskInstanceServiceGetLog = (
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "*/*" | "application/x-ndjson";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2081,7 +2081,7 @@ export const useTaskInstanceServiceGetLog = <
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "*/*" | "application/x-ndjson";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2058,7 +2058,7 @@ export const useTaskInstanceServiceGetLogSuspense = <
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "*/*" | "application/x-ndjson";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2386,7 +2386,7 @@ export type PatchTaskInstanceDryRunData = {
export type PatchTaskInstanceDryRunResponse = TaskInstanceCollectionResponse;

export type GetLogData = {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "application/x-ndjson" | "*/*";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"@emotion/react": "^11.14.0",
"@tanstack/react-query": "^5.75.1",
"@tanstack/react-table": "^8.21.3",
"@tanstack/react-virtual": "^3.13.8",
"@types/debounce-promise": "^3.1.9",
"@uiw/codemirror-themes-all": "^4.23.12",
"@uiw/react-codemirror": "^4.23.12",
Expand Down
20 changes: 20 additions & 0 deletions airflow-core/src/airflow/ui/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export const TaskLogPreview = ({
error={error}
isLoading={isLoading}
logError={error}
parsedLogs={data.parsedLogs}
parsedLogs={data.parsedLogs ?? []}
wrap={wrap}
/>
</Box>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@
* under the License.
*/
import "@testing-library/jest-dom";
import { render, screen, waitFor } from "@testing-library/react";
import { fireEvent, render, screen, waitFor } from "@testing-library/react";
import { setupServer, type SetupServerApi } from "msw/node";
import { afterEach, describe, it, expect, beforeAll, afterAll } from "vitest";

import { handlers } from "src/mocks/handlers";
import { AppWrapper } from "src/utils/AppWrapper";

let server: SetupServerApi;
const ITEM_HEIGHT = 20;

beforeAll(() => {
server = setupServer(...handlers);
server.listen({ onUnhandledRequest: "bypass" });
Object.defineProperty(HTMLElement.prototype, "offsetHeight", {
value: ITEM_HEIGHT,
});
Object.defineProperty(HTMLElement.prototype, "offsetWidth", {
value: 800,
});
});

afterEach(() => server.resetHandlers());
Expand All @@ -39,14 +46,18 @@ describe("Task log grouping", () => {
render(
<AppWrapper initialEntries={["/dags/log_grouping/runs/manual__2025-02-18T12:19/tasks/generate"]} />,
);
await waitFor(() => expect(screen.queryByTestId("virtualized-list")).toBeInTheDocument());
await waitFor(() => expect(screen.queryByTestId("virtualized-item-0")).toBeInTheDocument());
await waitFor(() => expect(screen.queryByTestId("virtualized-item-10")).toBeInTheDocument());

await waitFor(() => expect(screen.queryByTestId("summary-Pre task execution logs")).toBeInTheDocument(), {
timeout: 10_000,
});
fireEvent.scroll(screen.getByTestId("virtualized-list"), { target: { scrollTop: ITEM_HEIGHT * 6 } });
await waitFor(() => expect(screen.queryByTestId("virtualized-item-16")).toBeInTheDocument());

await waitFor(() => expect(screen.queryByTestId("summary-Pre task execution logs")).toBeInTheDocument());
await waitFor(() => expect(screen.getByTestId("summary-Pre task execution logs")).toBeVisible());
await waitFor(() => expect(screen.queryByText(/Task instance is in running state/iu)).not.toBeVisible());

await waitFor(() => screen.getByTestId("summary-Pre task execution logs").click());
await waitFor(() => expect(screen.queryByText(/Task instance is in running state/iu)).toBeVisible());
});
}, 10_000);
});
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export const Logs = () => {
});

return (
<Box p={2}>
<Box display="flex" flexDirection="column" h="100%" p={2}>
<TaskLogHeader
onSelectTryNumber={onSelectTryNumber}
sourceOptions={data.sources}
Expand All @@ -102,7 +102,7 @@ export const Logs = () => {
error={error}
isLoading={isLoading || isLoadingLogs}
logError={logError}
parsedLogs={data.parsedLogs}
parsedLogs={data.parsedLogs ?? []}
wrap={wrap}
/>
<Dialog.Root onOpenChange={onOpenChange} open={fullscreen} scrollBehavior="inside" size="full">
Expand All @@ -124,12 +124,12 @@ export const Logs = () => {

<Dialog.CloseTrigger />

<Dialog.Body>
<Dialog.Body display="flex" flexDirection="column">
<TaskLogContent
error={error}
isLoading={isLoading || isLoadingLogs}
logError={logError}
parsedLogs={data.parsedLogs}
parsedLogs={data.parsedLogs ?? []}
wrap={wrap}
/>
</Dialog.Body>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
* under the License.
*/
import { Box, Code, VStack, useToken } from "@chakra-ui/react";
import type { ReactNode } from "react";
import { useLayoutEffect } from "react";
import { useVirtualizer } from "@tanstack/react-virtual";
import { useLayoutEffect, useRef } from "react";

import { ErrorAlert } from "src/components/ErrorAlert";
import { ProgressBar } from "src/components/ui";
Expand All @@ -27,12 +27,19 @@ type Props = {
readonly error: unknown;
readonly isLoading: boolean;
readonly logError: unknown;
readonly parsedLogs: ReactNode;
readonly parsedLogs: Array<JSX.Element | string | undefined>;
readonly wrap: boolean;
};

export const TaskLogContent = ({ error, isLoading, logError, parsedLogs, wrap }: Props) => {
const [bgLine] = useToken("colors", ["blue.emphasized"]);
const parentRef = useRef(null);
const rowVirtualizer = useVirtualizer({
count: parsedLogs.length,
estimateSize: () => 20,
getScrollElement: () => parentRef.current,
overscan: 10,
});

useLayoutEffect(() => {
if (location.hash) {
Expand All @@ -53,7 +60,7 @@ export const TaskLogContent = ({ error, isLoading, logError, parsedLogs, wrap }:
}, [isLoading, bgLine]);

return (
<Box>
<Box display="flex" flexDirection="column" flexGrow={1} h="100%" minHeight={0}>
<ErrorAlert error={error ?? logError} />
<ProgressBar size="xs" visibility={isLoading ? "visible" : "hidden"} />
<Code
Expand All @@ -62,13 +69,32 @@ export const TaskLogContent = ({ error, isLoading, logError, parsedLogs, wrap }:
bg: "blue.subtle",
},
}}
data-testid="virtualized-list"
flexGrow={1}
h="auto"
overflow="auto"
position="relative"
py={3}
ref={parentRef}
textWrap={wrap ? "pre" : "nowrap"}
width="100%"
>
<VStack alignItems="flex-start" gap={0}>
{parsedLogs}
<VStack alignItems="flex-start" gap={0} h={`${rowVirtualizer.getTotalSize()}px`}>
{rowVirtualizer.getVirtualItems().map((virtualRow) => (
<Box
data-index={virtualRow.index}
data-testid={`virtualized-item-${virtualRow.index}`}
key={virtualRow.key}
left={0}
position="absolute"
ref={rowVirtualizer.measureElement}
top={0}
transform={`translateY(${virtualRow.start}px)`}
width={wrap ? "100%" : "max-content"}
>
{parsedLogs[virtualRow.index] ?? undefined}
</Box>
))}
</VStack>
</Code>
</Box>
Expand Down
Loading