Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { useTaskInstanceServiceGetMappedTaskInstance } from "openapi/queries";
import { Dialog } from "src/components/ui";
import { SearchParamsKeys } from "src/constants/searchParams";
import { useConfig } from "src/queries/useConfig";
import { useLogs } from "src/queries/useLogs";
import { useLogs, useLogDownload } from "src/queries/useLogs";

import { TaskLogContent } from "./TaskLogContent";
import { TaskLogHeader } from "./TaskLogHeader";
Expand Down Expand Up @@ -67,10 +67,6 @@ export const Logs = () => {
const toggleWrap = () => setWrap(!wrap);
const toggleFullscreen = () => setFullscreen(!fullscreen);

const onOpenChange = () => {
setFullscreen(false);
};

const {
data,
error: logError,
Expand All @@ -83,9 +79,33 @@ export const Logs = () => {
tryNumber: tryNumber === 0 ? 1 : tryNumber,
});

const { datum } = useLogDownload({
dagId,
logLevelFilters,
sourceFilters,
taskInstance,
tryNumber: tryNumber === 0 ? 1 : tryNumber,
});

const downloadLog = () => {
const texts = datum as Array<BlobPart>;
const file = new Blob(texts, { type: "text/plain" });
const element = document.createElement("a");

element.href = URL.createObjectURL(file);
element.download = `taskInstanceLogs.txt`;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks the filename for the downloaded log file. This should include dagId and taskInstance details in the name like Airflow 2 else this might keep overwriting the file on disk or create something like taskInstanceLogs(1).txt

Copy link
Contributor

@bbovenzi bbovenzi Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on ${dagId}-${taskId}-${runId}-${mapIndex}-${tryNumber}.txt

document.body.append(element);
element.click();
};

const onOpenChange = () => {
setFullscreen(false);
};

return (
<Box p={2}>
<TaskLogHeader
downloadLog={downloadLog}
onSelectTryNumber={onSelectTryNumber}
sourceOptions={data.sources}
taskInstance={taskInstance}
Expand All @@ -106,6 +126,7 @@ export const Logs = () => {
<Dialog.Header>
<Heading size="xl">{taskId}</Heading>
<TaskLogHeader
downloadLog={downloadLog}
isFullscreen
onSelectTryNumber={onSelectTryNumber}
taskInstance={taskInstance}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { system } from "src/theme";
import { type LogLevel, logLevelColorMapping, logLevelOptions } from "src/utils/logs";

type Props = {
readonly downloadLog: () => void;
readonly isFullscreen?: boolean;
readonly onSelectTryNumber: (tryNumber: number) => void;
readonly sourceOptions?: Array<string>;
Expand All @@ -47,6 +48,7 @@ type Props = {
};

export const TaskLogHeader = ({
downloadLog,
isFullscreen = false,
onSelectTryNumber,
sourceOptions,
Expand Down Expand Up @@ -182,6 +184,9 @@ export const TaskLogHeader = ({
<Button aria-label={wrap ? "Unwrap" : "Wrap"} bg="bg.panel" onClick={toggleWrap} variant="outline">
{wrap ? "Unwrap" : "Wrap"}
</Button>
<Button aria-label="Download Log" bg="bg.panel" onClick={downloadLog} variant="outline">
Download Log
</Button>
{!isFullscreen && (
<IconButton aria-label="Full screen" bg="bg.panel" onClick={toggleFullscreen} variant="outline">
<MdOutlineOpenInFull />
Expand Down
135 changes: 135 additions & 0 deletions airflow-core/src/airflow/ui/src/queries/useLogs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,138 @@ export const useLogs = (

return { data: parsedData, ...rest };
};

type LineObject = {
props?: Props;
};

const logDateTime = (line: string): string | undefined => {
if (!line || typeof line !== "object") {
return undefined;
}

const lineObj = line as LineObject;

if (!lineObj.props || !("children" in lineObj.props)) {
return undefined;
}

const { children } = lineObj.props;

if (!Array.isArray(children) || children.length <= 2) {
return undefined;
}

const { 2: thirdChild } = children;

const thirdChildObj = thirdChild as { props?: { datetime?: string } };

if (!thirdChildObj.props || typeof thirdChildObj.props.datetime !== "string") {
return undefined;
}

const datetimeStr = thirdChildObj.props.datetime;
const date = new Date(datetimeStr);

if (isNaN(date.getTime())) {
return undefined;
}

const year = date.getFullYear();
const month = date.getMonth() + 1;
const day = date.getDate();
const hours = date.getHours();
const minutes = date.getMinutes();
const seconds = date.getSeconds();
const formattedDate = `${year}-${month.toString().padStart(2, "0")}-${day.toString().padStart(2, "0")}`;
const formattedTime = `${hours.toString().padStart(2, "0")}:${minutes.toString().padStart(2, "0")}:${seconds.toString().padStart(2, "0")}`;

return `${formattedDate}, ${formattedTime}`;
};

const logText = ({ data, logLevelFilters, sourceFilters, taskInstance, tryNumber }: ParseLogsProps) => {
let warning;
let parsedLines;
const sources: Array<string> = [];
const logLink = taskInstance ? `${getTaskInstanceLink(taskInstance)}?try_number=${tryNumber}` : "";
const elements: Array<string> = [];

try {
parsedLines = data.map((datum, index) => {
if (typeof datum !== "string" && "logger" in datum) {
const source = datum.logger as string;

if (!sources.includes(source)) {
sources.push(source);
}
}

return renderStructuredLog({ index, logLevelFilters, logLink, logMessage: datum, sourceFilters });
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "An error occurred.";

console.warn(`Error parsing logs: ${errorMessage}`);
warning = "Unable to show logs. There was an error parsing logs.";

return { data, warning };
}
parsedLines.map((line) => {
const text = innerText(line);

if (text !== "") {
const datetime = logDateTime(line as string);

if (datetime === undefined) {
elements.push(`${text}\n`);
} else {
const first = text.slice(0, Math.max(0, text.indexOf("[")));
const second = text.slice(Math.max(0, text.indexOf("[") + 1));
const newtext = `${first}[${datetime}${second}`;

elements.push(`${newtext}\n`);
}
}

return text;
});

return elements;
};

export const useLogDownload = (
{ dagId, logLevelFilters, sourceFilters, taskInstance, tryNumber = 1 }: Props,
options?: Omit<UseQueryOptions<TaskInstancesLogResponse>, "queryFn" | "queryKey">,
) => {
const refetchInterval = useAutoRefresh({ dagId });

const { data, ...rest } = useTaskInstanceServiceGetLog(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you can pass text/plain in the accept here and get the raw value of the log file in the storage which is the value users usually want to reduce the code for formatting.

accept?: "application/json" | "text/plain" | "*/*";

if accept == Mimetype.JSON or accept == Mimetype.ANY: # default
logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
encoded_token = None
if not metadata.get("end_of_log", False):
encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
return TaskInstancesLogResponse.model_construct(continuation_token=encoded_token, content=logs)
# text/plain, or something else we don't understand. Return raw log content
# We need to exhaust the iterator before we can generate the continuation token.
# We could improve this by making it a streaming/async response, and by then setting the header using
# HTTP Trailers
logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata))
headers = None
if not metadata.get("end_of_log", False):
headers = {
"Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
}
return Response(media_type="application/x-ndjson", content=logs, headers=headers)

{
dagId,
dagRunId: taskInstance?.dag_run_id ?? "",
mapIndex: taskInstance?.map_index ?? -1,
taskId: taskInstance?.task_id ?? "",
tryNumber,
},
undefined,
{
enabled: Boolean(taskInstance),
refetchInterval: (query) =>
isStatePending(taskInstance?.state) ||
dayjs(query.state.dataUpdatedAt).isBefore(taskInstance?.end_date)
? refetchInterval
: false,
...options,
},
);

const logs = logText({
data: data?.content ?? [],
logLevelFilters,
sourceFilters,
taskInstance,
tryNumber,
});

return { datum: logs, ...rest };
};