Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
from types import FrameType

from pendulum.datetime import DateTime
from sqlalchemy.orm import Query, Session
from sqlalchemy.orm import Load, Query, Session

from airflow._shared.logging.types import Logger
from airflow.executors.base_executor import BaseExecutor
Expand All @@ -110,6 +110,31 @@
""":meta private:"""


def _eager_load_dag_run_for_validation() -> tuple[Load, Load]:
"""
Eager-load DagRun relations required for execution API datamodel validation.

When building TaskCallbackRequest with DRDataModel.model_validate(ti.dag_run),
the consumed_asset_events collection and nested asset/source_aliases must be
preloaded to avoid DetachedInstanceError after the session closes.

Returns a tuple of two load options:
- Asset loader: TI.dag_run → consumed_asset_events → asset
- Alias loader: TI.dag_run → consumed_asset_events → source_aliases

Example usage::

asset_loader, alias_loader = _eager_load_dag_run_for_validation()
query = select(TI).options(asset_loader).options(alias_loader)
"""
# Traverse TI → dag_run → consumed_asset_events once, then branch to asset/aliases
base = joinedload(TI.dag_run).selectinload(DagRun.consumed_asset_events)
return (
base.selectinload(AssetEvent.asset),
base.selectinload(AssetEvent.source_aliases),
)


def _get_current_dag(dag_id: str, session: Session) -> SerializedDAG | None:
serdag = SerializedDagModel.get(dag_id=dag_id, session=session) # grabs the latest version
if not serdag:
Expand Down Expand Up @@ -806,11 +831,12 @@ def process_executor_events(

# Check state of finished tasks
filter_for_tis = TI.filter_for_tis(tis_with_right_state)
asset_loader, _ = _eager_load_dag_run_for_validation()
query = (
select(TI)
.where(filter_for_tis)
.options(selectinload(TI.dag_model))
.options(joinedload(TI.dag_run).selectinload(DagRun.consumed_asset_events))
.options(asset_loader)
.options(joinedload(TI.dag_run).selectinload(DagRun.created_dag_version))
.options(joinedload(TI.dag_version))
)
Expand Down Expand Up @@ -2375,10 +2401,12 @@ def _find_and_purge_task_instances_without_heartbeats(self) -> None:
def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[TI]:
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._task_instance_heartbeat_timeout_secs)
asset_loader, alias_loader = _eager_load_dag_run_for_validation()
task_instances_without_heartbeats = session.scalars(
select(TI)
.options(selectinload(TI.dag_model))
.options(selectinload(TI.dag_run).selectinload(DagRun.consumed_asset_events))
.options(asset_loader)
.options(alias_loader)
.options(selectinload(TI.dag_version))
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.join(DM, TI.dag_id == DM.dag_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@
"githubRepo": "GitHub Ablage",
"restApiReference": "REST API Referenz"
},
"download": {
"download": "Herunterladen",
"hotkey": "d",
"tooltip": "Drücken Sie {{hotkey}}, um Protokolle herunterzuladen"
},
"duration": "Laufzeit",
"endDate": "Enddatum",
"error": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@
"location": "Zeile {{line}} in {{name}}"
},
"reparseDag": "Dag neu parsen",
"sortedAscending": "aufsteigend sortier",
"sortedDescending": "absteigend sortier",
"sortedAscending": "aufsteigend sortiert",
"sortedDescending": "absteigend sortiert",
"sortedUnsorted": "unsortiert",
"taskTries": "Versuch des Tasks",
"toggleCardView": "Kachelansicht anzeigen",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
"all": "Alle",
"paused": "Pausiert"
},
"runIdPatternFilter": "Dag Läufe suchen",
"triggeringUserNameFilter": "Suche Läufe ausgelöst von..."
"runIdPatternFilter": "Dag Läufe suchen"
},
"ownerLink": "Besitzer Verlinkungen zu {{owner}}",
"runAndTaskActions": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
"filters": {
"body": "Nachricht",
"createdAtFrom": "Erstellt von ",
"createdAtTo": "Erstellt bis ",
"response": {
"all": "Alle",
"pending": "Ausstehend",
Expand All @@ -12,11 +15,13 @@
"requiredActionCount_other": "{{count}} offene Interaktionen",
"requiredActionState": "Status der Interaktion",
"response": {
"created": "Antwort erstellt um ",
"error": "Senden der Antwort fehlgeschlagen",
"optionsDescription": "Wählen Sie Ihre Optionen für diesen Task",
"optionsLabel": "Optionen",
"received": "Antwort empfangen um ",
"respond": "Antworten",
"responded_by_user_name": "Beantwortet von (Benutzername)",
"success": "{{taskId}} Interaktion erfolgreich",
"title": "Erforderliche Interaktion - {{taskId}}"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@
},
"filter": "Filtr",
"filters": {
"durationFrom": "Czas trwania od",
"durationTo": "Czas trwania do",
"logicalDateFrom": "Data logiczna od",
"logicalDateTo": "Data logiczna do",
"runAfterFrom": "Uruchom po (od)",
Expand Down
2 changes: 2 additions & 0 deletions airflow-core/src/airflow/ui/public/i18n/locales/pl/dag.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"hourly": "Godzinowo",
"legend": {
"less": "Mniej",
"mixed": "Mieszane",
"more": "Więcej"
},
"navigation": {
Expand All @@ -19,6 +20,7 @@
"previousYear": "Poprzedni rok"
},
"noData": "Brak danych",
"noFailedRuns": "Brak nieudanych wykonań",
"noRuns": "Brak wykonań",
"totalRuns": "Łączna liczba wykonań",
"week": "Tydzień {{weekNumber}}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
"all": "Wszystkie",
"paused": "Wstrzymane"
},
"runIdPatternFilter": "Szukaj Wykonań Dagów"
"runIdPatternFilter": "Szukaj Wykonań Dagów",
"triggeringUserNameFilter": "Szukaj według użytkownika wyzwalającego"
},
"ownerLink": "Link do właściciela {{owner}}",
"runAndTaskActions": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,13 @@ export const PanelButtons = ({
);

return (
<Box position="absolute" ref={containerRef} top={1} width="100%" zIndex={1}>
<Flex justifyContent="space-between">
<Box position="absolute" px={2} ref={containerRef} top={1} width="100%" zIndex={1}>
<Flex justifyContent="space-between" pl={2}>
<ButtonGroup attached size="sm" variant="outline">
<IconButton
aria-label={translate("dag:panel.buttons.showGridShortcut")}
bg={dagView === "grid" ? "brand.500" : "bg.subtle"}
color={dagView === "grid" ? "white" : "fg.default"}
colorPalette="brand"
onClick={() => {
setDagView("grid");
Expand All @@ -221,12 +223,13 @@ export const PanelButtons = ({
}
}}
title={translate("dag:panel.buttons.showGridShortcut")}
variant={dagView === "grid" ? "solid" : "outline"}
>
<FiGrid />
</IconButton>
<IconButton
aria-label={translate("dag:panel.buttons.showGraphShortcut")}
bg={dagView === "graph" ? "brand.500" : "bg.subtle"}
color={dagView === "graph" ? "white" : "fg.default"}
colorPalette="brand"
onClick={() => {
setDagView("graph");
Expand All @@ -235,17 +238,16 @@ export const PanelButtons = ({
}
}}
title={translate("dag:panel.buttons.showGraphShortcut")}
variant={dagView === "graph" ? "solid" : "outline"}
>
<MdOutlineAccountTree />
</IconButton>
</ButtonGroup>
<Flex gap={1}>
<Flex alignItems="center" gap={1} justifyContent="space-between" pl={2} pr={6}>
<ToggleGroups />
{/* eslint-disable-next-line jsx-a11y/no-autofocus */}
<Popover.Root autoFocus={false} positioning={{ placement: "bottom-end" }}>
<Popover.Trigger asChild>
<Button size="sm" variant="outline">
<Button bg="bg.subtle" color="fg.default" size="sm" variant="outline">
{translate("dag:panel.buttons.options")}
<FiChevronDown size={8} />
</Button>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ const createColumns = (
header: "",
},
{
accessorKey: "favourite",
cell: ({ row: { original } }) => (
<FavoriteDagButton dagId={original.dag_id} isFavorite={original.is_favorite} withText={false} />
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { useParams } from "react-router-dom";
import { useTaskInstanceServiceGetExtraLinks } from "openapi/queries";

export const ExtraLinks = () => {
const { t: translate } = useTranslation();
const { t: translate } = useTranslation("dag");
const { dagId = "", mapIndex = "-1", runId = "", taskId = "" } = useParams();

const { data } = useTaskInstanceServiceGetExtraLinks({
Expand All @@ -35,7 +35,7 @@ export const ExtraLinks = () => {

return data && Object.keys(data.extra_links).length > 0 ? (
<Box py={1}>
<Heading size="sm">{translate("dag.extraLinks")}</Heading>
<Heading size="sm">{translate("extraLinks")}</Heading>
<HStack gap={2} py={2}>
{Object.entries(data.extra_links).map(([key, value], _) =>
value === null ? undefined : (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ export const useToggleFavoriteDag = (dagId: string) => {
queryKey: [useDagServiceGetDagsUiKey, UseDagServiceGetDagDetailsKeyFn({ dagId }, [{ dagId }])],
});

// Invalidate the specific DAG details query for this DAG
await queryClient.invalidateQueries({
queryKey: UseDagServiceGetDagDetailsKeyFn({ dagId }, [{ dagId }]),
});
const queryKeys = [
// Invalidate the specific DAG details query for this DAG and DAGs list query.
UseDagServiceGetDagDetailsKeyFn({ dagId }, [{ dagId }]),
[useDagServiceGetDagsUiKey],
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
}, [queryClient, dagId]);

const favoriteMutation = useDagServiceFavoriteDag({
Expand Down
Loading