diff --git a/airflow-core/src/airflow/api_fastapi/common/headers.py b/airflow-core/src/airflow/api_fastapi/common/headers.py index 7d1a0fa69613b..13567e32bdc7d 100644 --- a/airflow-core/src/airflow/api_fastapi/common/headers.py +++ b/airflow-core/src/airflow/api_fastapi/common/headers.py @@ -47,3 +47,30 @@ def header_accept_json_or_text_depends( HeaderAcceptJsonOrText = Annotated[Mimetype, Depends(header_accept_json_or_text_depends)] + + +def header_accept_json_or_ndjson_depends( + accept: Annotated[ + str, + Header( + json_schema_extra={ + "type": "string", + "enum": [Mimetype.JSON, Mimetype.NDJSON, Mimetype.ANY], + } + ), + ] = Mimetype.ANY, +) -> Mimetype: + if accept.startswith(Mimetype.ANY): + return Mimetype.ANY + if accept.startswith(Mimetype.JSON): + return Mimetype.JSON + if accept.startswith(Mimetype.NDJSON) or accept.startswith(Mimetype.ANY): + return Mimetype.NDJSON + + raise HTTPException( + status_code=status.HTTP_406_NOT_ACCEPTABLE, + detail="Only application/json or application/x-ndjson is supported", + ) + + +HeaderAcceptJsonOrNdjson = Annotated[Mimetype, Depends(header_accept_json_or_ndjson_depends)] diff --git a/airflow-core/src/airflow/api_fastapi/common/types.py b/airflow-core/src/airflow/api_fastapi/common/types.py index 0b431dfdef466..18e5dc7387d62 100644 --- a/airflow-core/src/airflow/api_fastapi/common/types.py +++ b/airflow-core/src/airflow/api_fastapi/common/types.py @@ -72,6 +72,7 @@ class Mimetype(str, Enum): TEXT = "text/plain" JSON = "application/json" + NDJSON = "application/x-ndjson" ANY = "*/*" diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml index ff6725b266b55..7478465911042 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml @@ -6354,7 +6354,7 @@ paths: type: string enum: - application/json - - text/plain + - application/x-ndjson - '*/*' default: '*/*' title: Accept @@ -6365,10 +6365,12 @@ paths: application/json: schema: $ref: '#/components/schemas/TaskInstancesLogResponse' - text/plain: + application/x-ndjson: schema: type: string - example: 'content + example: '{"content": "content"} + + {"content": "content"} ' '401': diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py index 3873cadf69d76..05313e2b69b5a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py @@ -28,7 +28,7 @@ from airflow.api_fastapi.common.dagbag import DagBagDep from airflow.api_fastapi.common.db.common import SessionDep -from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrNdjson from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.common.types import Mimetype from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse @@ -43,13 +43,14 @@ tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" ) -text_example_response_for_get_log = { - Mimetype.TEXT: { +ndjson_example_response_for_get_log = { + Mimetype.NDJSON: { "schema": { "type": "string", "example": textwrap.dedent( """\ - content + {"content": "content"} + {"content": "content"} """ ), } @@ -63,7 +64,7 @@ **create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), status.HTTP_200_OK: { "description": "Successful Response", - "content": text_example_response_for_get_log, + "content": ndjson_example_response_for_get_log, }, }, dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.TASK_LOGS))], @@ -75,7 +76,7 @@ def get_log( dag_run_id: str, task_id: str, try_number: PositiveInt, - accept: HeaderAcceptJsonOrText, + accept: HeaderAcceptJsonOrNdjson, request: Request, dag_bag: DagBagDep, session: SessionDep, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 5af1938bea09a..a7efa4868c859 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -1267,7 +1267,7 @@ export const UseTaskInstanceServiceGetLogKeyFn = ( token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 1ec386a67f906..f02690c160ba0 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1744,7 +1744,7 @@ export const ensureUseTaskInstanceServiceGetLogData = ( token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index c67b0e525cb18..b9039a10f3719 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1744,7 +1744,7 @@ export const prefetchUseTaskInstanceServiceGetLog = ( token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 620dab69dc6d0..30b49d52aa330 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -2081,7 +2081,7 @@ export const useTaskInstanceServiceGetLog = < token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 767004d466379..d525b0a662c39 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -2058,7 +2058,7 @@ export const useTaskInstanceServiceGetLogSuspense = < token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 18322378fd5f4..b04ce36ef78d4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2386,7 +2386,7 @@ export type PatchTaskInstanceDryRunData = { export type PatchTaskInstanceDryRunResponse = TaskInstanceCollectionResponse; export type GetLogData = { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "application/x-ndjson" | "*/*"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/package.json b/airflow-core/src/airflow/ui/package.json index d1a7b9773a2f9..83d4eb143b5a2 100644 --- a/airflow-core/src/airflow/ui/package.json +++ b/airflow-core/src/airflow/ui/package.json @@ -22,6 +22,7 @@ "@emotion/react": "^11.14.0", "@tanstack/react-query": "^5.75.1", "@tanstack/react-table": "^8.21.3", + "@tanstack/react-virtual": "^3.13.8", "@types/debounce-promise": "^3.1.9", "@uiw/codemirror-themes-all": "^4.23.12", "@uiw/react-codemirror": "^4.23.12", diff --git a/airflow-core/src/airflow/ui/pnpm-lock.yaml b/airflow-core/src/airflow/ui/pnpm-lock.yaml index 0bbcce382b0de..5909fe6a14381 100644 --- a/airflow-core/src/airflow/ui/pnpm-lock.yaml +++ b/airflow-core/src/airflow/ui/pnpm-lock.yaml @@ -26,6 +26,9 @@ importers: '@tanstack/react-table': specifier: ^8.21.3 version: 8.21.3(react-dom@18.3.1(react@18.3.1))(react@18.3.1) + '@tanstack/react-virtual': + specifier: ^3.13.8 + version: 3.13.8(react-dom@18.3.1(react@18.3.1))(react@18.3.1) '@types/debounce-promise': specifier: ^3.1.9 version: 3.1.9 @@ -986,10 +989,19 @@ packages: react: '>=16.8' react-dom: '>=16.8' + '@tanstack/react-virtual@3.13.8': + resolution: {integrity: sha512-meS2AanUg50f3FBSNoAdBSRAh8uS0ue01qm7zrw65KGJtiXB9QXfybqZwkh4uFpRv2iX/eu5tjcH5wqUpwYLPg==} + peerDependencies: + react: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 + react-dom: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 + '@tanstack/table-core@8.21.3': resolution: {integrity: sha512-ldZXEhOBb8Is7xLs01fR3YEc3DERiz5silj8tnGkFZytt1abEvl/GhUmCE0PMLaMPTa3Jk4HbKmRlHmu+gCftg==} engines: {node: '>=12'} + '@tanstack/virtual-core@3.13.8': + resolution: {integrity: sha512-BT6w89Hqy7YKaWewYzmecXQzcJh6HTBbKYJIIkMaNU49DZ06LoTV3z32DWWEdUsgW6n1xTmwTLs4GtWrZC261w==} + '@testing-library/dom@10.4.0': resolution: {integrity: sha512-pemlzrSESWbdAloYml3bAJMEfNh1Z7EduzqPKprCH5S341frlpYnUEW0H72dLxa6IsYr+mPno20GiSm+h9dEdQ==} engines: {node: '>=18'} @@ -5236,8 +5248,16 @@ snapshots: react: 18.3.1 react-dom: 18.3.1(react@18.3.1) + '@tanstack/react-virtual@3.13.8(react-dom@18.3.1(react@18.3.1))(react@18.3.1)': + dependencies: + '@tanstack/virtual-core': 3.13.8 + react: 18.3.1 + react-dom: 18.3.1(react@18.3.1) + '@tanstack/table-core@8.21.3': {} + '@tanstack/virtual-core@3.13.8': {} + '@testing-library/dom@10.4.0': dependencies: '@babel/code-frame': 7.27.1 diff --git a/airflow-core/src/airflow/ui/src/pages/Dag/Overview/TaskLogPreview.tsx b/airflow-core/src/airflow/ui/src/pages/Dag/Overview/TaskLogPreview.tsx index 979f6bcfddff7..a4b82e1680369 100644 --- a/airflow-core/src/airflow/ui/src/pages/Dag/Overview/TaskLogPreview.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Dag/Overview/TaskLogPreview.tsx @@ -67,7 +67,7 @@ export const TaskLogPreview = ({ error={error} isLoading={isLoading} logError={error} - parsedLogs={data.parsedLogs} + parsedLogs={data.parsedLogs ?? []} wrap={wrap} /> diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.test.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.test.tsx index e39b98cbaf99a..af613d9dc61d3 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.test.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.test.tsx @@ -17,7 +17,7 @@ * under the License. */ import "@testing-library/jest-dom"; -import { render, screen, waitFor } from "@testing-library/react"; +import { fireEvent, render, screen, waitFor } from "@testing-library/react"; import { setupServer, type SetupServerApi } from "msw/node"; import { afterEach, describe, it, expect, beforeAll, afterAll } from "vitest"; @@ -25,10 +25,17 @@ import { handlers } from "src/mocks/handlers"; import { AppWrapper } from "src/utils/AppWrapper"; let server: SetupServerApi; +const ITEM_HEIGHT = 20; beforeAll(() => { server = setupServer(...handlers); server.listen({ onUnhandledRequest: "bypass" }); + Object.defineProperty(HTMLElement.prototype, "offsetHeight", { + value: ITEM_HEIGHT, + }); + Object.defineProperty(HTMLElement.prototype, "offsetWidth", { + value: 800, + }); }); afterEach(() => server.resetHandlers()); @@ -39,14 +46,18 @@ describe("Task log grouping", () => { render( , ); + await waitFor(() => expect(screen.queryByTestId("virtualized-list")).toBeInTheDocument()); + await waitFor(() => expect(screen.queryByTestId("virtualized-item-0")).toBeInTheDocument()); + await waitFor(() => expect(screen.queryByTestId("virtualized-item-10")).toBeInTheDocument()); - await waitFor(() => expect(screen.queryByTestId("summary-Pre task execution logs")).toBeInTheDocument(), { - timeout: 10_000, - }); + fireEvent.scroll(screen.getByTestId("virtualized-list"), { target: { scrollTop: ITEM_HEIGHT * 6 } }); + await waitFor(() => expect(screen.queryByTestId("virtualized-item-16")).toBeInTheDocument()); + + await waitFor(() => expect(screen.queryByTestId("summary-Pre task execution logs")).toBeInTheDocument()); await waitFor(() => expect(screen.getByTestId("summary-Pre task execution logs")).toBeVisible()); await waitFor(() => expect(screen.queryByText(/Task instance is in running state/iu)).not.toBeVisible()); await waitFor(() => screen.getByTestId("summary-Pre task execution logs").click()); await waitFor(() => expect(screen.queryByText(/Task instance is in running state/iu)).toBeVisible()); - }); + }, 10_000); }); diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx index 4e2c7af02d1ae..3b9864a6db3ad 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx @@ -88,7 +88,7 @@ export const Logs = () => { }); return ( - + { error={error} isLoading={isLoading || isLoadingLogs} logError={logError} - parsedLogs={data.parsedLogs} + parsedLogs={data.parsedLogs ?? []} wrap={wrap} /> @@ -124,12 +124,12 @@ export const Logs = () => { - + diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx index db62882ad08ad..e2c5109ab414c 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx @@ -17,8 +17,8 @@ * under the License. */ import { Box, Code, VStack, useToken } from "@chakra-ui/react"; -import type { ReactNode } from "react"; -import { useLayoutEffect } from "react"; +import { useVirtualizer } from "@tanstack/react-virtual"; +import { useLayoutEffect, useRef } from "react"; import { ErrorAlert } from "src/components/ErrorAlert"; import { ProgressBar } from "src/components/ui"; @@ -27,12 +27,19 @@ type Props = { readonly error: unknown; readonly isLoading: boolean; readonly logError: unknown; - readonly parsedLogs: ReactNode; + readonly parsedLogs: Array; readonly wrap: boolean; }; export const TaskLogContent = ({ error, isLoading, logError, parsedLogs, wrap }: Props) => { const [bgLine] = useToken("colors", ["blue.emphasized"]); + const parentRef = useRef(null); + const rowVirtualizer = useVirtualizer({ + count: parsedLogs.length, + estimateSize: () => 20, + getScrollElement: () => parentRef.current, + overscan: 10, + }); useLayoutEffect(() => { if (location.hash) { @@ -53,7 +60,7 @@ export const TaskLogContent = ({ error, isLoading, logError, parsedLogs, wrap }: }, [isLoading, bgLine]); return ( - + - - {parsedLogs} + + {rowVirtualizer.getVirtualItems().map((virtualRow) => ( + + {parsedLogs[virtualRow.index] ?? undefined} + + ))} diff --git a/airflow-core/src/airflow/ui/src/queries/useLogs.tsx b/airflow-core/src/airflow/ui/src/queries/useLogs.tsx index 3007deee70d9a..01b476f93c957 100644 --- a/airflow-core/src/airflow/ui/src/queries/useLogs.tsx +++ b/airflow-core/src/airflow/ui/src/queries/useLogs.tsx @@ -28,6 +28,7 @@ import { isStatePending, useAutoRefresh } from "src/utils"; import { getTaskInstanceLink } from "src/utils/links"; type Props = { + accept?: "*/*" | "application/json" | "application/x-ndjson"; dagId: string; logLevelFilters?: Array; sourceFilters?: Array; @@ -120,13 +121,14 @@ const parseLogs = ({ data, logLevelFilters, sourceFilters, taskInstance, tryNumb }; export const useLogs = ( - { dagId, logLevelFilters, sourceFilters, taskInstance, tryNumber = 1 }: Props, + { accept = "application/json", dagId, logLevelFilters, sourceFilters, taskInstance, tryNumber = 1 }: Props, options?: Omit, "queryFn" | "queryKey">, ) => { const refetchInterval = useAutoRefresh({ dagId }); const { data, ...rest } = useTaskInstanceServiceGetLog( { + accept, dagId, dagRunId: taskInstance?.dag_run_id ?? "", mapIndex: taskInstance?.map_index ?? -1, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py index a906edc8b2d51..1b10e4c16a96e 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py @@ -213,9 +213,7 @@ def test_should_respond_200_json(self, try_number): ), ], ) - def test_should_respond_200_text_plain( - self, request_url, expected_filename, extra_query_string, try_number - ): + def test_should_respond_200_ndjson(self, request_url, expected_filename, extra_query_string, try_number): expected_filename = expected_filename.replace("LOG_DIR", str(self.log_dir)) key = self.app.state.secret_key @@ -225,7 +223,7 @@ def test_should_respond_200_text_plain( response = self.client.get( request_url, params={"token": token, **extra_query_string}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 200 @@ -281,7 +279,7 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu response = self.client.get( request_url, params={"token": token, **extra_query_string}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 200 @@ -316,7 +314,7 @@ def test_get_logs_with_metadata_as_download_large_file(self, try_number): response = self.client.get( f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/" f"taskInstances/{self.TASK_ID}/logs/{try_number}?full_content=True", - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert "1st line" in response.content.decode("utf-8") @@ -384,7 +382,7 @@ def test_should_raise_404_when_missing_map_index_param_for_mapped_task(self): response = self.client.get( f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.MAPPED_TASK_ID}/logs/1", params={"token": token}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 404 assert response.json()["detail"] == "TaskInstance not found" @@ -397,7 +395,7 @@ def test_should_raise_404_when_filtering_on_map_index_for_unmapped_task(self): response = self.client.get( f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", params={"token": token, "map_index": 0}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 404 assert response.json()["detail"] == "TaskInstance not found"