diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx index e06c301da2c9e..b01a0b1ab955d 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx @@ -24,7 +24,7 @@ import { useTaskInstanceServiceGetMappedTaskInstance } from "openapi/queries"; import { Dialog } from "src/components/ui"; import { SearchParamsKeys } from "src/constants/searchParams"; import { useConfig } from "src/queries/useConfig"; -import { useLogs } from "src/queries/useLogs"; +import { useLogs, useLogDownload } from "src/queries/useLogs"; import { TaskLogContent } from "./TaskLogContent"; import { TaskLogHeader } from "./TaskLogHeader"; @@ -67,10 +67,6 @@ export const Logs = () => { const toggleWrap = () => setWrap(!wrap); const toggleFullscreen = () => setFullscreen(!fullscreen); - const onOpenChange = () => { - setFullscreen(false); - }; - const { data, error: logError, @@ -83,9 +79,33 @@ export const Logs = () => { tryNumber: tryNumber === 0 ? 1 : tryNumber, }); + const { datum } = useLogDownload({ + dagId, + logLevelFilters, + sourceFilters, + taskInstance, + tryNumber: tryNumber === 0 ? 1 : tryNumber, + }); + + const downloadLog = () => { + const texts = datum as Array; + const file = new Blob(texts, { type: "text/plain" }); + const element = document.createElement("a"); + + element.href = URL.createObjectURL(file); + element.download = `taskInstanceLogs.txt`; + document.body.append(element); + element.click(); + }; + + const onOpenChange = () => { + setFullscreen(false); + }; + return ( { {taskId} void; readonly isFullscreen?: boolean; readonly onSelectTryNumber: (tryNumber: number) => void; readonly sourceOptions?: Array; @@ -47,6 +48,7 @@ type Props = { }; export const TaskLogHeader = ({ + downloadLog, isFullscreen = false, onSelectTryNumber, sourceOptions, @@ -182,6 +184,9 @@ export const TaskLogHeader = ({ + {!isFullscreen && ( diff --git a/airflow-core/src/airflow/ui/src/queries/useLogs.tsx b/airflow-core/src/airflow/ui/src/queries/useLogs.tsx index 3007deee70d9a..6be6486d2f811 100644 --- a/airflow-core/src/airflow/ui/src/queries/useLogs.tsx +++ b/airflow-core/src/airflow/ui/src/queries/useLogs.tsx @@ -155,3 +155,138 @@ export const useLogs = ( return { data: parsedData, ...rest }; }; + +type LineObject = { + props?: Props; +}; + +const logDateTime = (line: string): string | undefined => { + if (!line || typeof line !== "object") { + return undefined; + } + + const lineObj = line as LineObject; + + if (!lineObj.props || !("children" in lineObj.props)) { + return undefined; + } + + const { children } = lineObj.props; + + if (!Array.isArray(children) || children.length <= 2) { + return undefined; + } + + const { 2: thirdChild } = children; + + const thirdChildObj = thirdChild as { props?: { datetime?: string } }; + + if (!thirdChildObj.props || typeof thirdChildObj.props.datetime !== "string") { + return undefined; + } + + const datetimeStr = thirdChildObj.props.datetime; + const date = new Date(datetimeStr); + + if (isNaN(date.getTime())) { + return undefined; + } + + const year = date.getFullYear(); + const month = date.getMonth() + 1; + const day = date.getDate(); + const hours = date.getHours(); + const minutes = date.getMinutes(); + const seconds = date.getSeconds(); + const formattedDate = `${year}-${month.toString().padStart(2, "0")}-${day.toString().padStart(2, "0")}`; + const formattedTime = `${hours.toString().padStart(2, "0")}:${minutes.toString().padStart(2, "0")}:${seconds.toString().padStart(2, "0")}`; + + return `${formattedDate}, ${formattedTime}`; +}; + +const logText = ({ data, logLevelFilters, sourceFilters, taskInstance, tryNumber }: ParseLogsProps) => { + let warning; + let parsedLines; + const sources: Array = []; + const logLink = taskInstance ? `${getTaskInstanceLink(taskInstance)}?try_number=${tryNumber}` : ""; + const elements: Array = []; + + try { + parsedLines = data.map((datum, index) => { + if (typeof datum !== "string" && "logger" in datum) { + const source = datum.logger as string; + + if (!sources.includes(source)) { + sources.push(source); + } + } + + return renderStructuredLog({ index, logLevelFilters, logLink, logMessage: datum, sourceFilters }); + }); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : "An error occurred."; + + console.warn(`Error parsing logs: ${errorMessage}`); + warning = "Unable to show logs. There was an error parsing logs."; + + return { data, warning }; + } + parsedLines.map((line) => { + const text = innerText(line); + + if (text !== "") { + const datetime = logDateTime(line as string); + + if (datetime === undefined) { + elements.push(`${text}\n`); + } else { + const first = text.slice(0, Math.max(0, text.indexOf("["))); + const second = text.slice(Math.max(0, text.indexOf("[") + 1)); + const newtext = `${first}[${datetime}${second}`; + + elements.push(`${newtext}\n`); + } + } + + return text; + }); + + return elements; +}; + +export const useLogDownload = ( + { dagId, logLevelFilters, sourceFilters, taskInstance, tryNumber = 1 }: Props, + options?: Omit, "queryFn" | "queryKey">, +) => { + const refetchInterval = useAutoRefresh({ dagId }); + + const { data, ...rest } = useTaskInstanceServiceGetLog( + { + dagId, + dagRunId: taskInstance?.dag_run_id ?? "", + mapIndex: taskInstance?.map_index ?? -1, + taskId: taskInstance?.task_id ?? "", + tryNumber, + }, + undefined, + { + enabled: Boolean(taskInstance), + refetchInterval: (query) => + isStatePending(taskInstance?.state) || + dayjs(query.state.dataUpdatedAt).isBefore(taskInstance?.end_date) + ? refetchInterval + : false, + ...options, + }, + ); + + const logs = logText({ + data: data?.content ?? [], + logLevelFilters, + sourceFilters, + taskInstance, + tryNumber, + }); + + return { datum: logs, ...rest }; +};