diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index 6369a819d26ef..d703feaef99c0 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -33,6 +33,7 @@ import useGraphData from "./useGraphData"; import useGridData from "./useGridData"; import useMappedInstances from "./useMappedInstances"; import useDatasets from "./useDatasets"; +import useDatasetsSummary from "./useDatasetsSummary"; import useDataset from "./useDataset"; import useDatasetDependencies from "./useDatasetDependencies"; import useDatasetEvents from "./useDatasetEvents"; @@ -72,9 +73,10 @@ export { useDagRuns, useDags, useDataset, + useDatasets, useDatasetDependencies, useDatasetEvents, - useDatasets, + useDatasetsSummary, useExtraLinks, useGraphData, useGridData, diff --git a/airflow/www/static/js/api/useDatasets.ts b/airflow/www/static/js/api/useDatasets.ts index 6150f51ce3592..db46415062c1a 100644 --- a/airflow/www/static/js/api/useDatasets.ts +++ b/airflow/www/static/js/api/useDatasets.ts @@ -21,65 +21,29 @@ import axios, { AxiosResponse } from "axios"; import { useQuery } from "react-query"; import { getMetaValue } from "src/utils"; -import type { DatasetListItem } from "src/types"; -import type { unitOfTime } from "moment"; - -export interface DatasetsData { - datasets: DatasetListItem[]; - totalEntries: number; -} - -export interface DateOption { - count: number; - unit: unitOfTime.DurationConstructor; -} +import type { API } from "src/types"; interface Props { - limit?: number; - offset?: number; - order?: string; - uri?: string; - updatedAfter?: DateOption; + dagIds?: string[]; + enabled?: boolean; } -export default function useDatasets({ - limit, - offset, - order, - uri, - updatedAfter, -}: Props) { - const query = useQuery( - ["datasets", limit, offset, order, uri, updatedAfter], +export default function useDatasets({ dagIds, enabled = true }: Props) { + return useQuery( + ["datasets", dagIds], () => { const datasetsUrl = getMetaValue("datasets_api"); - const orderParam = order ? { order_by: order } : {}; - const uriParam = uri ? { uri_pattern: uri } : {}; - const updatedAfterParam = - updatedAfter && updatedAfter.count && updatedAfter.unit - ? { - // @ts-ignore - updated_after: moment() - .subtract(updatedAfter.count, updatedAfter.unit) - .toISOString(), - } - : {}; - return axios.get(datasetsUrl, { + const dagIdsParam = + dagIds && dagIds.length ? { dag_ids: dagIds.join(",") } : {}; + + return axios.get(datasetsUrl, { params: { - offset, - limit, - ...orderParam, - ...uriParam, - ...updatedAfterParam, + ...dagIdsParam, }, }); }, { - keepPreviousData: true, + enabled, } ); - return { - ...query, - data: query.data ?? { datasets: [], totalEntries: 0 }, - }; } diff --git a/airflow/www/static/js/api/useDatasetsSummary.ts b/airflow/www/static/js/api/useDatasetsSummary.ts new file mode 100644 index 0000000000000..6f902946f6296 --- /dev/null +++ b/airflow/www/static/js/api/useDatasetsSummary.ts @@ -0,0 +1,85 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import axios, { AxiosResponse } from "axios"; +import { useQuery } from "react-query"; + +import { getMetaValue } from "src/utils"; +import type { DatasetListItem } from "src/types"; +import type { unitOfTime } from "moment"; + +export interface DatasetsData { + datasets: DatasetListItem[]; + totalEntries: number; +} + +export interface DateOption { + count: number; + unit: unitOfTime.DurationConstructor; +} + +interface Props { + limit?: number; + offset?: number; + order?: string; + uri?: string; + updatedAfter?: DateOption; +} + +export default function useDatasetsSummary({ + limit, + offset, + order, + uri, + updatedAfter, +}: Props) { + const query = useQuery( + ["datasets_summary", limit, offset, order, uri, updatedAfter], + () => { + const datasetsUrl = getMetaValue("datasets_summary"); + const orderParam = order ? { order_by: order } : {}; + const uriParam = uri ? { uri_pattern: uri } : {}; + const updatedAfterParam = + updatedAfter && updatedAfter.count && updatedAfter.unit + ? { + // @ts-ignore + updated_after: moment() + .subtract(updatedAfter.count, updatedAfter.unit) + .toISOString(), + } + : {}; + return axios.get(datasetsUrl, { + params: { + offset, + limit, + ...orderParam, + ...uriParam, + ...updatedAfterParam, + }, + }); + }, + { + keepPreviousData: true, + } + ); + return { + ...query, + data: query.data ?? { datasets: [], totalEntries: 0 }, + }; +} diff --git a/airflow/www/static/js/dag/details/graph/Node.test.tsx b/airflow/www/static/js/dag/details/graph/DagNode.test.tsx similarity index 91% rename from airflow/www/static/js/dag/details/graph/Node.test.tsx rename to airflow/www/static/js/dag/details/graph/DagNode.test.tsx index a01114ec04879..34ddac7506c71 100644 --- a/airflow/www/static/js/dag/details/graph/Node.test.tsx +++ b/airflow/www/static/js/dag/details/graph/DagNode.test.tsx @@ -26,7 +26,8 @@ import { Wrapper } from "src/utils/testUtils"; import type { NodeProps } from "reactflow"; import type { Task, TaskInstance } from "src/types"; -import { CustomNodeProps, BaseNode as Node } from "./Node"; +import type { CustomNodeProps } from "./Node"; +import DagNode from "./DagNode"; const mockNode: NodeProps = { id: "task_id", @@ -34,6 +35,7 @@ const mockNode: NodeProps = { label: "task_id", height: 50, width: 200, + class: "dag", instance: { state: "success", runId: "run_id", @@ -65,7 +67,7 @@ const mockNode: NodeProps = { describe("Test Graph Node", () => { test("Renders normal task correctly", async () => { - const { getByText, getByTestId } = render(, { + const { getByText, getByTestId } = render(, { wrapper: Wrapper, }); @@ -77,7 +79,7 @@ describe("Test Graph Node", () => { test("Renders mapped task correctly", async () => { const { getByText } = render( - { test("Renders task group correctly", async () => { const { getByText } = render( - , @@ -114,7 +116,7 @@ describe("Test Graph Node", () => { test("Renders normal task correctly", async () => { const { getByTestId } = render( - , + , { wrapper: Wrapper, } diff --git a/airflow/www/static/js/dag/details/graph/DagNode.tsx b/airflow/www/static/js/dag/details/graph/DagNode.tsx new file mode 100644 index 0000000000000..06809249ede8c --- /dev/null +++ b/airflow/www/static/js/dag/details/graph/DagNode.tsx @@ -0,0 +1,167 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React from "react"; +import { Box, Flex, Text } from "@chakra-ui/react"; +import type { NodeProps } from "reactflow"; + +import { SimpleStatus } from "src/dag/StatusBox"; +import useSelection from "src/dag/useSelection"; +import { getGroupAndMapSummary, hoverDelay } from "src/utils"; +import Tooltip from "src/components/Tooltip"; +import InstanceTooltip from "src/dag/InstanceTooltip"; +import { useContainerRef } from "src/context/containerRef"; +import TaskName from "src/dag/TaskName"; + +import type { CustomNodeProps } from "./Node"; + +const DagNode = ({ + id, + data: { + label, + childCount, + height, + width, + instance, + task, + isSelected, + latestDagRunId, + onToggleCollapse, + isOpen, + isActive, + setupTeardownType, + labelStyle, + style, + isZoomedOut, + }, +}: NodeProps) => { + const { onSelect } = useSelection(); + const containerRef = useContainerRef(); + + if (!task) return null; + + const bg = isOpen ? "blackAlpha.50" : "white"; + const { isMapped } = task; + const mappedStates = instance?.mappedStates; + + const { totalTasks } = getGroupAndMapSummary({ group: task, mappedStates }); + + const taskName = isMapped + ? `${label} [${instance ? totalTasks : " "}]` + : label; + + let operatorTextColor = ""; + let operatorBG = ""; + if (style) { + [, operatorBG] = style.split(":"); + } + + if (labelStyle) { + [, operatorTextColor] = labelStyle.split(":"); + } + if (!operatorTextColor || operatorTextColor === "#000;") + operatorTextColor = "gray.500"; + + const nodeBorderColor = + instance?.state && stateColors[instance.state] + ? `${stateColors[instance.state]}.400` + : "gray.400"; + + return ( + + ) : null + } + portalProps={{ containerRef }} + hasArrow + placement="top" + openDelay={hoverDelay} + > + { + if (latestDagRunId) { + onSelect({ + runId: instance?.runId || latestDagRunId, + taskId: isSelected ? undefined : id, + }); + } + }} + px={isZoomedOut ? 1 : 2} + mt={isZoomedOut ? -2 : 0} + > + { + e.stopPropagation(); + onToggleCollapse(); + }} + setupTeardownType={setupTeardownType} + fontWeight="bold" + isZoomedOut={isZoomedOut} + mt={isZoomedOut ? -2 : 0} + noOfLines={2} + /> + {!isZoomedOut && ( + <> + {!!instance && instance.state && ( + + + + {instance.state} + + + )} + {task?.operator && ( + + {task.operator} + + )} + + )} + + + ); +}; + +export default DagNode; diff --git a/airflow/www/static/js/dag/details/graph/DatasetNode.tsx b/airflow/www/static/js/dag/details/graph/DatasetNode.tsx new file mode 100644 index 0000000000000..921341643bf5e --- /dev/null +++ b/airflow/www/static/js/dag/details/graph/DatasetNode.tsx @@ -0,0 +1,112 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React from "react"; +import { + Box, + Link, + Popover, + PopoverArrow, + PopoverBody, + PopoverCloseButton, + PopoverContent, + PopoverHeader, + PopoverTrigger, + Portal, + Text, +} from "@chakra-ui/react"; +import { HiDatabase } from "react-icons/hi"; +import type { NodeProps } from "reactflow"; + +import { getMetaValue } from "src/utils"; +import { useContainerRef } from "src/context/containerRef"; +import type { CustomNodeProps } from "./Node"; + +const datasetsUrl = getMetaValue("datasets_url"); + +const DatasetNode = ({ + data: { label, height, width, latestDagRunId, isZoomedOut }, +}: NodeProps) => { + const containerRef = useContainerRef(); + + return ( + + + + + {label} + + {!isZoomedOut && ( + + + Dataset + + )} + + + + + + + {label} + + + View Dataset + + + + + + ); +}; + +export default DatasetNode; diff --git a/airflow/www/static/js/dag/details/graph/Node.tsx b/airflow/www/static/js/dag/details/graph/Node.tsx index 4ce193066c1d7..04f1d56fa0710 100644 --- a/airflow/www/static/js/dag/details/graph/Node.tsx +++ b/airflow/www/static/js/dag/details/graph/Node.tsx @@ -18,17 +18,13 @@ */ import React from "react"; -import { Box, Text, Flex } from "@chakra-ui/react"; +import { Box } from "@chakra-ui/react"; import { Handle, NodeProps, Position } from "reactflow"; -import { SimpleStatus } from "src/dag/StatusBox"; -import useSelection from "src/dag/useSelection"; -import type { DagRun, Task, TaskInstance } from "src/types"; -import { getGroupAndMapSummary, hoverDelay } from "src/utils"; -import Tooltip from "src/components/Tooltip"; -import InstanceTooltip from "src/dag/InstanceTooltip"; -import { useContainerRef } from "src/context/containerRef"; -import TaskName from "src/dag/TaskName"; +import type { DepNode, DagRun, Task, TaskInstance } from "src/types"; + +import DagNode from "./DagNode"; +import DatasetNode from "./DatasetNode"; export interface CustomNodeProps { label: string; @@ -47,186 +43,42 @@ export interface CustomNodeProps { labelStyle?: string; style?: string; isZoomedOut: boolean; + class: DepNode["value"]["class"]; } -export const BaseNode = ({ - id, - data: { - label, - childCount, - height, - width, - instance, - task, - isSelected, - latestDagRunId, - onToggleCollapse, - isOpen, - isActive, - setupTeardownType, - labelStyle, - style, - isZoomedOut, - }, -}: NodeProps) => { - const { onSelect } = useSelection(); - const containerRef = useContainerRef(); - - if (!task) return null; - - const bg = isOpen ? "blackAlpha.50" : "white"; - const { isMapped } = task; - const mappedStates = instance?.mappedStates; - - const { totalTasks } = getGroupAndMapSummary({ group: task, mappedStates }); - - const taskName = isMapped - ? `${label} [${instance ? totalTasks : " "}]` - : label; - - let operatorTextColor = ""; - let operatorBG = ""; - if (style) { - [, operatorBG] = style.split(":"); - } - - if (labelStyle) { - [, operatorTextColor] = labelStyle.split(":"); - } - if (!operatorTextColor || operatorTextColor === "#000;") - operatorTextColor = "gray.500"; - - const nodeBorderColor = - instance?.state && stateColors[instance.state] - ? `${stateColors[instance.state]}.400` - : "gray.400"; - - return ( - - ) : null - } - portalProps={{ containerRef }} - hasArrow - placement="top" - openDelay={hoverDelay} - > - { - if (latestDagRunId) { - onSelect({ - runId: instance?.runId || latestDagRunId, - taskId: isSelected ? undefined : id, - }); - } - }} - px={isZoomedOut ? 1 : 2} - mt={isZoomedOut ? -2 : 0} - > - { - e.stopPropagation(); - onToggleCollapse(); - }} - setupTeardownType={setupTeardownType} - fontWeight="bold" - isZoomedOut={isZoomedOut} - mt={isZoomedOut ? -2 : 0} - noOfLines={2} - /> - {!isZoomedOut && ( - <> - {!!instance && instance.state && ( - - - - {instance.state} - - - )} - {task?.operator && ( - - {task.operator} - - )} - - )} - - - ); -}; - const Node = (props: NodeProps) => { - const { - data: { height, width, isJoinNode, task }, - } = props; - if (isJoinNode) { + const { data } = props; + + if (data.isJoinNode) { return ( - <> - - - - + ); } - if (!task) return null; - return ( - <> - - - - - ); + if (data.class === "dataset") return ; + + return ; }; -export default Node; +const NodeWrapper = (props: NodeProps) => ( + <> + + + + +); + +export default NodeWrapper; diff --git a/airflow/www/static/js/dag/details/graph/index.tsx b/airflow/www/static/js/dag/details/graph/index.tsx index 4fb3d21f6c13a..84f71313a3756 100644 --- a/airflow/www/static/js/dag/details/graph/index.tsx +++ b/airflow/www/static/js/dag/details/graph/index.tsx @@ -30,11 +30,12 @@ import ReactFlow, { Viewport, } from "reactflow"; -import { useGraphData, useGridData } from "src/api"; +import { useDatasets, useGraphData, useGridData } from "src/api"; import useSelection from "src/dag/useSelection"; -import { useOffsetTop } from "src/utils"; +import { getMetaValue, useOffsetTop } from "src/utils"; import { useGraphLayout } from "src/utils/graph"; import Edge from "src/components/Graph/Edge"; +import type { DepNode, WebserverEdge } from "src/types"; import Node from "./Node"; import { buildEdges, nodeStrokeColor, nodeColor, flattenNodes } from "./utils"; @@ -48,6 +49,8 @@ interface Props { hoveredTaskState?: string | null; } +const dagId = getMetaValue("dag_id"); + const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => { const graphRef = useRef(null); const { data } = useGraphData(); @@ -59,13 +62,63 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => { setArrange(data?.arrange || "LR"); }, [data?.arrange]); + const { data: datasetsCollection } = useDatasets({ + dagIds: [dagId], + }); + + const rawNodes = + data?.nodes && datasetsCollection?.datasets?.length + ? { + ...data.nodes, + children: [ + ...(data.nodes.children || []), + ...(datasetsCollection?.datasets || []).map( + (dataset) => + ({ + id: dataset?.id?.toString() || "", + value: { + class: "dataset", + label: dataset.uri, + }, + } as DepNode) + ), + ], + } + : data?.nodes; + + const datasetEdges: WebserverEdge[] = []; + + datasetsCollection?.datasets?.forEach((dataset) => { + const producingTask = dataset?.producingTasks?.find( + (t) => t.dagId === dagId + ); + const consumingDag = dataset?.consumingDags?.find((d) => d.dagId === dagId); + if (dataset.id) { + if (producingTask?.taskId) { + datasetEdges.push({ + sourceId: producingTask.taskId, + targetId: dataset.id.toString(), + }); + } + if (consumingDag && data?.nodes?.children?.length) { + datasetEdges.push({ + sourceId: dataset.id.toString(), + // Point upstream datasets to the first task + targetId: data.nodes?.children[0].id, + }); + } + } + }); + const { data: graphData } = useGraphLayout({ - edges: data?.edges, - nodes: data?.nodes, + edges: [...(data?.edges || []), ...datasetEdges], + nodes: rawNodes, openGroupIds, arrange, }); + const { selected } = useSelection(); + const { data: { dagRuns, groups }, } = useGridData(); diff --git a/airflow/www/static/js/datasets/List.test.tsx b/airflow/www/static/js/datasets/List.test.tsx index c1cd0da344db0..f0c1523029220 100644 --- a/airflow/www/static/js/datasets/List.test.tsx +++ b/airflow/www/static/js/datasets/List.test.tsx @@ -22,7 +22,7 @@ import React from "react"; import { render } from "@testing-library/react"; -import * as useDatasetsModule from "src/api/useDatasets"; +import * as useDatasetsModule from "src/api/useDatasetsSummary"; import { Wrapper } from "src/utils/testUtils"; import type { UseQueryResult } from "react-query"; diff --git a/airflow/www/static/js/datasets/List.tsx b/airflow/www/static/js/datasets/List.tsx index a32793688353d..9d83406d7f4f6 100644 --- a/airflow/www/static/js/datasets/List.tsx +++ b/airflow/www/static/js/datasets/List.tsx @@ -37,11 +37,11 @@ import type { Row, SortingRule } from "react-table"; import { MdClose, MdSearch } from "react-icons/md"; import { useSearchParams } from "react-router-dom"; -import { useDatasets } from "src/api"; +import { useDatasetsSummary } from "src/api"; import { Table, TimeCell } from "src/components/Table"; import type { API } from "src/types"; import { getMetaValue } from "src/utils"; -import type { DateOption } from "src/api/useDatasets"; +import type { DateOption } from "src/api/useDatasetsSummary"; interface Props { onSelect: (datasetId: string) => void; @@ -99,7 +99,7 @@ const DatasetsList = ({ onSelect }: Props) => { const { data: { datasets, totalEntries }, isLoading, - } = useDatasets({ + } = useDatasetsSummary({ limit, offset, order, diff --git a/airflow/www/static/js/types/index.ts b/airflow/www/static/js/types/index.ts index b9ada90370907..926db4760d985 100644 --- a/airflow/www/static/js/types/index.ts +++ b/airflow/www/static/js/types/index.ts @@ -135,13 +135,13 @@ interface DepNode { id?: string; class: "dag" | "dataset" | "trigger" | "sensor"; label: string; - rx: number; - ry: number; + rx?: number; + ry?: number; isOpen?: boolean; isJoinNode?: boolean; childCount?: number; - labelStyle: string; - style: string; + labelStyle?: string; + style?: string; setupTeardownType?: "setup" | "teardown"; }; children?: DepNode[]; diff --git a/airflow/www/static/js/utils/graph.ts b/airflow/www/static/js/utils/graph.ts index 71003b0f1928d..d1b4b47bdeba3 100644 --- a/airflow/www/static/js/utils/graph.ts +++ b/airflow/www/static/js/utils/graph.ts @@ -174,6 +174,7 @@ const generateGraph = ({ } const extraLabelLength = value.label.length > 20 ? value.label.length - 19 : 0; + return { id, label: value.label, @@ -218,7 +219,7 @@ export const useGraphLayout = ({ return useQuery( [ "graphLayout", - !!nodes?.children, + nodes?.children?.length, openGroupIds, arrange, root, diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 6d6800e98c13c..5d854abe6e0d9 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -79,6 +79,7 @@ + diff --git a/airflow/www/templates/airflow/datasets.html b/airflow/www/templates/airflow/datasets.html index 0ffd11444a3fa..164aaf0006930 100644 --- a/airflow/www/templates/airflow/datasets.html +++ b/airflow/www/templates/airflow/datasets.html @@ -23,7 +23,7 @@ {% block head_meta %} {{ super() }} - +