Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
498d19d
refactor: Render templated fields before starting from trigger
davidblain-infrabel Jul 9, 2025
9fb1303
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Jul 9, 2025
34aa1fa
refactor: Moved StartTriggerArgs from airflow-core to task-sdk so it …
davidblain-infrabel Jul 9, 2025
bac3be7
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Jul 9, 2025
4fd077d
refactor: Import missing datetime
davidblain-infrabel Jul 9, 2025
0b2ac51
refactor: Fixed import of datetime
davidblain-infrabel Jul 9, 2025
a68bcb0
refactor: Reformatted files
davidblain-infrabel Jul 9, 2025
585d2b8
refactor: Changed import of StartTriggerArgs from new location in air…
davidblain-infrabel Jul 9, 2025
c43f152
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Jul 9, 2025
87325dc
refactor: Moved call of render_template_fields from DagRun to defer_t…
davidblain-infrabel Jul 9, 2025
42bc711
refactor: Moved call of render_template_fields from TaskInstance to e…
davidblain-infrabel Jul 9, 2025
897d7c0
refactor: Do the template rendering in update_triggers of TriggerRunn…
davidblain-infrabel Jul 14, 2025
e09bd1b
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
davidblain-infrabel Jul 14, 2025
80ed0f1
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Jul 31, 2025
cf04e27
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Jul 31, 2025
ed526a2
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 2, 2025
62b167a
refactor: Fixed imports tests
davidblain-infrabel Aug 4, 2025
9399680
refactor: Fixed imports
davidblain-infrabel Aug 4, 2025
9e2ef3f
refactor: Reformatted update_triggers
davidblain-infrabel Aug 4, 2025
00925c6
refactor: Reformatted expand_start_trigger_args
davidblain-infrabel Aug 4, 2025
5918847
refactor: Revert move of StartTriggerArgs to airflow-sdk
davidblain-infrabel Aug 4, 2025
882c3ec
Revert "refactor: Revert move of StartTriggerArgs to airflow-sdk"
davidblain-infrabel Aug 4, 2025
007f049
refactor: Defined StartTriggerArgs as deprecated classes in old impor…
davidblain-infrabel Aug 4, 2025
50a8a91
refactor: Moved definition of StartTriggerArgs as deprecated classes …
davidblain-infrabel Aug 4, 2025
8fba691
refactor: Moved import of StartTriggerArgs in type checking block
davidblain-infrabel Aug 4, 2025
2e2721b
refactor: Added StartTriggerArgs in __all__
davidblain-infrabel Aug 4, 2025
67f67de
Revert "refactor: Added StartTriggerArgs in __all__"
davidblain-infrabel Aug 4, 2025
834a1c5
Revert "refactor: Moved definition of StartTriggerArgs as deprecated …
davidblain-infrabel Aug 4, 2025
c1533ac
refactor: Import StartTriggerArgs in type checking block
davidblain-infrabel Aug 4, 2025
8a4e0c6
refactor: Moved definition of StartTriggerArgs as deprecated classes …
davidblain-infrabel Aug 4, 2025
617adad
refactor: Fixed inner import of StartTriggerArgs in expand_start_trig…
davidblain-infrabel Aug 4, 2025
b14cb93
refactor: Use explicit re-export for StartTriggerArgs
davidblain-infrabel Aug 4, 2025
f424810
Revert "refactor: Moved definition of StartTriggerArgs as deprecated …
davidblain-infrabel Aug 4, 2025
acf5e84
Revert "refactor: Use explicit re-export for StartTriggerArgs"
davidblain-infrabel Aug 4, 2025
74a332f
Revert "refactor: Import StartTriggerArgs in type checking block"
davidblain-infrabel Aug 4, 2025
3be9e8c
refactor: Removed StartTriggerArgs from deprecated_classes and overri…
davidblain-infrabel Aug 4, 2025
031b0ab
refactor: Updated warning message
davidblain-infrabel Aug 4, 2025
24aeb73
refactor: Reformatted warning message
davidblain-infrabel Aug 4, 2025
0d6fa17
refactor: Reformatted base
davidblain-infrabel Aug 4, 2025
4c74b93
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 4, 2025
8648135
refactor: Reformatted getattr to not do inner import to avoid static …
davidblain-infrabel Aug 5, 2025
43828ba
refactor: Try mocking DagBag for test_trigger_lifecycle
davidblain-infrabel Aug 5, 2025
ee59eb3
refactor: Reorganized imports
davidblain-infrabel Aug 5, 2025
5feff71
refactor: Try to fix test_trigger_create_race_condition_38599
davidblain-infrabel Aug 5, 2025
7b8f135
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 6, 2025
ec54166
refactor: Try to fix has not been assigned to a DAG yet error in test…
davidblain-infrabel Aug 6, 2025
d43d428
refactor: Try fixing DagRun validation errors for start_date and run_…
davidblain-infrabel Aug 6, 2025
80578c2
refactor: expand_start_trigger_args method doesn't have session param…
davidblain-infrabel Aug 6, 2025
dd3a6ee
refactor: Try fixing remaining tests
davidblain-infrabel Aug 6, 2025
59726a9
refactor: Reformatted test trigger job
davidblain-infrabel Aug 6, 2025
2d8cd6c
refactor: Check if start_trigger_args is not None
davidblain-infrabel Aug 6, 2025
a409559
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 6, 2025
e29db0c
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 12, 2025
aaf5e91
refactor: Removed old imports
davidblain-infrabel Aug 12, 2025
9efab00
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 12, 2025
11a9300
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 12, 2025
a923ed3
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 13, 2025
7053217
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 13, 2025
a863b8f
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 13, 2025
424164b
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 13, 2025
b3d22d6
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 13, 2025
14045f7
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 13, 2025
96caa21
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 13, 2025
8f87562
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 14, 2025
001138a
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 14, 2025
ce7d9cd
refactor: Fixed import issues
dabla Aug 18, 2025
0bfcdf5
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 18, 2025
98692b4
refactor: Reformatted test_trigger_create_race_condition_38599
dabla Aug 18, 2025
ccf2027
refactor: Try to fix test_update_triggers_prevents_duplicate_creation…
dabla Aug 18, 2025
cf1e851
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 18, 2025
146d364
refactor: Reformatted test_update_triggers_prevents_duplicate_creatio…
dabla Aug 18, 2025
bf04cb7
refactor: Added StartTriggerArgs to api.rst
dabla Aug 18, 2025
eeae97f
refactor: Added StartTriggerArgs in __init__.pyi
dabla Aug 18, 2025
78b1030
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 19, 2025
66c46ef
refactor: Added lazy import for StartTriggerArgs
dabla Aug 19, 2025
dca31aa
refactor: Changed order in __all__ for StartTriggerArgs
dabla Aug 19, 2025
e9fa7f3
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
uranusjr Aug 20, 2025
12ad45d
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 21, 2025
3408529
refactor: Reorganized imports for MappedOperator
dabla Aug 21, 2025
da832ba
Merge branch 'main' into fix/templated-fields-with-start-from-trigger
dabla Aug 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from airflow.executors import workloads
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import perform_heartbeat
from airflow.models import DagBag
from airflow.models.trigger import Trigger
from airflow.sdk.api.datamodels._generated import HITLDetailResponse
from airflow.sdk.execution_time.comms import (
Expand Down Expand Up @@ -604,6 +605,45 @@ def update_triggers(self, requested_trigger_ids: set[int]):
trigger set.
"""
render_log_fname = log_filename_template_renderer()
dag_bag = DagBag(collect_dags=False)

def expand_start_trigger_args(trigger: Trigger) -> Trigger:
task = dag_bag.get_dag(trigger.task_instance.dag_id).get_task(trigger.task_instance.task_id)
if task.template_fields:
trigger.task_instance.refresh_from_task(task)
context = trigger.task_instance.get_template_context()
task.render_template_fields(context=context)
start_trigger_args = task.expand_start_trigger_args(context=context)
if start_trigger_args:
trigger.kwargs = start_trigger_args.trigger_kwargs
return trigger

def create_workload(trigger: Trigger) -> workloads.RunTrigger:
if trigger.task_instance:
log_path = render_log_fname(ti=trigger.task_instance)

trigger = expand_start_trigger_args(trigger)

ser_ti = workloads.TaskInstance.model_validate(trigger.task_instance, from_attributes=True)
# When producing logs from TIs, include the job id producing the logs to disambiguate it.
self.logger_cache[new_id] = TriggerLoggingFactory(
log_path=f"{log_path}.trigger.{self.job.id}.log",
ti=ser_ti, # type: ignore
)

return workloads.RunTrigger(
classpath=trigger.classpath,
id=new_id,
encrypted_kwargs=trigger.encrypted_kwargs,
ti=ser_ti,
timeout_after=trigger.task_instance.trigger_timeout,
)
return workloads.RunTrigger(
classpath=trigger.classpath,
id=new_id,
encrypted_kwargs=trigger.encrypted_kwargs,
ti=None,
)

known_trigger_ids = (
self.running_triggers.union(x[0] for x in self.events)
Expand Down Expand Up @@ -641,26 +681,7 @@ def update_triggers(self, requested_trigger_ids: set[int]):
)
continue

workload = workloads.RunTrigger(
classpath=new_trigger_orm.classpath,
id=new_id,
encrypted_kwargs=new_trigger_orm.encrypted_kwargs,
ti=None,
)
if new_trigger_orm.task_instance:
log_path = render_log_fname(ti=new_trigger_orm.task_instance)

ser_ti = workloads.TaskInstance.model_validate(
new_trigger_orm.task_instance, from_attributes=True
)
# When producing logs from TIs, include the job id producing the logs to disambiguate it.
self.logger_cache[new_id] = TriggerLoggingFactory(
log_path=f"{log_path}.trigger.{self.job.id}.log",
ti=ser_ti, # type: ignore
)

workload.ti = ser_ti
workload.timeout_after = new_trigger_orm.task_instance.trigger_timeout
workload = create_workload(new_trigger_orm)

to_create.append(workload)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg
from airflow.sdk import Asset, AssetAlias, AssetAll, AssetAny, AssetWatcher, BaseOperator, XComArg
from airflow.sdk.bases.operator import OPERATOR_DEFAULTS # TODO: Copy this into the scheduler?
from airflow.sdk.bases.trigger import StartTriggerArgs
from airflow.sdk.definitions._internal.expandinput import EXPAND_INPUT_EMPTY
from airflow.sdk.definitions._internal.node import DAGNode
from airflow.sdk.definitions.asset import (
Expand Down Expand Up @@ -83,7 +84,7 @@
from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
from airflow.triggers.base import BaseTrigger, StartTriggerArgs
from airflow.triggers.base import BaseTrigger
from airflow.utils.code_utils import get_python_source
from airflow.utils.context import (
ConnectionAccessor,
Expand Down
23 changes: 13 additions & 10 deletions airflow-core/src/airflow/triggers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import abc
import json
from collections.abc import AsyncIterator
from dataclasses import dataclass
from datetime import timedelta
from typing import Annotated, Any

import structlog
Expand All @@ -33,20 +31,25 @@
)

from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
from airflow.utils.state import TaskInstanceState

log = structlog.get_logger(logger_name=__name__)


@dataclass
class StartTriggerArgs:
"""Arguments required for start task execution from triggerer."""
def __getattr__(name: str):
if name == "StartTriggerArgs":
import warnings

trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: timedelta | None = None
warnings.warn(
"airflow.triggers.base.StartTriggerArgs is deprecated. "
"Use airflow.sdk.bases.trigger.StartTriggerArgs instead.",
DeprecationWarning,
stacklevel=2,
)
return import_string(f"airflow.sdk.bases.trigger.{name}")

raise AttributeError(f"module '{__name__}' has no attribute '{name}'")


class BaseTrigger(abc.ABC, LoggingMixin):
Expand Down
68 changes: 57 additions & 11 deletions airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
TriggerRunnerSupervisor,
messages,
)
from airflow.models import DagModel, DagRun, TaskInstance, Trigger
from airflow.models import DagBag, DagModel, DagRun, TaskInstance, Trigger
from airflow.models.baseoperator import BaseOperator
from airflow.models.connection import Connection
from airflow.models.dag import DAG
Expand Down Expand Up @@ -129,6 +129,15 @@ def create_trigger_in_db(session, trigger, operator=None):
return dag_model, run, trigger_orm, task_instance


def mock_dag_bag(mock_dag_bag_cls, task_instance: TaskInstance):
mock_dag = MagicMock(spec=DAG)
mock_dag.get_task.return_value = task_instance.task

mock_dag_bag = MagicMock(spec=DagBag)
mock_dag_bag.get_dag.return_value = mock_dag
mock_dag_bag_cls.return_value = mock_dag_bag


def test_is_needed(session):
"""Checks the triggerer-is-needed logic"""
# No triggers, no need
Expand Down Expand Up @@ -207,7 +216,8 @@ def builder(job=None):
return builder


def test_trigger_lifecycle(spy_agency: SpyAgency, session, testing_dag_bundle):
@patch("airflow.jobs.triggerer_job_runner.DagBag")
def test_trigger_lifecycle(mock_dag_bag_cls, spy_agency: SpyAgency, session, testing_dag_bundle):
"""
Checks that the triggerer will correctly see a new Trigger in the database
and send it to the trigger runner, and then delete it when it vanishes.
Expand All @@ -216,6 +226,8 @@ def test_trigger_lifecycle(spy_agency: SpyAgency, session, testing_dag_bundle):
# (we want to avoid it firing and deleting itself)
trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
dag_model, run, trigger_orm, task_instance = create_trigger_in_db(session, trigger)
mock_dag_bag(mock_dag_bag_cls, task_instance)

# Make a TriggererJobRunner and have it retrieve DB tasks
trigger_runner_supervisor = TriggerRunnerSupervisor.start(job=Job(id=12345), capacity=10)

Expand Down Expand Up @@ -398,7 +410,10 @@ async def test_trigger_kwargs_serialization_cleanup(self, session):


@pytest.mark.asyncio
async def test_trigger_create_race_condition_38599(session, supervisor_builder, testing_dag_bundle):
@patch("airflow.jobs.triggerer_job_runner.DagBag")
async def test_trigger_create_race_condition_38599(
mock_dag_bag_cls, session, supervisor_builder, testing_dag_bundle
):
"""
This verifies the resolution of race condition documented in github issue #38599.
More details in the issue description.
Expand Down Expand Up @@ -427,10 +442,14 @@ async def test_trigger_create_race_condition_38599(session, supervisor_builder,
dm = DagModel(dag_id="test-dag", bundle_name=bundle_name)
session.add(dm)
SerializedDagModel.write_dag(dag, bundle_name=bundle_name)
dag_run = DagRun(dag.dag_id, run_id="abc", run_type="none", run_after=timezone.utcnow())
dag_run = DagRun(
dag.dag_id, run_id="abc", run_type="manual", start_date=timezone.utcnow(), run_after=timezone.utcnow()
)
dag_version = DagVersion.get_latest_version(dag.dag_id)
task = PythonOperator(task_id="dummy-task", python_callable=print)
task.dag = dag
ti = TaskInstance(
PythonOperator(task_id="dummy-task", python_callable=print),
task,
run_id=dag_run.run_id,
state=TaskInstanceState.DEFERRED,
dag_version_id=dag_version.id,
Expand All @@ -447,6 +466,8 @@ async def test_trigger_create_race_condition_38599(session, supervisor_builder,

session.commit()

mock_dag_bag(mock_dag_bag_cls, ti)

supervisor1 = supervisor_builder(job1)
supervisor2 = supervisor_builder(job2)

Expand Down Expand Up @@ -580,7 +601,8 @@ async def test_trigger_failing():
info["task"].cancel()


def test_failed_trigger(session, dag_maker, supervisor_builder):
@patch("airflow.jobs.triggerer_job_runner.DagBag")
def test_failed_trigger(mock_dag_bag_cls, session, dag_maker, supervisor_builder):
"""
Checks that the triggerer will correctly fail task instances that depend on
triggers that can't even be loaded.
Expand All @@ -603,6 +625,8 @@ def test_failed_trigger(session, dag_maker, supervisor_builder):
task_instance.trigger_id = trigger_orm.id
session.commit()

mock_dag_bag(mock_dag_bag_cls, task_instance)

supervisor: TriggerRunnerSupervisor = supervisor_builder()

supervisor.load_triggers()
Expand Down Expand Up @@ -746,7 +770,8 @@ def handle_events(self):

@pytest.mark.asyncio
@pytest.mark.execution_timeout(20)
async def test_trigger_can_call_variables_connections_and_xcoms_methods(session, dag_maker):
@patch("airflow.jobs.triggerer_job_runner.DagBag")
async def test_trigger_can_call_variables_connections_and_xcoms_methods(mock_dag_bag_cls, session, dag_maker):
"""Checks that the trigger will successfully call Variables, Connections and XComs methods."""
# Create the test DAG and task
with dag_maker(dag_id="trigger_accessing_variable_connection_and_xcom", session=session):
Expand Down Expand Up @@ -808,6 +833,8 @@ async def test_trigger_can_call_variables_connections_and_xcoms_methods(session,
session.add(job)
session.commit()

mock_dag_bag(mock_dag_bag_cls, task_instance)

supervisor = DummyTriggerRunnerSupervisor.start(job=job, capacity=1, logger=None)
supervisor.run()

Expand Down Expand Up @@ -878,7 +905,10 @@ async def run(self, **args) -> AsyncIterator[TriggerEvent]:

@pytest.mark.asyncio
@pytest.mark.execution_timeout(10)
async def test_trigger_can_fetch_trigger_dag_run_count_and_state_in_deferrable(session, dag_maker):
@patch("airflow.jobs.triggerer_job_runner.DagBag")
async def test_trigger_can_fetch_trigger_dag_run_count_and_state_in_deferrable(
mock_dag_bag_cls, session, dag_maker
):
"""Checks that the trigger will successfully fetch the count of trigger DAG runs."""
# Create the test DAG and task
with dag_maker(dag_id="trigger_can_fetch_trigger_dag_run_count_and_state_in_deferrable", session=session):
Expand Down Expand Up @@ -909,6 +939,8 @@ async def test_trigger_can_fetch_trigger_dag_run_count_and_state_in_deferrable(s
session.add(job)
session.commit()

mock_dag_bag(mock_dag_bag_cls, task_instance)

supervisor = DummyTriggerRunnerSupervisor.start(job=job, capacity=1, logger=None)
supervisor.run()

Expand Down Expand Up @@ -969,7 +1001,8 @@ async def run(self, **args) -> AsyncIterator[TriggerEvent]:

@pytest.mark.asyncio
@pytest.mark.execution_timeout(10)
async def test_trigger_can_fetch_dag_run_count_ti_count_in_deferrable(session, dag_maker):
@patch("airflow.jobs.triggerer_job_runner.DagBag")
async def test_trigger_can_fetch_dag_run_count_ti_count_in_deferrable(mock_dag_bag_cls, session, dag_maker):
"""Checks that the trigger will successfully fetch the count of DAG runs, Task count and task states."""
# Create the test DAG and task
with dag_maker(dag_id="parent_dag", session=session):
Expand Down Expand Up @@ -1010,6 +1043,8 @@ async def test_trigger_can_fetch_dag_run_count_ti_count_in_deferrable(session, d
session.add(job)
session.commit()

mock_dag_bag(mock_dag_bag_cls, task_instance)

supervisor = DummyTriggerRunnerSupervisor.start(job=job, capacity=1, logger=None)
supervisor.run()

Expand All @@ -1022,14 +1057,19 @@ async def test_trigger_can_fetch_dag_run_count_ti_count_in_deferrable(session, d
}


def test_update_triggers_prevents_duplicate_creation_queue_entries(session, supervisor_builder):
@patch("airflow.jobs.triggerer_job_runner.DagBag")
def test_update_triggers_prevents_duplicate_creation_queue_entries(
mock_dag_bag_cls, session, supervisor_builder
):
"""
Test that update_triggers prevents adding triggers to the creation queue
if they are already queued for creation.
"""
trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
dag_model, run, trigger_orm, task_instance = create_trigger_in_db(session, trigger)

mock_dag_bag(mock_dag_bag_cls, task_instance)

supervisor = supervisor_builder()

# First call to update_triggers should add the trigger to creating_triggers
Expand All @@ -1051,8 +1091,9 @@ def test_update_triggers_prevents_duplicate_creation_queue_entries(session, supe
assert not any(trigger_id == trigger_orm.id for trigger_id, _ in supervisor.failed_triggers)


@patch("airflow.jobs.triggerer_job_runner.DagBag")
def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple_triggers(
session, supervisor_builder, dag_maker
mock_dag_bag_cls, session, supervisor_builder, dag_maker
):
"""
Test that update_triggers prevents adding multiple triggers to the creation queue
Expand All @@ -1063,6 +1104,8 @@ def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple

dag_model1, run1, trigger_orm1, task_instance1 = create_trigger_in_db(session, trigger1)

mock_dag_bag(mock_dag_bag_cls, task_instance1)

with dag_maker("test_dag_2"):
EmptyOperator(task_id="test_ti_2")

Expand All @@ -1071,6 +1114,9 @@ def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple
ti2 = run2.task_instances[0]
session.add(trigger_orm2)
session.flush()

mock_dag_bag(mock_dag_bag_cls, ti2)

ti2.trigger_id = trigger_orm2.id
session.merge(ti2)
session.flush()
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator, ShortCircuitOperator
from airflow.sdk import BaseOperator, setup, task, task_group, teardown
from airflow.sdk.bases.trigger import StartTriggerArgs
from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.stats import Stats
from airflow.task.trigger_rule import TriggerRule
from airflow.triggers.base import StartTriggerArgs
from airflow.utils.span_status import SpanStatus
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.thread_safe_dict import ThreadSafeDict
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from airflow.sdk import AssetAlias, BaseHook, teardown
from airflow.sdk.bases.decorator import DecoratedOperator
from airflow.sdk.bases.operator import BaseOperator
from airflow.sdk.bases.trigger import StartTriggerArgs
from airflow.sdk.definitions._internal.expandinput import EXPAND_INPUT_EMPTY
from airflow.sdk.definitions.asset import Asset, AssetUniqueKey
from airflow.sdk.definitions.operator_resources import Resources
Expand All @@ -83,7 +84,6 @@
from airflow.task.priority_strategy import _DownstreamPriorityWeightStrategy
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
from airflow.timetables.simple import NullTimetable, OnceTimetable
from airflow.triggers.base import StartTriggerArgs
from airflow.utils.module_loading import qualname

from tests_common.test_utils.config import conf_vars
Expand Down
2 changes: 2 additions & 0 deletions task-sdk/docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ Bases

.. autoapiclass:: airflow.sdk.BaseHook

.. autoapiclass:: airflow.sdk.StartTriggerArgs

Connections & Variables
-----------------------
.. autoapiclass:: airflow.sdk.Connection
Expand Down
Loading
Loading