Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@
},
"graphDirection": {
"label": "Graph Direction"
},
"taskStreamFilter": {
"activeFilter": "Active filter",
"clearFilter": "Clear Filter",
"clickTask": "Click a task to select it as the filter root",
"label": "Filter",
"options": {
"both": "Both upstream & downstream",
"downstream": "Downstream",
"upstream": "Upstream"
},
"selectedTask": "Selected task"
}
},
"paramsFailed": "Failed to load params",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
import { useCallback, useMemo, useEffect, type PropsWithChildren } from "react";
import { useCallback, useMemo, useEffect, useRef, type PropsWithChildren } from "react";
import { useDebouncedCallback } from "use-debounce";
import { useLocalStorage } from "usehooks-ts";

Expand All @@ -36,6 +36,13 @@ export const OpenGroupsProvider = ({ children, dagId }: Props) => {
const [openGroupIds, setOpenGroupIds] = useLocalStorage<Array<string>>(openGroupsKey, []);
const [allGroupIds, setAllGroupIds] = useLocalStorage<Array<string>>(allGroupsKey, []);

// use a ref to track the current allGroupIds without causing re-renders
const allGroupIdsRef = useRef(allGroupIds);

useEffect(() => {
allGroupIdsRef.current = allGroupIds;
}, [allGroupIds]);

// For Graph view support: dependencies + selected version
const selectedVersion = useSelectedVersion();
const [dependencies] = useLocalStorage<"all" | "immediate" | "tasks">(`dependencies-${dagId}`, "tasks");
Expand All @@ -55,10 +62,10 @@ export const OpenGroupsProvider = ({ children, dagId }: Props) => {
useEffect(() => {
const observedGroupIds = flattenGraphNodes(structure.nodes).allGroupIds;

if (JSON.stringify(observedGroupIds) !== JSON.stringify(allGroupIds)) {
if (JSON.stringify(observedGroupIds) !== JSON.stringify(allGroupIdsRef.current)) {
setAllGroupIds(observedGroupIds);
}
}, [structure.nodes, allGroupIds, setAllGroupIds]);
}, [structure.nodes, setAllGroupIds]);

const debouncedSetOpenGroupIds = useDebouncedCallback(
(newGroupIds: Array<string>) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { useToken } from "@chakra-ui/react";
import { ReactFlow, Controls, Background, MiniMap, type Node as ReactFlowNode } from "@xyflow/react";
import "@xyflow/react/dist/style.css";
import { useEffect } from "react";
import { useParams } from "react-router-dom";
import { useParams, useSearchParams } from "react-router-dom";
import { useLocalStorage } from "usehooks-ts";

import { useStructureServiceStructureData } from "openapi/queries";
Expand Down Expand Up @@ -61,9 +61,16 @@ const nodeColor = (
export const Graph = () => {
const { colorMode = "light" } = useColorMode();
const { dagId = "", groupId, runId = "", taskId } = useParams();
const [searchParams] = useSearchParams();

const selectedVersion = useSelectedVersion();

const filterRoot = searchParams.get("root") ?? undefined;
const includeUpstream = searchParams.get("upstream") === "true";
const includeDownstream = searchParams.get("downstream") === "true";

const hasActiveFilter = includeUpstream || includeDownstream;

// corresponds to the "bg", "bg.emphasized", "border.inverted" semantic tokens
const [oddLight, oddDark, evenLight, evenDark, selectedDarkColor, selectedLightColor] = useToken("colors", [
"bg",
Expand All @@ -84,6 +91,9 @@ export const Graph = () => {
{
dagId,
externalDependencies: dependencies === "immediate",
includeDownstream,
includeUpstream,
root: hasActiveFilter && filterRoot !== undefined ? filterRoot : undefined,
versionNumber: selectedVersion,
},
undefined,
Expand Down
65 changes: 63 additions & 2 deletions airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import dayjsDuration from "dayjs/plugin/duration";
import { useEffect, useMemo, useRef, useState } from "react";
import { useTranslation } from "react-i18next";
import { FiChevronsRight } from "react-icons/fi";
import { Link, useParams } from "react-router-dom";
import { Link, useParams, useSearchParams } from "react-router-dom";

import { useStructureServiceStructureData } from "openapi/queries";
import type { DagRunState, DagRunType, GridRunsResponse } from "openapi/requests";
import { useOpenGroups } from "src/context/openGroups";
import { useNavigation } from "src/hooks/navigation";
import useSelectedVersion from "src/hooks/useSelectedVersion";
import { useGridRuns } from "src/queries/useGridRuns.ts";
import { useGridStructure } from "src/queries/useGridStructure.ts";
import { isStatePending } from "src/utils";
Expand Down Expand Up @@ -54,6 +56,11 @@ export const Grid = ({ dagRunState, limit, runType, showGantt, triggeringUser }:
const [selectedIsVisible, setSelectedIsVisible] = useState<boolean | undefined>();
const { openGroupIds, toggleGroupId } = useOpenGroups();
const { dagId = "", runId = "" } = useParams();
const [searchParams] = useSearchParams();

const filterRoot = searchParams.get("root") ?? undefined;
const includeUpstream = searchParams.get("upstream") === "true";
const includeDownstream = searchParams.get("downstream") === "true";

const { data: gridRuns, isLoading } = useGridRuns({ dagRunState, limit, runType, triggeringUser });

Expand All @@ -77,6 +84,48 @@ export const Grid = ({ dagRunState, limit, runType, showGantt, triggeringUser }:
triggeringUser,
});

const selectedVersion = useSelectedVersion();

const hasActiveFilter = includeUpstream || includeDownstream;

// fetch filtered structure when filter is active
const { data: taskStructure } = useStructureServiceStructureData(
{
dagId,
externalDependencies: false,
includeDownstream,
includeUpstream,
root: hasActiveFilter && filterRoot !== undefined ? filterRoot : undefined,
versionNumber: selectedVersion,
},
undefined,
{
enabled: selectedVersion !== undefined && hasActiveFilter && filterRoot !== undefined,
},
);

// extract allowed task IDs from task structure when filter is active
const allowedTaskIds = useMemo(() => {
if (!hasActiveFilter || filterRoot === undefined || taskStructure === undefined) {
return undefined;
}

const taskIds = new Set<string>();

const addNodeAndChildren = <T extends { children?: Array<T> | null; id: string }>(currentNode: T) => {
taskIds.add(currentNode.id);
if (currentNode.children) {
currentNode.children.forEach((child) => addNodeAndChildren(child));
}
};

taskStructure.nodes.forEach((node) => {
addNodeAndChildren(node);
});

return taskIds;
}, [hasActiveFilter, filterRoot, taskStructure]);

// calculate dag run bar heights relative to max
const max = Math.max.apply(
undefined,
Expand All @@ -87,7 +136,19 @@ export const Grid = ({ dagRunState, limit, runType, showGantt, triggeringUser }:
.filter((duration: number | null): duration is number => duration !== null),
);

const { flatNodes } = useMemo(() => flattenNodes(dagStructure, openGroupIds), [dagStructure, openGroupIds]);
const { flatNodes } = useMemo(() => {
const nodes = flattenNodes(dagStructure, openGroupIds);

// filter nodes based on task stream filter if active
if (allowedTaskIds !== undefined) {
return {
...nodes,
flatNodes: nodes.flatNodes.filter((node) => allowedTaskIds.has(node.id)),
};
}

return nodes;
}, [dagStructure, openGroupIds, allowedTaskIds]);

const { setMode } = useNavigation({
onToggleGroup: toggleGroupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import { dagRunTypeOptions, dagRunStateOptions } from "src/constants/stateOption
import { useContainerWidth } from "src/utils/useContainerWidth";

import { DagRunSelect } from "./DagRunSelect";
import { TaskStreamFilter } from "./TaskStreamFilter";
import { ToggleGroups } from "./ToggleGroups";

type Props = {
Expand Down Expand Up @@ -259,6 +260,7 @@ export const PanelButtons = ({
</ButtonGroup>
<Flex alignItems="center" gap={1} justifyContent="space-between" pl={2} pr={6}>
<ToggleGroups />
<TaskStreamFilter />
{/* eslint-disable-next-line jsx-a11y/no-autofocus */}
<Popover.Root autoFocus={false} positioning={{ placement: "bottom-end" }}>
<Popover.Trigger asChild>
Expand Down
Loading