diff --git a/task-sdk/README.md b/task-sdk/README.md index 20d9e28b54ecf..e646ff5bdf4b8 100644 --- a/task-sdk/README.md +++ b/task-sdk/README.md @@ -27,7 +27,7 @@ [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://s.apache.org/airflow-slack) -The Apache Airflow Task SDK includes interfaces for DAG authors and Task execution logic for Python. +The Apache Airflow Task SDK includes interfaces for Dag authors and Task execution logic for Python. ## Installation diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index be27c28d3ff96..04575fa770d05 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -632,7 +632,7 @@ def trigger( logical_date: datetime | None = None, reset_dag_run: bool = False, ) -> OKResponse | ErrorResponse: - """Trigger a DAG run via the API server.""" + """Trigger a Dag run via the API server.""" body = TriggerDAGRunPayload(logical_date=logical_date, conf=conf or {}, reset_dag_run=reset_dag_run) try: @@ -642,23 +642,23 @@ def trigger( except ServerResponseError as e: if e.response.status_code == HTTPStatus.CONFLICT: if reset_dag_run: - log.info("DAG Run already exists; Resetting DAG Run.", dag_id=dag_id, run_id=run_id) + log.info("Dag Run already exists; Resetting Dag Run.", dag_id=dag_id, run_id=run_id) return self.clear(run_id=run_id, dag_id=dag_id) - log.info("DAG Run already exists!", detail=e.detail, dag_id=dag_id, run_id=run_id) + log.info("Dag Run already exists!", detail=e.detail, dag_id=dag_id, run_id=run_id) return ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS) raise return OKResponse(ok=True) def clear(self, dag_id: str, run_id: str) -> OKResponse: - """Clear a DAG run via the API server.""" + """Clear a Dag run via the API server.""" self.client.post(f"dag-runs/{dag_id}/{run_id}/clear") # TODO: Error handling return OKResponse(ok=True) def get_state(self, dag_id: str, run_id: str) -> DagRunStateResponse: - """Get the state of a DAG run via the API server.""" + """Get the state of a Dag run via the API server.""" resp = self.client.get(f"dag-runs/{dag_id}/{run_id}/state") return DagRunStateResponse.model_validate_json(resp.read()) @@ -669,7 +669,7 @@ def get_count( run_ids: list[str] | None = None, states: list[str] | None = None, ) -> DRCount: - """Get count of DAG runs matching the given criteria.""" + """Get count of Dag runs matching the given criteria.""" params = { "dag_id": dag_id, "logical_dates": [d.isoformat() for d in logical_dates] if logical_dates is not None else None, diff --git a/task-sdk/src/airflow/sdk/bases/decorator.py b/task-sdk/src/airflow/sdk/bases/decorator.py index becc7051229f0..bb96fab17fd31 100644 --- a/task-sdk/src/airflow/sdk/bases/decorator.py +++ b/task-sdk/src/airflow/sdk/bases/decorator.py @@ -113,7 +113,7 @@ def get_unique_task_id( task_group: TaskGroup | None = None, ) -> str: """ - Generate unique task id given a DAG (or if run in a DAG context). + Generate unique task id given a Dag (or if run in a Dag context). IDs are generated by appending a unique number to the end of the original task id. @@ -645,7 +645,7 @@ def task_decorator_factory( """ Generate a wrapper that wraps a function into an Airflow operator. - Can be reused in a single DAG. + Can be reused in a single Dag. :param python_callable: Function to decorate. 
:param multiple_outputs: If set to True, the decorated function's return diff --git a/task-sdk/src/airflow/sdk/bases/notifier.py b/task-sdk/src/airflow/sdk/bases/notifier.py index 6772e406f0bd4..7cfed6ae97efb 100644 --- a/task-sdk/src/airflow/sdk/bases/notifier.py +++ b/task-sdk/src/airflow/sdk/bases/notifier.py @@ -38,7 +38,7 @@ class BaseNotifier(LoggingMixin, Templater): It can be used asynchronously (preferred) if `async_notify`is implemented and/or synchronously if `notify` is implemented. - Currently, the DAG/Task state change callbacks run on the DAG Processor and only support sync usage. + Currently, the Dag/Task state change callbacks run on the Dag Processor and only support sync usage. Usage:: # Asynchronous usage @@ -109,7 +109,7 @@ def notify(self, context: Context) -> None: """ Send a notification (sync). - Implementing this is a requirement for running this notifier in the DAG processor, which is where the + Implementing this is a requirement for running this notifier in the Dag processor, which is where the `on_success_callback` and `on_failure_callback` run. :param context: The airflow context diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py index 581b2b7c20ca7..ca4d5701071e6 100644 --- a/task-sdk/src/airflow/sdk/bases/operator.py +++ b/task-sdk/src/airflow/sdk/bases/operator.py @@ -315,7 +315,7 @@ def partial( if task_group: task_id = task_group.child_id(task_id) - # Merge DAG and task group level defaults into user-supplied values. + # Merge Dag and task group level defaults into user-supplied values. dag_default_args, partial_params = get_merged_defaults( dag=dag, task_group=task_group, @@ -331,7 +331,7 @@ def partial( **kwargs, } - # Inject DAG-level default args into args provided to this function. + # Inject Dag-level default args into args provided to this function. # Most of the default args will be retrieved during unmapping; here we # only ensure base properties are correctly set for the scheduler. partial_kwargs.update( @@ -613,8 +613,8 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta): r""" Abstract base class for all operators. - Since operators create objects that become nodes in the DAG, BaseOperator - contains many recursive methods for DAG crawling behavior. To derive from + Since operators create objects that become nodes in the Dag, BaseOperator + contains many recursive methods for Dag crawling behavior. To derive from this class, you are expected to override the constructor and the 'execute' method. @@ -628,7 +628,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta): This class is abstract and shouldn't be instantiated. Instantiating a class derived from this one results in the creation of a task object, - which ultimately becomes a node in DAG objects. Task dependencies should + which ultimately becomes a node in Dag objects. Task dependencies should be set by using the set_upstream and/or set_downstream methods. :param task_id: a unique, meaningful id for the task @@ -654,7 +654,7 @@ class derived from this one results in the creation of a task object, :param start_date: The ``start_date`` for the task, determines the ``logical_date`` for the first task instance. The best practice is to have the start_date rounded - to your DAG's ``schedule_interval``. Daily jobs have their start_date + to your Dag's ``schedule_interval``. Daily jobs have their start_date some day at 00:00:00, hourly jobs have their start_date at 00:00 of a specific hour. 
Note that Airflow simply looks at the latest ``logical_date`` and adds the ``schedule_interval`` to determine @@ -709,7 +709,7 @@ class derived from this one results in the creation of a task object, you know exactly what priority weight each task should have. Additionally, when set to ``absolute``, there is bonus effect of significantly speeding up the task creation process as for very large - DAGs. Options can be set as string or using the constants defined in + Dags. Options can be set as string or using the constants defined in the static class ``airflow.utils.WeightRule``. Irrespective of the weight rule, resulting priority values are capped with 32-bit. |experimental| @@ -744,7 +744,7 @@ class derived from this one results in the creation of a task object, :param on_skipped_callback: much like the ``on_failure_callback`` except that it is executed when skipped occur; this callback will be called only if AirflowSkipException get raised. Explicitly it is NOT called if a task is not started to be executed because of a preceding branching - decision in the DAG or a trigger rule which causes execution to skip so that the task execution + decision in the Dag or a trigger rule which causes execution to skip so that the task execution is never scheduled. :param pre_execute: a function to be called immediately before task execution, receiving a context dictionary; raising an exception will @@ -769,7 +769,7 @@ class derived from this one results in the creation of a task object, :param max_active_tis_per_dag: When set, a task will be able to limit the concurrent runs across logical_dates. :param max_active_tis_per_dagrun: When set, a task will be able to limit the concurrent - task instances per DAG run. + task instances per Dag run. :param executor: Which executor to target when running this task. NOT YET SUPPORTED :param executor_config: Additional task-level configuration parameters that are interpreted by a specific executor. Parameters are namespaced by the name of @@ -1031,7 +1031,7 @@ def __init__( allow_nested_operators: bool = True, **kwargs: Any, ): - # Note: Metaclass handles passing in the DAG/TaskGroup from active context manager, if any + # Note: Metaclass handles passing in the Dag/TaskGroup from active context manager, if any # Only apply task_group prefix if this operator was not created from a mapped operator # Mapped operators already have the prefix applied during their creation @@ -1289,14 +1289,14 @@ def get_dag(self) -> DAG | None: @property def dag(self) -> DAG: - """Returns the Operator's DAG if set, otherwise raises an error.""" + """Returns the Operator's Dag if set, otherwise raises an error.""" if dag := self._dag: return dag - raise RuntimeError(f"Operator {self} has not been assigned to a DAG yet") + raise RuntimeError(f"Operator {self} has not been assigned to a Dag yet") @dag.setter def dag(self, dag: DAG | None) -> None: - """Operators can be assigned to one DAG, one time. Repeat assignments to that same DAG are ok.""" + """Operators can be assigned to one Dag, one time. 
Repeat assignments to that same Dag are ok.""" self._dag = dag def _convert__dag(self, dag: DAG | None) -> DAG | None: @@ -1307,12 +1307,12 @@ def _convert__dag(self, dag: DAG | None) -> DAG | None: return dag if not isinstance(dag, DAG): - raise TypeError(f"Expected DAG; received {dag.__class__.__name__}") + raise TypeError(f"Expected Dag; received {dag.__class__.__name__}") if self._dag is not None and self._dag is not dag: - raise ValueError(f"The DAG assigned to {self} can not be changed.") + raise ValueError(f"The Dag assigned to {self} can not be changed.") if self.__from_mapped: - pass # Don't add to DAG -- the mapped task takes the place. + pass # Don't add to Dag -- the mapped task takes the place. elif dag.task_dict.get(self.task_id) is not self: dag.add_task(self) return dag @@ -1370,7 +1370,7 @@ def task_display_name(self) -> str: return self._task_display_name or self.task_id def has_dag(self): - """Return True if the Operator has been assigned to a DAG.""" + """Return True if the Operator has been assigned to a Dag.""" return self._dag is not None def _set_xcomargs_dependencies(self) -> None: @@ -1456,7 +1456,7 @@ def output(self) -> XComArg: @classmethod def get_serialized_fields(cls): - """Stringified DAGs and operators contain exactly these fields.""" + """Stringified Dags and operators contain exactly these fields.""" if not cls.__serialized_fields: from airflow.sdk.definitions._internal.contextmanager import DagContext diff --git a/task-sdk/src/airflow/sdk/bases/xcom.py b/task-sdk/src/airflow/sdk/bases/xcom.py index 693c38dc4aca5..5028aa95f285f 100644 --- a/task-sdk/src/airflow/sdk/bases/xcom.py +++ b/task-sdk/src/airflow/sdk/bases/xcom.py @@ -66,9 +66,9 @@ def set( :param key: Key to store the XCom. :param value: XCom value to store. - :param dag_id: DAG ID. + :param dag_id: Dag ID. :param task_id: Task ID. - :param run_id: DAG run ID for the task. + :param run_id: Dag run ID for the task. :param map_index: Optional map index to assign XCom for a mapped task. The default is ``-1`` (set for a non-mapped task). """ @@ -111,9 +111,9 @@ def _set_xcom_in_db( :param key: Key to store the XCom. :param value: XCom value to store. - :param dag_id: DAG ID. + :param dag_id: Dag ID. :param task_id: Task ID. - :param run_id: DAG run ID for the task. + :param run_id: Dag run ID for the task. :param map_index: Optional map index to assign XCom for a mapped task. The default is ``-1`` (set for a non-mapped task). """ @@ -180,8 +180,8 @@ def _get_xcom_db_ref( .. seealso:: ``get_value()`` is a convenience function if you already have a structured TaskInstance or TaskInstanceKey object available. - :param run_id: DAG run ID for the task. - :param dag_id: Only pull XCom from this DAG. Pass *None* (default) to + :param run_id: Dag run ID for the task. + :param dag_id: Only pull XCom from this Dag. Pass *None* (default) to remove the filter. :param task_id: Only XCom from task with matching ID will be pulled. Pass *None* (default) to remove the filter. @@ -230,8 +230,8 @@ def get_one( .. seealso:: ``get_value()`` is a convenience function if you already have a structured TaskInstance or TaskInstanceKey object available. - :param run_id: DAG run ID for the task. - :param dag_id: Only pull XCom from this DAG. Pass *None* (default) to + :param run_id: Dag run ID for the task. + :param dag_id: Only pull XCom from this Dag. Pass *None* (default) to remove the filter. :param task_id: Only XCom from task with matching ID will be pulled. Pass *None* (default) to remove the filter. 
@@ -240,7 +240,7 @@ def get_one( :param key: A key for the XCom. If provided, only XCom with matching keys will be returned. Pass *None* (default) to remove the filter. :param include_prior_dates: If *False* (default), only XCom from the - specified DAG run is returned. If *True*, the latest matching XCom is + specified Dag run is returned. If *True*, the latest matching XCom is returned regardless of the run it belongs to. """ from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS @@ -291,11 +291,11 @@ def get_all( indexes of a mapped task at once. :param key: A key for the XCom. Only XComs with this key will be returned. - :param run_id: DAG run ID for the task. - :param dag_id: DAG ID to pull XComs from. + :param run_id: Dag run ID for the task. + :param dag_id: Dag ID to pull XComs from. :param task_id: Task ID to pull XComs from. :param include_prior_dates: If *False* (default), only XComs from the - specified DAG run are returned. If *True*, the latest matching XComs are + specified Dag run are returned. If *True*, the latest matching XComs are returned regardless of the run they belong to. :return: List of all XCom values if found. """ diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py index 7629c06f7954c..2189c14ad7907 100644 --- a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py @@ -319,10 +319,10 @@ def _iter_all_mapped_downstreams(self) -> Iterator[MappedOperator | MappedTaskGr """ Return mapped nodes that are direct dependencies of the current task. - For now, this walks the entire DAG to find mapped nodes that has this + For now, this walks the entire Dag to find mapped nodes that have this current task as an upstream. We cannot use ``downstream_list`` since it only contains operators, not task groups. In the future, we should - provide a way to record an DAG node's all downstream nodes instead. + provide a way to record all of a Dag node's downstream nodes instead. Note that this does not guarantee the returned tasks actually use the current task for task mapping, but only checks those task are mapped @@ -348,7 +348,7 @@ def _walk_group(group: TaskGroup) -> Iterable[tuple[str, DAGNode]]: dag = self.get_dag() if not dag: - raise RuntimeError("Cannot check for mapped dependants when not attached to a DAG") + raise RuntimeError("Cannot check for mapped dependants when not attached to a Dag") for key, child in _walk_group(dag.task_group): if key == self.node_id: continue @@ -361,10 +361,10 @@ def iter_mapped_dependants(self) -> Iterator[MappedOperator | MappedTaskGroup]: """ Return mapped nodes that depend on the current task the expansion. - For now, this walks the entire DAG to find mapped nodes that has this + For now, this walks the entire Dag to find mapped nodes that have this current task as an upstream. We cannot use ``downstream_list`` since it only contains operators, not task groups. In the future, we should - provide a way to record an DAG node's all downstream nodes instead. + provide a way to record all of a Dag node's downstream nodes instead. """ return ( downstream @@ -386,7 +386,7 @@ def iter_mapped_task_groups(self) -> Iterator[MappedTaskGroup]: def get_closest_mapped_task_group(self) -> MappedTaskGroup | None: """ - Get the mapped task group "closest" to this task in the DAG. + Get the mapped task group "closest" to this task in the Dag. 
:meta private: """ @@ -408,7 +408,7 @@ def get_needs_expansion(self) -> bool: @methodtools.lru_cache(maxsize=None) def get_parse_time_mapped_ti_count(self) -> int: """ - Return the number of mapped task instances that can be created on DAG run creation. + Return the number of mapped task instances that can be created on Dag run creation. This only considers literal mapped arguments, and would return *None* when any non-literal values are used for mapping. diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/contextmanager.py b/task-sdk/src/airflow/sdk/definitions/_internal/contextmanager.py index 2914799a33a55..f0fbdc306e6f3 100644 --- a/task-sdk/src/airflow/sdk/definitions/_internal/contextmanager.py +++ b/task-sdk/src/airflow/sdk/definitions/_internal/contextmanager.py @@ -52,7 +52,7 @@ class ContextStackMeta(type): _context: deque # TODO: Task-SDK: - # share_parent_context can go away once the DAG and TaskContext manager in airflow.models are removed and + # share_parent_context can go away once the Dag and TaskContext manager in airflow.models are removed and # everything uses sdk fully for definition/parsing def __new__(cls, name, bases, namespace, share_parent_context: bool = False, **kwargs: Any): if not share_parent_context: @@ -89,9 +89,9 @@ def get_current(cls) -> T | None: class DagContext(ContextStack[DAG]): """ - DAG context is used to keep the current DAG when DAG is used as ContextManager. + Dag context is used to keep the current Dag when a Dag is used as a context manager. - You can use DAG as context: + You can use a Dag as a context manager: .. code-block:: python @@ -103,8 +103,8 @@ class DagContext(ContextStack[DAG]): ) as dag: ... - If you do this the context stores the DAG and whenever new task is created, it will use - such stored DAG as the parent DAG. + If you do this, the context stores the Dag and whenever a new task is created, it will use + the stored Dag as the parent Dag. """ diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/mixins.py b/task-sdk/src/airflow/sdk/definitions/_internal/mixins.py index e3ea4a169f07a..d14d629915987 100644 --- a/task-sdk/src/airflow/sdk/definitions/_internal/mixins.py +++ b/task-sdk/src/airflow/sdk/definitions/_internal/mixins.py @@ -131,7 +131,7 @@ def iter_references(self) -> Iterable[tuple[Operator, str]]: """ Find underlying XCom references this contains. - This is used by the DAG parser to recursively find task dependencies. + This is used by the Dag parser to recursively find task dependencies. :meta private: """ diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/node.py b/task-sdk/src/airflow/sdk/definitions/_internal/node.py index e6ba2d880a5bc..75958d047b421 100644 --- a/task-sdk/src/airflow/sdk/definitions/_internal/node.py +++ b/task-sdk/src/airflow/sdk/definitions/_internal/node.py @@ -177,27 +177,27 @@ def _set_relatives( ) task_list.append(task) - # relationships can only be set if the tasks share a single DAG. Tasks - # without a DAG are assigned to that DAG. + # relationships can only be set if the tasks share a single Dag. Tasks - # without a Dag are assigned to that Dag. dags: set[DAG] = {task.dag for task in [*self.roots, *task_list] if task.has_dag() and task.dag} if len(dags) > 1: - raise RuntimeError(f"Tried to set relationships between tasks in more than one DAG: {dags}") + raise RuntimeError(f"Tried to set relationships between tasks in more than one Dag: {dags}") if len(dags) == 1: dag = dags.pop() else: raise ValueError( - "Tried to create relationships between tasks that don't have DAGs yet. 
" - f"Set the DAG for at least one task and try again: {[self, *task_list]}" + "Tried to create relationships between tasks that don't have Dags yet. " + f"Set the Dag for at least one task and try again: {[self, *task_list]}" ) if not self.has_dag(): - # If this task does not yet have a dag, add it to the same dag as the other task. + # If this task does not yet have a Dag, add it to the same Dag as the other task. self.dag = dag for task in task_list: if dag and not task.has_dag(): - # If the other task does not yet have a dag, add it to the same dag as this task and + # If the other task does not yet have a Dag, add it to the same Dag as this task and dag.add_task(task) # type: ignore[arg-type] if upstream: task.downstream_task_ids.add(self.node_id) @@ -230,14 +230,14 @@ def set_upstream( def downstream_list(self) -> Iterable[Operator]: """List of nodes directly downstream.""" if not self.dag: - raise RuntimeError(f"Operator {self} has not been assigned to a DAG yet") + raise RuntimeError(f"Operator {self} has not been assigned to a Dag yet") return [self.dag.get_task(tid) for tid in self.downstream_task_ids] @property def upstream_list(self) -> Iterable[Operator]: """List of nodes directly upstream.""" if not self.dag: - raise RuntimeError(f"Operator {self} has not been assigned to a DAG yet") + raise RuntimeError(f"Operator {self} has not been assigned to a Dag yet") return [self.dag.get_task(tid) for tid in self.upstream_task_ids] def get_direct_relative_ids(self, upstream: bool = False) -> set[str]: diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/templater.py b/task-sdk/src/airflow/sdk/definitions/_internal/templater.py index 9d24de475c536..ea5c7ffaf06ed 100644 --- a/task-sdk/src/airflow/sdk/definitions/_internal/templater.py +++ b/task-sdk/src/airflow/sdk/definitions/_internal/templater.py @@ -70,7 +70,7 @@ class Templater: template_ext: Sequence[str] def get_template_env(self, dag: DAG | None = None) -> jinja2.Environment: - """Fetch a Jinja template environment from the DAG or instantiate empty environment if no DAG.""" + """Fetch a Jinja template environment from the Dag or instantiate empty environment if no Dag.""" # This is imported locally since Jinja2 is heavy and we don't need it # for most of the functionalities. It is imported by get_template_env() # though, so we don't need to put this after the 'if dag' check. diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py index d136d4ad9db5e..03e32297e01d8 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -287,9 +287,9 @@ class AssetWatcher: name: str # This attribute serves double purpose. - # For a "normal" asset instance loaded from DAG, this holds the trigger used to monitor an external + # For a "normal" asset instance loaded from Dag, this holds the trigger used to monitor an external # resource. In that case, ``AssetWatcher`` is used directly by users. - # For an asset recreated from a serialized DAG, this holds the serialized data of the trigger. In that + # For an asset recreated from a serialized Dag, this holds the serialized data of the trigger. In that # case, `SerializedAssetWatcher` is used. We need to keep the two types to make mypy happy because # `SerializedAssetWatcher` is a subclass of `AssetWatcher`. 
trigger: BaseEventTrigger | dict diff --git a/task-sdk/src/airflow/sdk/definitions/asset/decorators.py b/task-sdk/src/airflow/sdk/definitions/asset/decorators.py index daeb6204c3f05..245cc24d27384 100644 --- a/task-sdk/src/airflow/sdk/definitions/asset/decorators.py +++ b/task-sdk/src/airflow/sdk/definitions/asset/decorators.py @@ -168,7 +168,7 @@ def iter_outlets(self) -> Iterator[BaseAsset]: @attrs.define(kw_only=True) class _DAGFactory: """ - Common class for things that take DAG-like arguments. + Common class for things that take Dag-like arguments. This exists so we don't need to define these arguments separately for ``@asset`` and ``@asset.multi``. @@ -222,7 +222,7 @@ class asset(_DAGFactory): @attrs.define(kw_only=True) class multi(_DAGFactory): - """Create a one-task DAG that emits multiple assets.""" + """Create a one-task Dag that emits multiple assets.""" outlets: Collection[BaseAsset] # TODO: Support non-asset outlets? diff --git a/task-sdk/src/airflow/sdk/definitions/context.py b/task-sdk/src/airflow/sdk/definitions/context.py index 10d7facfa9179..46daccb6f5657 100644 --- a/task-sdk/src/airflow/sdk/definitions/context.py +++ b/task-sdk/src/airflow/sdk/definitions/context.py @@ -139,10 +139,10 @@ def my_task(): class AirflowParsingContext(NamedTuple): """ - Context of parsing for the DAG. + Context of parsing for the Dag. - If these values are not None, they will contain the specific DAG and Task ID that Airflow is requesting to - execute. You can use these for optimizing dynamically generated DAG files. + If these values are not None, they will contain the specific Dag and Task ID that Airflow is requesting to + execute. You can use these for optimizing dynamically generated Dag files. You can obtain the current values via :py:func:`.get_parsing_context`. """ @@ -156,7 +156,7 @@ class AirflowParsingContext(NamedTuple): def get_parsing_context() -> AirflowParsingContext: - """Return the current (DAG) parsing context info.""" + """Return the current (Dag) parsing context info.""" return AirflowParsingContext( dag_id=os.environ.get(_AIRFLOW_PARSING_CONTEXT_DAG_ID), task_id=os.environ.get(_AIRFLOW_PARSING_CONTEXT_TASK_ID), @@ -177,7 +177,7 @@ def render_template(template: Any, context: MutableMapping[str, Any], *, native: :param template: A Jinja2 template to render. :param context: The Airflow task context to render the template with. :param native: If set to *True*, render the template into a native type. A - DAG can enable this with ``render_template_as_native_obj=True``. + Dag can enable this with ``render_template_as_native_obj=True``. :returns: The render result. """ context = copy.copy(context) diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 2c8fe8518fbac..23c0138a4b9b3 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -257,8 +257,8 @@ class DAG: are met. Certain tasks have the property of depending on their own past, meaning that they can't run until their previous schedule (and upstream tasks) are completed. - DAGs essentially act as namespaces for tasks. A task_id can only be - added once to a DAG. + Dags essentially act as namespaces for tasks. A task_id can only be + added once to a Dag. Note that if you plan to use time zones all the dates provided should be pendulum dates. See :ref:`timezone_aware_dags`. 
@@ -348,15 +348,15 @@ class DAG: :param render_template_as_native_obj: If True, uses a Jinja ``NativeEnvironment`` to render templates as native Python types. If False, a Jinja ``Environment`` is used to render templates as string values. - :param tags: List of tags to help filtering DAGs in the UI. - :param owner_links: Dict of owners and their links, that will be clickable on the DAGs view UI. + :param tags: List of tags to help filtering Dags in the UI. + :param owner_links: Dict of owners and their links, that will be clickable on the Dags view UI. Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link. e.g: ``{"dag_owner": "https://airflow.apache.org/"}`` :param auto_register: Automatically register this DAG when it is used in a ``with`` block - :param fail_fast: Fails currently running tasks when task in DAG fails. + :param fail_fast: Fails currently running tasks when task in Dag fails. **Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success"). An exception will be thrown if any task in a fail stop dag has a non default trigger rule. - :param dag_display_name: The display name of the DAG which appears on the UI. + :param dag_display_name: The display name of the Dag which appears on the UI. """ __serialized_fields: ClassVar[frozenset[str]] @@ -478,7 +478,7 @@ def __attrs_post_init__(self): @params.validator def _validate_params(self, _, params: ParamsDict): """ - Validate Param values when the DAG has schedule defined. + Validate Param values when the Dag has schedule defined. Raise exception if there are any Params which can not be resolved by their schema definition. """ @@ -489,7 +489,7 @@ def _validate_params(self, _, params: ParamsDict): params.validate() except ParamValidationError as pverr: raise ValueError( - f"DAG {self.dag_id!r} is not allowed to define a Schedule, " + f"Dag {self.dag_id!r} is not allowed to define a Schedule, " "as there are required params without default values, or the default values are not valid." ) from pverr @@ -599,9 +599,9 @@ def __exit__(self, _type, _value, _tb): def validate(self): """ - Validate the DAG has a coherent setup. + Validate the Dag has a coherent setup. - This is called by the DAG bag before bagging the DAG. + This is called by the Dag bag before bagging the Dag. """ self.timetable.validate() self.validate_setup_teardown() @@ -658,15 +658,15 @@ def tasks_upstream_of_teardowns(self) -> list[Operator]: @property def folder(self) -> str: - """Folder location of where the DAG object is instantiated.""" + """Folder location of where the Dag object is instantiated.""" return os.path.dirname(self.fileloc) @property def owner(self) -> str: """ - Return list of all owners found in DAG tasks. + Return list of all owners found in Dag tasks. 
- :return: Comma separated list of owners in DAG tasks + :return: Comma separated list of owners in Dag tasks """ return ", ".join({t.owner for t in self.tasks}) @@ -714,7 +714,7 @@ def get_template_env(self, *, force_sandboxed: bool = False) -> jinja2.Environme return env def set_dependency(self, upstream_task_id, downstream_task_id): - """Set dependency between two tasks that already have been added to the DAG using add_task().""" + """Set dependency between two tasks that already have been added to the Dag using add_task().""" self.get_task(upstream_task_id).set_downstream(self.get_task(downstream_task_id)) @property @@ -920,7 +920,7 @@ def task(self) -> TaskDecoratorCollection: def add_task(self, task: Operator) -> None: """ - Add a task to the DAG. + Add a task to the Dag. :param task: the task you want to add """ @@ -928,11 +928,11 @@ def add_task(self, task: Operator) -> None: from airflow.sdk.definitions._internal.contextmanager import TaskGroupContext - # if the task has no start date, assign it the same as the DAG + # if the task has no start date, assign it the same as the Dag if not task.start_date: task.start_date = self.start_date # otherwise, the task will start on the later of its own start date and - # the DAG's start date + # the Dag's start date elif self.start_date: task.start_date = max(task.start_date, self.start_date) @@ -940,7 +940,7 @@ def add_task(self, task: Operator) -> None: if not task.end_date: task.end_date = self.end_date # otherwise, the task will end on the earlier of its own end date and - # the DAG's end date + # the Dag's end date elif task.end_date and self.end_date: task.end_date = min(task.end_date, self.end_date) @@ -965,7 +965,7 @@ def add_task(self, task: Operator) -> None: def add_tasks(self, tasks: Iterable[Operator]) -> None: """ - Add a list of tasks to the DAG. + Add a list of tasks to the Dag. :param tasks: a lit of tasks you want to add """ @@ -982,9 +982,9 @@ def _remove_task(self, task_id: str) -> None: def check_cycle(self) -> None: """ - Check to see if there are any cycles in the DAG. + Check to see if there are any cycles in the Dag. - :raises AirflowDagCycleException: If cycle is found in the DAG. + :raises AirflowDagCycleException: If cycle is found in the Dag. """ # default of int is 0 which corresponds to CYCLE_NEW CYCLE_NEW = 0 @@ -999,7 +999,7 @@ def _check_adjacent_tasks(task_id, current_task): """Return first untraversed child task, else None if all tasks traversed.""" for adjacent_task in current_task.get_direct_relative_ids(): if visited[adjacent_task] == CYCLE_IN_PROGRESS: - msg = f"Cycle detected in DAG: {self.dag_id}. Faulty task: {task_id}" + msg = f"Cycle detected in Dag: {self.dag_id}. 
Faulty task: {task_id}" raise AirflowDagCycleException(msg) if visited[adjacent_task] == CYCLE_NEW: return adjacent_task @@ -1022,7 +1022,7 @@ def _check_adjacent_tasks(task_id, current_task): path_stack.append(child_to_check) def cli(self): - """Exposes a CLI specific to this DAG.""" + """Exposes a CLI specific to this Dag.""" self.check_cycle() from airflow.cli import cli_parser @@ -1033,12 +1033,12 @@ def cli(self): @classmethod def get_serialized_fields(cls): - """Stringified DAGs and operators contain exactly these fields.""" + """Stringified Dags and operators contain exactly these fields.""" return cls.__serialized_fields def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> EdgeInfoType: """Return edge information for the given pair of tasks or an empty edge if there is no information.""" - # Note - older serialized DAGs may not have edge_info being a dict at all + # Note - older serialized Dags may not have edge_info being a dict at all empty = cast("EdgeInfoType", {}) if self.edge_info: return self.edge_info.get(upstream_task_id, {}).get(downstream_task_id, empty) @@ -1046,7 +1046,7 @@ def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> EdgeI def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: EdgeInfoType): """ - Set the given edge information on the DAG. + Set the given edge information on the Dag. Note that this will overwrite, rather than merge with, existing info. """ @@ -1081,14 +1081,14 @@ def test( mark_success_pattern: Pattern | str | None = None, ): """ - Execute one single DagRun for a given DAG and logical date. + Execute one single DagRun for a given Dag and logical date. :param run_after: the datetime before which to Dag cannot run. - :param logical_date: logical date for the DAG run + :param logical_date: logical date for the Dag run :param run_conf: configuration to pass to newly created dagrun :param conn_file_path: file path to a connection file in either yaml or json :param variable_file_path: file path to a variable file in either yaml or json - :param use_executor: if set, uses an executor to test the DAG + :param use_executor: if set, uses an executor to test the Dag :param mark_success_pattern: regex of task_ids to mark as success instead of running """ import re @@ -1161,9 +1161,9 @@ def add_logger_if_needed(ti: TaskInstance): self.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None ) scheduler_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self)) # type: ignore[arg-type] - # Preserve callback functions from original DAG since they're lost during serialization + # Preserve callback functions from original Dag since they're lost during serialization # and yes it is a hack for now! It is a tradeoff for code simplicity. - # Without it, we need "Scheduler DAG" (Serialized dag) for the scheduler bits + # Without it, we need "Scheduler Dag" (Serialized dag) for the scheduler bits # -- dep check, scheduling tis # and need real dag to get and run callbacks without having to load the dag model @@ -1423,9 +1423,9 @@ def dag( disable_bundle_versioning: bool = False, ) -> Callable[[Callable], Callable[..., DAG]]: """ - Python dag decorator which wraps a function into an Airflow DAG. + Python dag decorator which wraps a function into an Airflow Dag. - Accepts kwargs for operator kwarg. Can be used to parameterize DAGs. + Accepts kwargs for operator kwarg. Can be used to parameterize Dags. 
:param dag_args: Arguments for DAG object :param dag_kwargs: Kwargs for DAG object. @@ -1462,9 +1462,9 @@ def factory(*args, **kwargs): # Apply defaults to capture default values if set. f_sig.apply_defaults() - # Initialize DAG with bound arguments + # Initialize Dag with bound arguments with DAG(dag_id, **decorator_kwargs) as dag_obj: - # Set DAG documentation from function documentation if it exists and doc_md is not set. + # Set Dag documentation from function documentation if it exists and doc_md is not set. if f.__doc__ and not dag_obj.doc_md: dag_obj.doc_md = f.__doc__ @@ -1478,7 +1478,7 @@ def factory(*args, **kwargs): back = sys._getframe().f_back dag_obj.fileloc = back.f_code.co_filename if back else "" - # Invoke function to create operators in the DAG scope. + # Invoke function to create operators in the Dag scope. f(**f_kwargs) # Return dag object such that it's accessible in Globals. diff --git a/task-sdk/src/airflow/sdk/definitions/decorators/setup_teardown.py b/task-sdk/src/airflow/sdk/definitions/decorators/setup_teardown.py index 24c5483e155e6..e8059956e670d 100644 --- a/task-sdk/src/airflow/sdk/definitions/decorators/setup_teardown.py +++ b/task-sdk/src/airflow/sdk/definitions/decorators/setup_teardown.py @@ -39,7 +39,7 @@ def setup_task(func: Callable) -> Callable: """ Decorate a function to mark it as a setup task. - A setup task runs before all other tasks in its DAG or TaskGroup context + A setup task runs before all other tasks in its Dag or TaskGroup context and can perform initialization or resource preparation. Example:: @@ -62,8 +62,8 @@ def teardown_task(_func=None, *, on_failure_fail_dagrun: bool = False) -> Callab """ Decorate a function to mark it as a teardown task. - A teardown task runs after all main tasks in its DAG or TaskGroup context. - If ``on_failure_fail_dagrun=True``, a failure in teardown will mark the DAG run as failed. + A teardown task runs after all main tasks in its Dag or TaskGroup context. + If ``on_failure_fail_dagrun=True``, a failure in teardown will mark the Dag run as failed. Example:: diff --git a/task-sdk/src/airflow/sdk/definitions/decorators/task_group.py b/task-sdk/src/airflow/sdk/definitions/decorators/task_group.py index a79dd04b38b2f..44baff52c1d27 100644 --- a/task-sdk/src/airflow/sdk/definitions/decorators/task_group.py +++ b/task-sdk/src/airflow/sdk/definitions/decorators/task_group.py @@ -19,8 +19,8 @@ Implements the ``@task_group`` function decorator. When the decorated function is called, a task group will be created to represent -a collection of closely related tasks on the same DAG that should be grouped -together when the DAG is displayed graphically. +a collection of closely related tasks on the same Dag that should be grouped +together when the Dag is displayed graphically. """ from __future__ import annotations diff --git a/task-sdk/src/airflow/sdk/definitions/edges.py b/task-sdk/src/airflow/sdk/definitions/edges.py index 39fafc4b932c8..93f4dc9874e3a 100644 --- a/task-sdk/src/airflow/sdk/definitions/edges.py +++ b/task-sdk/src/airflow/sdk/definitions/edges.py @@ -175,7 +175,7 @@ def update_relative( def add_edge_info(self, dag: DAG, upstream_id: str, downstream_id: str): """ - Add or update task info on the DAG for this specific pair of tasks. + Add or update task info on the Dag for this specific pair of tasks. Called either from our relationship trigger methods above, or directly by set_upstream/set_downstream in operators. 
@@ -190,6 +190,6 @@ def Label(label: str): class EdgeInfoType(TypedDict): - """Extra metadata that the DAG can store about an edge, usually generated from an EdgeModifier.""" + """Extra metadata that the Dag can store about an edge, usually generated from an EdgeModifier.""" label: str | None diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py index 8cdd879eece91..09ae0420e5747 100644 --- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -167,9 +167,9 @@ class OperatorPartial: """ An "intermediate state" returned by ``BaseOperator.partial()``. - This only exists at DAG-parsing time; the only intended usage is for the + This only exists at Dag-parsing time; the only intended usage is for the user to call ``.expand()`` on it at some point (usually in a method chain) to - create a ``MappedOperator`` to add into the DAG. + create a ``MappedOperator`` to add into the Dag. """ operator_class: type[BaseOperator] @@ -269,16 +269,16 @@ def _expand(self, expand_input: ExpandInput, *, strict: bool) -> MappedOperator: @attrs.define( kw_only=True, # Disable custom __getstate__ and __setstate__ generation since it interacts - # badly with Airflow's DAG serialization and pickling. When a mapped task is + # badly with Airflow's Dag serialization and pickling. When a mapped task is # deserialized, subclasses are coerced into MappedOperator, but when it goes - # through DAG pickling, all attributes defined in the subclasses are dropped + # through Dag pickling, all attributes defined in the subclasses are dropped # by attrs's custom state management. Since attrs does not do anything too # special here (the logic is only important for slots=True), we use Python's # built-in implementation, which works (as proven by good old BaseOperator). getstate_setstate=False, ) class MappedOperator(AbstractOperator): - """Object representing a mapped operator in a DAG.""" + """Object representing a mapped operator in a Dag.""" operator_class: type[BaseOperator] diff --git a/task-sdk/src/airflow/sdk/definitions/param.py b/task-sdk/src/airflow/sdk/definitions/param.py index 94704cb42a0c8..6c61fa88a70a0 100644 --- a/task-sdk/src/airflow/sdk/definitions/param.py +++ b/task-sdk/src/airflow/sdk/definitions/param.py @@ -254,14 +254,14 @@ def deserialize(data: dict, version: int) -> ParamsDict: class DagParam(ResolveMixin): """ - DAG run parameter reference. + Dag run parameter reference. - This binds a simple Param object to a name within a DAG instance, so that it + This binds a simple Param object to a name within a Dag instance, so that it can be resolved during the runtime via the ``{{ context }}`` dictionary. The ideal use case of this class is to implicitly convert args passed to a method decorated by ``@dag``. - It can be used to parameterize a DAG. You can overwrite its value by setting + It can be used to parameterize a Dag. You can overwrite its value by setting it on conf when you trigger your DagRun. This can also be used in templates by accessing ``{{ context.params }}``. @@ -311,13 +311,13 @@ def deserialize(cls, data: dict, dags: dict) -> DagParam: Deserializes the dictionary back into a DagParam object. :param data: The serialized representation of the DagParam. - :param dags: A dictionary of available DAGs to look up the DAG. + :param dags: A dictionary of available Dags to look up the Dag. 
""" dag_id = data["dag_id"] - # Retrieve the current DAG from the provided DAGs dictionary + # Retrieve the current Dag from the provided Dags dictionary current_dag = dags.get(dag_id) if not current_dag: - raise ValueError(f"DAG with id {dag_id} not found.") + raise ValueError(f"Dag with id {dag_id} not found.") return cls(current_dag=current_dag, name=data["name"], default=data["default"]) diff --git a/task-sdk/src/airflow/sdk/definitions/taskgroup.py b/task-sdk/src/airflow/sdk/definitions/taskgroup.py index 29da84bb8913c..45de93b7da81e 100644 --- a/task-sdk/src/airflow/sdk/definitions/taskgroup.py +++ b/task-sdk/src/airflow/sdk/definitions/taskgroup.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""A collection of closely related tasks on the same DAG that should be grouped together visually.""" +"""A collection of closely related tasks on the same Dag that should be grouped together visually.""" from __future__ import annotations @@ -90,17 +90,17 @@ class TaskGroup(DAGNode): all tasks within the group if necessary. :param group_id: a unique, meaningful id for the TaskGroup. group_id must not conflict - with group_id of TaskGroup or task_id of tasks in the DAG. Root TaskGroup has group_id + with group_id of TaskGroup or task_id of tasks in the Dag. Root TaskGroup has group_id set to None. :param prefix_group_id: If set to True, child task_id and group_id will be prefixed with this TaskGroup's group_id. If set to False, child task_id and group_id are not prefixed. Default is True. :param parent_group: The parent TaskGroup of this TaskGroup. parent_group is set to None for the root TaskGroup. - :param dag: The DAG that this TaskGroup belongs to. + :param dag: The Dag that this TaskGroup belongs to. :param default_args: A dictionary of default parameters to be used as constructor keyword parameters when initialising operators, - will override default_args defined in the DAG level. + will override default_args defined in the Dag level. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains `'depends_on_past': True` here and `'depends_on_past': False` in the operator's call @@ -245,13 +245,13 @@ def add(self, task: DAGNode) -> DAGNode: if key in self.children: node_type = "Task" if hasattr(task, "task_id") else "Task Group" - raise DuplicateTaskIdFound(f"{node_type} id '{key}' has already been added to the DAG") + raise DuplicateTaskIdFound(f"{node_type} id '{key}' has already been added to the Dag") if isinstance(task, TaskGroup): if self.dag: if task.dag is not None and self.dag is not task.dag: raise RuntimeError( - "Cannot mix TaskGroups from different DAGs: %s and %s", + "Cannot mix TaskGroups from different Dags: %s and %s", self.dag, task.dag, ) @@ -495,7 +495,7 @@ def get_child_by_label(self, label: str) -> DAGNode: return self.children[self.child_id(label)] def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]: - """Serialize task group; required by DAGNode.""" + """Serialize task group; required by DagNode.""" from airflow.serialization.enums import DagAttributeTypes from airflow.serialization.serialized_objects import TaskGroupSerialization @@ -641,7 +641,7 @@ def __iter__(self): @methodtools.lru_cache(maxsize=None) def get_parse_time_mapped_ti_count(self) -> int: """ - Return the Number of instances a task in this group should be mapped to, when a DAG run is created. 
+ Return the Number of instances a task in this group should be mapped to, when a Dag run is created. This only considers literal mapped arguments, and would return *None* when any non-literal values are used for mapping. diff --git a/task-sdk/src/airflow/sdk/definitions/xcom_arg.py b/task-sdk/src/airflow/sdk/definitions/xcom_arg.py index e9bf54e0b7457..fa540333cf317 100644 --- a/task-sdk/src/airflow/sdk/definitions/xcom_arg.py +++ b/task-sdk/src/airflow/sdk/definitions/xcom_arg.py @@ -226,11 +226,11 @@ def __iter__(self): The default ``__iter__`` implementation in Python calls ``__getitem__`` with 0, 1, 2, etc. until it hits an ``IndexError``. This does not work well with our custom ``__getitem__`` implementation, and results in poor - DAG-writing experience since a misplaced ``*`` expansion would create an - infinite loop consuming the entire DAG parser. + Dag-writing experience since a misplaced ``*`` expansion would create an + infinite loop consuming the entire Dag parser. This override catches the error eagerly, so an incorrectly implemented - DAG fails fast and avoids wasting resources on nonsensical iterating. + Dag fails fast and avoids wasting resources on nonsensical iterating. """ raise TypeError("'XComArg' object is not iterable") @@ -566,7 +566,7 @@ def resolve(self, context: Mapping[str, Any]) -> Any: def serialize_xcom_arg(value: XComArg) -> dict[str, Any]: - """DAG serialization interface.""" + """Dag serialization interface.""" key = next(k for k, v in _XCOM_ARG_TYPES.items() if isinstance(value, v)) if key: return {"type": key, **value._serialize()} diff --git a/task-sdk/src/airflow/sdk/exceptions.py b/task-sdk/src/airflow/sdk/exceptions.py index 716ae6cb75a94..96df62184644b 100644 --- a/task-sdk/src/airflow/sdk/exceptions.py +++ b/task-sdk/src/airflow/sdk/exceptions.py @@ -28,7 +28,7 @@ class AirflowDagCycleException(AirflowException): - """Raise when there is a cycle in DAG definition.""" + """Raise when there is a cycle in Dag definition.""" class AirflowRuntimeError(Exception): diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py index 27e4b0a754e65..bfa9fb012ae17 100644 --- a/task-sdk/src/airflow/sdk/execution_time/comms.py +++ b/task-sdk/src/airflow/sdk/execution_time/comms.py @@ -501,7 +501,7 @@ def from_api_response(cls, dr_state_response: DagRunStateResponse) -> DagRunStat class PreviousDagRunResult(BaseModel): - """Response containing previous DAG run information.""" + """Response containing previous Dag run information.""" dag_run: DagRun | None = None type: Literal["PreviousDagRunResult"] = "PreviousDagRunResult" @@ -551,7 +551,7 @@ def from_api_response(cls, task_states_response: TaskStatesResponse) -> TaskStat class DRCount(BaseModel): - """Response containing count of DAG Runs matching certain filters.""" + """Response containing count of Dag Runs matching certain filters.""" count: int type: Literal["DRCount"] = "DRCount" diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 94b1b5270be54..440443b8c9c36 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -294,7 +294,7 @@ def block_orm_access(): delattr(settings, attr) def configure_orm(*args, **kwargs): - raise RuntimeError("Database access is disabled from DAGs and Triggers") + raise RuntimeError("Database access is disabled from Dags and Triggers") settings.configure_orm = configure_orm 
settings.Session = BlockedDBSession @@ -1454,7 +1454,7 @@ def start( # type: ignore[override] This bypasses the standard `ActivitySubprocess.start()` behavior, which expects to launch a subprocess and communicate via stdin/stdout. Instead, it constructs the `RuntimeTaskInstance` directly — useful in contexts like `dag.test()` where the - DAG is already parsed in memory. + Dag is already parsed in memory. Supervisor state and communications are simulated in-memory via `InProcessSupervisorComms`. """ @@ -1475,10 +1475,10 @@ def start( # type: ignore[override] supervisor.ti = what # type: ignore[assignment] # We avoid calling `task_runner.startup()` because we are already inside a - # parsed DAG file (e.g. via dag.test()). - # In normal execution, `startup()` parses the DAG based on info in a `StartupDetails` message. + # parsed Dag file (e.g. via dag.test()). + # In normal execution, `startup()` parses the Dag based on info in a `StartupDetails` message. # By directly constructing the `RuntimeTaskInstance`, - # we skip re-parsing (`task_runner.parse()`) and avoid needing to set DAG Bundle config + # we skip re-parsing (`task_runner.parse()`) and avoid needing to set Dag Bundle config # and run the task in-process. start_date = datetime.now(tz=timezone.utc) ti_context = supervisor.client.task_instances.start(supervisor.id, supervisor.pid, start_date) @@ -1512,7 +1512,7 @@ def _api_client(dag=None): from airflow.models.dagbag import DBDagBag # This is needed since the Execution API server uses the DBDagBag in its "state". - # This `app.state.dag_bag` is used to get some DAG properties like `fail_fast`. + # This `app.state.dag_bag` is used to get some Dag properties like `fail_fast`. dag_bag = DBDagBag() api.app.dependency_overrides[dag_bag_from_app] = lambda: dag_bag @@ -1768,8 +1768,8 @@ def supervise( Run a single task execution to completion. :param ti: The task instance to run. - :param bundle_info: Information of the DAG bundle to use for this task instance. - :param dag_rel_path: The file path to the DAG. + :param bundle_info: Information of the Dag bundle to use for this task instance. + :param dag_rel_path: The file path to the Dag. :param token: Authentication token for the API client. :param server: Base URL of the API server. :param dry_run: If True, execute without actual task execution (simulate run). diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 30ae5545c30de..d694d4f2b8031 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -310,8 +310,8 @@ def xcom_pull( manually). To remove the filter, pass *None*. :param task_ids: Only XComs from tasks with matching ids will be pulled. Pass *None* to remove the filter. - :param dag_id: If provided, only pulls XComs from this DAG. If *None* - (default), the DAG of the calling task is used. + :param dag_id: If provided, only pulls XComs from this Dag. If *None* + (default), the Dag of the calling task is used. :param map_indexes: If provided, only pull XComs with matching indexes. If *None* (default), this is inferred from the task(s) being pulled (see below for details). 
@@ -443,13 +443,13 @@ def get_first_reschedule_date(self, context: Context) -> AwareDatetime | None: return response.start_date def get_previous_dagrun(self, state: str | None = None) -> DagRun | None: - """Return the previous DAG run before the given logical date, optionally filtered by state.""" + """Return the previous Dag run before the given logical date, optionally filtered by state.""" context = self.get_template_context() dag_run = context.get("dag_run") log = structlog.get_logger(logger_name="task") - log.debug("Getting previous DAG run", dag_run=dag_run) + log.debug("Getting previous Dag run", dag_run=dag_run) if dag_run is None: return None @@ -527,7 +527,7 @@ def get_dr_count( run_ids: list[str] | None = None, states: list[str] | None = None, ) -> int: - """Return the number of DAG runs matching the given criteria.""" + """Return the number of Dag runs matching the given criteria.""" response = SUPERVISOR_COMMS.send( GetDRCount( dag_id=dag_id, @@ -544,7 +544,7 @@ def get_dr_count( @staticmethod def get_dagrun_state(dag_id: str, run_id: str) -> str: - """Return the state of the DAG run with the given Run ID.""" + """Return the state of the Dag run with the given Run ID.""" response = SUPERVISOR_COMMS.send(msg=GetDagRunState(dag_id=dag_id, run_id=run_id)) if TYPE_CHECKING: @@ -630,7 +630,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: dag = bag.dags[what.ti.dag_id] except KeyError: log.error( - "DAG not found during start up", dag_id=what.ti.dag_id, bundle=bundle_info, path=what.dag_rel_path + "Dag not found during start up", dag_id=what.ti.dag_id, bundle=bundle_info, path=what.dag_rel_path ) exit(1) @@ -640,7 +640,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: task = dag.task_dict[what.ti.task_id] except KeyError: log.error( - "Task not found in DAG during start up", + "Task not found in Dag during start up", dag_id=dag.dag_id, task_id=what.ti.task_id, bundle=bundle_info, @@ -729,7 +729,7 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id): ti = parse(msg, log) ti.log_url = get_log_url_from_ti(ti) - log.debug("DAG file parsed", file=msg.dag_rel_path) + log.debug("Dag file parsed", file=msg.dag_rel_path) run_as_user = getattr(ti.task, "run_as_user", None) or conf.get( "core", "default_impersonation", fallback=None @@ -1201,7 +1201,7 @@ def _get_email_subject_content( "max_tries": task_instance.max_tries, } - # Use the DAG's get_template_env() to set force_sandboxed. Don't add + # Use the Dag's get_template_env() to set force_sandboxed. Don't add # the flag to the function on task object -- that function can be # overridden, and adding a flag breaks backward compatibility. 
         dag = task_instance.task.get_dag()
@@ -1310,7 +1310,7 @@ def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger):
 
 
 def _render_map_index(context: Context, ti: RuntimeTaskInstance, log: Logger) -> str | None:
-    """Render named map index if the DAG author defined map_index_template at the task level."""
+    """Render named map index if the Dag author defined map_index_template at the task level."""
     if (template := context.get("map_index_template")) is None:
         return None
     jinja_env = ti.task.dag.get_template_env()
diff --git a/task-sdk/src/airflow/sdk/types.py b/task-sdk/src/airflow/sdk/types.py
index 3238dfa03321a..247e1b8f3f088 100644
--- a/task-sdk/src/airflow/sdk/types.py
+++ b/task-sdk/src/airflow/sdk/types.py
@@ -38,7 +38,7 @@
 
 
 class DagRunProtocol(Protocol):
-    """Minimal interface for a DAG run available during the execution."""
+    """Minimal interface for a Dag run available during the execution."""
 
     dag_id: str
     run_id: str
diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py
index 5208980caebc8..966ac729d0aff 100644
--- a/task-sdk/tests/task_sdk/api/test_client.py
+++ b/task-sdk/tests/task_sdk/api/test_client.py
@@ -1029,7 +1029,7 @@ def handle_request(request: httpx.Request) -> httpx.Response:
                     json={
                         "detail": {
                             "reason": "already_exists",
-                            "message": "A DAG Run already exists for DAG test_trigger_conflict with run id test_run_id",
+                            "message": "A Dag Run already exists for Dag test_trigger_conflict with run id test_run_id",
                         }
                     },
                 )
@@ -1050,7 +1050,7 @@ def handle_request(request: httpx.Request) -> httpx.Response:
                     json={
                         "detail": {
                             "reason": "already_exists",
-                            "message": "A DAG Run already exists for DAG test_trigger_conflict with run id test_run_id",
+                            "message": "A Dag Run already exists for Dag test_trigger_conflict with run id test_run_id",
                         }
                     },
                 )
@@ -1212,13 +1212,13 @@ def handle_request(request: httpx.Request) -> httpx.Response:
         assert result.dag_run.state == "success"
 
     def test_get_previous_not_found(self):
-        """Test get_previous when no previous DAG run exists returns None."""
+        """Test get_previous when no previous Dag run exists returns None."""
         logical_date = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
 
         def handle_request(request: httpx.Request) -> httpx.Response:
             if request.url.path == "/dag-runs/test_dag/previous":
                 assert request.url.params["logical_date"] == logical_date.isoformat()
-                # Return None (null) when no previous DAG run found
+                # Return None (null) when no previous Dag run found
                 return httpx.Response(status_code=200, content="null")
             return httpx.Response(status_code=422)
diff --git a/task-sdk/tests/task_sdk/bases/test_operator.py b/task-sdk/tests/task_sdk/bases/test_operator.py
index bceccbc186ebf..28dd11a91abdb 100644
--- a/task-sdk/tests/task_sdk/bases/test_operator.py
+++ b/task-sdk/tests/task_sdk/bases/test_operator.py
@@ -760,7 +760,7 @@ def test_jinja_env_creation(self, mock_jinja_env):
         assert mock_jinja_env.call_count == 1
 
     def test_deepcopy(self):
-        # Test bug when copying an operator attached to a DAG
+        # Test bug when copying an operator attached to a Dag
         with DAG("dag0", schedule=None, start_date=DEFAULT_DATE) as dag:
 
             @dag.task
diff --git a/task-sdk/tests/task_sdk/dags/dag_parsing_context.py b/task-sdk/tests/task_sdk/dags/dag_parsing_context.py
index f7a9dd42290a6..46146016e6a1b 100644
--- a/task-sdk/tests/task_sdk/dags/dag_parsing_context.py
+++ b/task-sdk/tests/task_sdk/dags/dag_parsing_context.py
@@ -32,5 +32,5 @@
     BaseOperator(task_id="visible_task")
 
     if current_dag_id == DAG_ID:
-        # this task will be invisible if the DAG ID is not properly set in the parsing context.
+        # this task will be invisible if the Dag ID is not properly set in the parsing context.
         BaseOperator(task_id="conditional_task")
diff --git a/task-sdk/tests/task_sdk/definitions/_internal/test_templater.py b/task-sdk/tests/task_sdk/definitions/_internal/test_templater.py
index b396b596df0ef..b097e32a4c409 100644
--- a/task-sdk/tests/task_sdk/definitions/_internal/test_templater.py
+++ b/task-sdk/tests/task_sdk/definitions/_internal/test_templater.py
@@ -28,14 +28,14 @@
 
 class TestTemplater:
     def test_get_template_env(self):
-        # Test get_template_env when a DAG is provided
+        # Test get_template_env when a Dag is provided
         templater = Templater()
         dag = DAG(dag_id="test_dag", schedule=None, render_template_as_native_obj=True)
         env = templater.get_template_env(dag)
         assert isinstance(env, jinja2.Environment)
         assert not env.sandboxed
 
-        # Test get_template_env when no DAG is provided
+        # Test get_template_env when no Dag is provided
         templater = Templater()
         env = templater.get_template_env()
         assert isinstance(env, jinja2.Environment)
diff --git a/task-sdk/tests/task_sdk/definitions/test_asset.py b/task-sdk/tests/task_sdk/definitions/test_asset.py
index b21909df03870..fd9b6d24bcca6 100644
--- a/task-sdk/tests/task_sdk/definitions/test_asset.py
+++ b/task-sdk/tests/task_sdk/definitions/test_asset.py
@@ -246,14 +246,14 @@ def create_test_assets():
 def test_asset_trigger_setup_and_serialization(create_test_assets):
     assets = create_test_assets
 
-    # Create DAG with asset triggers
+    # Create Dag with asset triggers
     with DAG(dag_id="test", schedule=AssetAny(*assets), catchup=False) as dag:
         EmptyOperator(task_id="hello")
 
     # Verify assets are set up correctly
-    assert isinstance(dag.timetable.asset_condition, AssetAny), "DAG assets should be an instance of AssetAny"
+    assert isinstance(dag.timetable.asset_condition, AssetAny), "Dag assets should be an instance of AssetAny"
 
-    # Round-trip the DAG through serialization
+    # Round-trip the Dag through serialization
     deserialized_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(dag))
 
     # Verify serialization and deserialization integrity
diff --git a/task-sdk/tests/task_sdk/definitions/test_dag.py b/task-sdk/tests/task_sdk/definitions/test_dag.py
index d2c7f6e73cc3f..14fa059d3d1f7 100644
--- a/task-sdk/tests/task_sdk/definitions/test_dag.py
+++ b/task-sdk/tests/task_sdk/definitions/test_dag.py
@@ -211,7 +211,7 @@ def test_not_none_schedule_with_non_default_params(self):
             DAG("my-dag", schedule=timedelta(days=1), start_date=DEFAULT_DATE, params=params)
 
     def test_roots(self):
-        """Verify if dag.roots returns the root tasks of a DAG."""
+        """Verify if dag.roots returns the root tasks of a Dag."""
         with DAG("test_dag", schedule=None, start_date=DEFAULT_DATE) as dag:
             op1 = BaseOperator(task_id="t1")
             op2 = BaseOperator(task_id="t2")
@@ -223,7 +223,7 @@ def test_roots(self):
         assert set(dag.roots) == {op1, op2}
 
     def test_leaves(self):
-        """Verify if dag.leaves returns the leaf tasks of a DAG."""
+        """Verify if dag.leaves returns the leaf tasks of a Dag."""
        with DAG("test_dag", schedule=None, start_date=DEFAULT_DATE) as dag:
             op1 = BaseOperator(task_id="t1")
             op2 = BaseOperator(task_id="t2")
@@ -238,7 +238,7 @@ def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
         """Verify tasks with Duplicate task_id raises error"""
         with DAG("test_dag", schedule=None, start_date=DEFAULT_DATE) as dag:
             op1 = BaseOperator(task_id="t1")
-            with pytest.raises(DuplicateTaskIdFound, match="Task id 't1' has already been added to the DAG"):
+            with pytest.raises(DuplicateTaskIdFound, match="Task id 't1' has already been added to the Dag"):
                 BaseOperator(task_id="t1")
 
         assert dag.task_dict == {op1.task_id: op1}
@@ -247,7 +247,7 @@ def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):
         """Verify tasks with Duplicate task_id raises error"""
         dag = DAG("test_dag", schedule=None, start_date=DEFAULT_DATE)
         op1 = BaseOperator(task_id="t1", dag=dag)
-        with pytest.raises(DuplicateTaskIdFound, match="Task id 't1' has already been added to the DAG"):
+        with pytest.raises(DuplicateTaskIdFound, match="Task id 't1' has already been added to the Dag"):
             BaseOperator(task_id="t1", dag=dag)
 
         assert dag.task_dict == {op1.task_id: op1}
@@ -455,7 +455,7 @@ def test__tags_mutable():
 
 
 def test_create_dag_while_active_context():
-    """Test that we can safely create a DAG whilst a DAG is activated via ``with dag1:``."""
+    """Test that we can safely create a Dag whilst a Dag is activated via ``with dag1:``."""
     with DAG(dag_id="simple_dag"):
         DAG(dag_id="dag2")
     # No asserts needed, it just needs to not fail
@@ -513,15 +513,15 @@ def noop_pipeline(): ...
         argnames=["dag_doc_md", "expected_doc_md"],
         argvalues=[
             pytest.param("dag docs.", "dag docs.", id="use_dag_doc_md"),
-            pytest.param(None, "Regular DAG documentation", id="use_dag_docstring"),
+            pytest.param(None, "Regular Dag documentation", id="use_dag_docstring"),
         ],
     )
     def test_documentation_added(self, dag_doc_md, expected_doc_md):
-        """Test that @dag uses function docs as doc_md for DAG object if doc_md is not explicitly set."""
+        """Test that @dag uses function docs as doc_md for Dag object if doc_md is not explicitly set."""
 
         @dag_decorator(schedule=None, default_args=self.DEFAULT_ARGS, doc_md=dag_doc_md)
         def noop_pipeline():
-            """Regular DAG documentation"""
+            """Regular Dag documentation"""
 
         dag = noop_pipeline()
         assert isinstance(dag, DAG)
@@ -539,26 +539,26 @@ def noop_pipeline(value): ...
         noop_pipeline()
 
     def test_documentation_template_rendered(self):
-        """Test that @dag uses function docs as doc_md for DAG object"""
+        """Test that @dag uses function docs as doc_md for Dag object"""
 
         @dag_decorator(schedule=None, default_args=self.DEFAULT_ARGS)
         def noop_pipeline():
             """
             {% if True %}
-            Regular DAG documentation
+            Regular Dag documentation
             {% endif %}
             """
 
         dag = noop_pipeline()
         assert dag.dag_id == "noop_pipeline"
-        assert "Regular DAG documentation" in dag.doc_md
+        assert "Regular Dag documentation" in dag.doc_md
 
     def test_resolve_documentation_template_file_not_rendered(self, tmp_path):
-        """Test that @dag uses function docs as doc_md for DAG object"""
+        """Test that @dag uses function docs as doc_md for Dag object"""
         raw_content = """
         {% if True %}
-        External Markdown DAG documentation
+        External Markdown Dag documentation
         {% endif %}
         """
@@ -595,7 +595,7 @@ def return_num(num):
 class DoNothingOperator(BaseOperator):
     """
     An operator that does nothing.
-    Used to test DAG cycle detection.
+    Used to test Dag cycle detection.
     """
 
     def execute(self, context: Context) -> None:
diff --git a/task-sdk/tests/task_sdk/definitions/test_taskgroup.py b/task-sdk/tests/task_sdk/definitions/test_taskgroup.py
index 1ee67d4c92e09..f94bfe88b5747 100644
--- a/task-sdk/tests/task_sdk/definitions/test_taskgroup.py
+++ b/task-sdk/tests/task_sdk/definitions/test_taskgroup.py
@@ -854,7 +854,7 @@ def section_1():
 
 
 def test_build_task_group_with_operators():
-    """Tests DAG with Tasks created with *Operators and TaskGroup created with taskgroup decorator"""
+    """Tests Dag with Tasks created with *Operators and TaskGroup created with taskgroup decorator"""
     from airflow.sdk import task
 
     def task_start():
@@ -894,7 +894,7 @@ def section_a(value):
         t_end = PythonOperator(task_id="task_end", python_callable=task_end, dag=dag)
         sec_1.set_downstream(t_end)
 
-    # Testing Tasks in DAG
+    # Testing Tasks in Dag
     assert set(dag.task_group.children.keys()) == {"section_1", "task_start", "task_end"}
     assert set(dag.task_group.children["section_1"].children.keys()) == {
         "section_1.task_2",
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index d8168037bea21..841137c07b3c8 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -501,7 +501,7 @@ def test_run_task_timeout(time_machine, create_runtime_ti, mock_supervisor_comms
 
 
 def test_basic_templated_dag(mocked_parse, make_ti_context, mock_supervisor_comms, spy_agency):
-    """Test running a DAG with templated task."""
+    """Test running a Dag with templated task."""
     from airflow.providers.standard.operators.bash import BashOperator
 
     task = BashOperator(
@@ -606,7 +606,7 @@ def test_basic_templated_dag(mocked_parse, make_ti_context, mock_supervisor_comm
 def test_startup_and_run_dag_with_rtif(
     mocked_parse, task_params, expected_rendered_fields, make_ti_context, time_machine, mock_supervisor_comms
 ):
-    """Test startup of a DAG with various rendered templated fields."""
+    """Test startup of a Dag with various rendered templated fields."""
 
     class CustomOperator(BaseOperator):
         template_fields = tuple(task_params.keys())
@@ -801,7 +801,7 @@ def execute(self, context):
 def test_startup_and_run_dag_with_templated_fields(
     command, rendered_command, create_runtime_ti, time_machine
 ):
-    """Test startup of a DAG with various templated fields."""
+    """Test startup of a Dag with various templated fields."""
     from airflow.providers.standard.operators.bash import BashOperator
 
     task = BashOperator(task_id="templated_task", bash_command=command)
@@ -892,10 +892,10 @@ def execute(self, context):
 
 def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch, test_dags_dir):
     """
-    Test that the DAG parsing context is correctly set during the startup process.
+    Test that the Dag parsing context is correctly set during the startup process.
 
-    This test verifies that the DAG and task IDs are correctly set in the parsing context
-    when a DAG is started up.
+    This test verifies that the Dag and task IDs are correctly set in the parsing context
+    when a Dag is started up.
     """
     dag_id = "dag_parsing_context_test"
     task_id = "conditional_task"
@@ -912,8 +912,8 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch
 
     mock_supervisor_comms._get_response.return_value = what
 
-    # Set the environment variable for DAG bundles
-    # We use the DAG defined in `task_sdk/tests/dags/dag_parsing_context.py` for this test!
+    # Set the environment variable for Dag bundles
+    # We use the Dag defined in `task_sdk/tests/dags/dag_parsing_context.py` for this test!
     dag_bundle_val = json.dumps(
         [
             {
@@ -927,7 +927,7 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch
     monkeypatch.setenv("AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST", dag_bundle_val)
 
     ti, _, _ = startup()
 
-    # Presence of `conditional_task` below means DAG ID is properly set in the parsing context!
+    # Presence of `conditional_task` below means Dag ID is properly set in the parsing context!
     # Check the dag file for the actual logic!
     assert ti.task.dag.task_dict.keys() == {"visible_task", "conditional_task"}
@@ -1220,7 +1220,7 @@ def test_get_context_without_ti_context_from_server(self, mocked_parse, make_ti_
         task = BaseOperator(task_id="hello")
         dag_id = "basic_task"
 
-        # Assign task to DAG
+        # Assign task to Dag
         get_inline_dag(dag_id=dag_id, task=task)
 
         ti_id = uuid7()