diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json index 8085fcb942d81..f101d9eb044c9 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json @@ -114,6 +114,18 @@ }, "graphDirection": { "label": "Graph Direction" + }, + "taskStreamFilter": { + "activeFilter": "Active filter", + "clearFilter": "Clear Filter", + "clickTask": "Click a task to select it as the filter root", + "label": "Filter", + "options": { + "both": "Both upstream & downstream", + "downstream": "Downstream", + "upstream": "Upstream" + }, + "selectedTask": "Selected task" } }, "paramsFailed": "Failed to load params", diff --git a/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx b/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx index c65e9a9e1c361..188b79a84d755 100644 --- a/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx +++ b/airflow-core/src/airflow/ui/src/context/openGroups/OpenGroupsProvider.tsx @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { useCallback, useMemo, useEffect, type PropsWithChildren } from "react"; +import { useCallback, useMemo, useEffect, useRef, type PropsWithChildren } from "react"; import { useDebouncedCallback } from "use-debounce"; import { useLocalStorage } from "usehooks-ts"; @@ -36,6 +36,13 @@ export const OpenGroupsProvider = ({ children, dagId }: Props) => { const [openGroupIds, setOpenGroupIds] = useLocalStorage>(openGroupsKey, []); const [allGroupIds, setAllGroupIds] = useLocalStorage>(allGroupsKey, []); + // use a ref to track the current allGroupIds without causing re-renders + const allGroupIdsRef = useRef(allGroupIds); + + useEffect(() => { + allGroupIdsRef.current = allGroupIds; + }, [allGroupIds]); + // For Graph view support: dependencies + selected version const selectedVersion = useSelectedVersion(); const [dependencies] = useLocalStorage<"all" | "immediate" | "tasks">(`dependencies-${dagId}`, "tasks"); @@ -55,10 +62,10 @@ export const OpenGroupsProvider = ({ children, dagId }: Props) => { useEffect(() => { const observedGroupIds = flattenGraphNodes(structure.nodes).allGroupIds; - if (JSON.stringify(observedGroupIds) !== JSON.stringify(allGroupIds)) { + if (JSON.stringify(observedGroupIds) !== JSON.stringify(allGroupIdsRef.current)) { setAllGroupIds(observedGroupIds); } - }, [structure.nodes, allGroupIds, setAllGroupIds]); + }, [structure.nodes, setAllGroupIds]); const debouncedSetOpenGroupIds = useDebouncedCallback( (newGroupIds: Array) => { diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx index 34698872fbd48..bd26ef5d5ee4b 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx @@ -20,7 +20,7 @@ import { useToken } from "@chakra-ui/react"; import { ReactFlow, Controls, Background, MiniMap, type Node as ReactFlowNode } from "@xyflow/react"; import "@xyflow/react/dist/style.css"; import { useEffect } from "react"; -import { useParams } from "react-router-dom"; +import { useParams, useSearchParams } from "react-router-dom"; import { useLocalStorage } from "usehooks-ts"; import { useStructureServiceStructureData } from "openapi/queries"; @@ -61,9 +61,16 @@ const nodeColor = ( export const Graph = () => { const { colorMode = "light" } = useColorMode(); const { dagId = "", groupId, runId = "", taskId } = useParams(); + const [searchParams] = useSearchParams(); const selectedVersion = useSelectedVersion(); + const filterRoot = searchParams.get("root") ?? undefined; + const includeUpstream = searchParams.get("upstream") === "true"; + const includeDownstream = searchParams.get("downstream") === "true"; + + const hasActiveFilter = includeUpstream || includeDownstream; + // corresponds to the "bg", "bg.emphasized", "border.inverted" semantic tokens const [oddLight, oddDark, evenLight, evenDark, selectedDarkColor, selectedLightColor] = useToken("colors", [ "bg", @@ -84,6 +91,9 @@ export const Graph = () => { { dagId, externalDependencies: dependencies === "immediate", + includeDownstream, + includeUpstream, + root: hasActiveFilter && filterRoot !== undefined ? filterRoot : undefined, versionNumber: selectedVersion, }, undefined, diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx index 0943f12f938f0..3690542e00b41 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx @@ -22,11 +22,13 @@ import dayjsDuration from "dayjs/plugin/duration"; import { useEffect, useMemo, useRef, useState } from "react"; import { useTranslation } from "react-i18next"; import { FiChevronsRight } from "react-icons/fi"; -import { Link, useParams } from "react-router-dom"; +import { Link, useParams, useSearchParams } from "react-router-dom"; +import { useStructureServiceStructureData } from "openapi/queries"; import type { DagRunState, DagRunType, GridRunsResponse } from "openapi/requests"; import { useOpenGroups } from "src/context/openGroups"; import { useNavigation } from "src/hooks/navigation"; +import useSelectedVersion from "src/hooks/useSelectedVersion"; import { useGridRuns } from "src/queries/useGridRuns.ts"; import { useGridStructure } from "src/queries/useGridStructure.ts"; import { isStatePending } from "src/utils"; @@ -54,6 +56,11 @@ export const Grid = ({ dagRunState, limit, runType, showGantt, triggeringUser }: const [selectedIsVisible, setSelectedIsVisible] = useState(); const { openGroupIds, toggleGroupId } = useOpenGroups(); const { dagId = "", runId = "" } = useParams(); + const [searchParams] = useSearchParams(); + + const filterRoot = searchParams.get("root") ?? undefined; + const includeUpstream = searchParams.get("upstream") === "true"; + const includeDownstream = searchParams.get("downstream") === "true"; const { data: gridRuns, isLoading } = useGridRuns({ dagRunState, limit, runType, triggeringUser }); @@ -77,6 +84,48 @@ export const Grid = ({ dagRunState, limit, runType, showGantt, triggeringUser }: triggeringUser, }); + const selectedVersion = useSelectedVersion(); + + const hasActiveFilter = includeUpstream || includeDownstream; + + // fetch filtered structure when filter is active + const { data: taskStructure } = useStructureServiceStructureData( + { + dagId, + externalDependencies: false, + includeDownstream, + includeUpstream, + root: hasActiveFilter && filterRoot !== undefined ? filterRoot : undefined, + versionNumber: selectedVersion, + }, + undefined, + { + enabled: selectedVersion !== undefined && hasActiveFilter && filterRoot !== undefined, + }, + ); + + // extract allowed task IDs from task structure when filter is active + const allowedTaskIds = useMemo(() => { + if (!hasActiveFilter || filterRoot === undefined || taskStructure === undefined) { + return undefined; + } + + const taskIds = new Set(); + + const addNodeAndChildren = | null; id: string }>(currentNode: T) => { + taskIds.add(currentNode.id); + if (currentNode.children) { + currentNode.children.forEach((child) => addNodeAndChildren(child)); + } + }; + + taskStructure.nodes.forEach((node) => { + addNodeAndChildren(node); + }); + + return taskIds; + }, [hasActiveFilter, filterRoot, taskStructure]); + // calculate dag run bar heights relative to max const max = Math.max.apply( undefined, @@ -87,7 +136,19 @@ export const Grid = ({ dagRunState, limit, runType, showGantt, triggeringUser }: .filter((duration: number | null): duration is number => duration !== null), ); - const { flatNodes } = useMemo(() => flattenNodes(dagStructure, openGroupIds), [dagStructure, openGroupIds]); + const { flatNodes } = useMemo(() => { + const nodes = flattenNodes(dagStructure, openGroupIds); + + // filter nodes based on task stream filter if active + if (allowedTaskIds !== undefined) { + return { + ...nodes, + flatNodes: nodes.flatNodes.filter((node) => allowedTaskIds.has(node.id)), + }; + } + + return nodes; + }, [dagStructure, openGroupIds, allowedTaskIds]); const { setMode } = useNavigation({ onToggleGroup: toggleGroupId, diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx index 1d8fe5303bcd5..41019e53c16a3 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/PanelButtons.tsx @@ -53,6 +53,7 @@ import { dagRunTypeOptions, dagRunStateOptions } from "src/constants/stateOption import { useContainerWidth } from "src/utils/useContainerWidth"; import { DagRunSelect } from "./DagRunSelect"; +import { TaskStreamFilter } from "./TaskStreamFilter"; import { ToggleGroups } from "./ToggleGroups"; type Props = { @@ -259,6 +260,7 @@ export const PanelButtons = ({ + {/* eslint-disable-next-line jsx-a11y/no-autofocus */} diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx new file mode 100644 index 0000000000000..ce136433ce880 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/layouts/Details/TaskStreamFilter.tsx @@ -0,0 +1,200 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Portal, Text, VStack } from "@chakra-ui/react"; +import { useTranslation } from "react-i18next"; +import { FiChevronDown, FiFilter } from "react-icons/fi"; +import { Link, useParams, useSearchParams } from "react-router-dom"; + +import { Button } from "src/components/ui"; +import { Menu } from "src/components/ui/Menu"; + +export const TaskStreamFilter = () => { + const { t: translate } = useTranslation(["components", "dag"]); + const { taskId: currentTaskId } = useParams(); + const [searchParams] = useSearchParams(); + + const filterRoot = searchParams.get("root") ?? undefined; + const includeUpstream = searchParams.get("upstream") === "true"; + const includeDownstream = searchParams.get("downstream") === "true"; + + const isCurrentTaskTheRoot = currentTaskId === filterRoot; + const bothActive = isCurrentTaskTheRoot && includeUpstream && includeDownstream; + const activeUpstream = isCurrentTaskTheRoot && includeUpstream && !includeDownstream; + const activeDownstream = isCurrentTaskTheRoot && includeDownstream && !includeUpstream; + const hasActiveFilter = includeUpstream || includeDownstream; + + const buildFilterSearch = (upstream: boolean, downstream: boolean, root?: string) => { + const newParams = new URLSearchParams(searchParams); + + if (upstream) { + newParams.set("upstream", "true"); + } else { + newParams.delete("upstream"); + } + + if (downstream) { + newParams.set("downstream", "true"); + } else { + newParams.delete("downstream"); + } + + if (root !== undefined && root !== "" && (upstream || downstream)) { + newParams.set("root", root); + } else { + newParams.delete("root"); + } + + return newParams.toString(); + }; + + return ( + + + + + + + + + {translate("dag:panel.taskStreamFilter.label")} + + + {filterRoot !== undefined && hasActiveFilter ? ( + + {translate("dag:panel.taskStreamFilter.activeFilter")}: {filterRoot} -{" "} + {includeUpstream && includeDownstream + ? translate("dag:panel.taskStreamFilter.options.both") + : includeUpstream + ? translate("dag:panel.taskStreamFilter.options.upstream") + : translate("dag:panel.taskStreamFilter.options.downstream")} + + ) : undefined} + + {currentTaskId === undefined ? ( + + {translate("dag:panel.taskStreamFilter.clickTask")} + + ) : ( + + {translate("dag:panel.taskStreamFilter.selectedTask")}: {currentTaskId} + + )} + + + + + + + + + + + + + + + + {hasActiveFilter && filterRoot !== undefined ? ( + + + + ) : undefined} + + + + + ); +};