2 changes: 1 addition & 1 deletion task-sdk/README.md
@@ -27,7 +27,7 @@
[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://s.apache.org/airflow-slack)


-The Apache Airflow Task SDK includes interfaces for DAG authors and Task execution logic for Python.
+The Apache Airflow Task SDK includes interfaces for Dag authors and Task execution logic for Python.

## Installation

12 changes: 6 additions & 6 deletions task-sdk/src/airflow/sdk/api/client.py
@@ -632,7 +632,7 @@ def trigger(
logical_date: datetime | None = None,
reset_dag_run: bool = False,
) -> OKResponse | ErrorResponse:
"""Trigger a DAG run via the API server."""
"""Trigger a Dag run via the API server."""
body = TriggerDAGRunPayload(logical_date=logical_date, conf=conf or {}, reset_dag_run=reset_dag_run)

try:
@@ -642,23 +642,23 @@ def trigger(
except ServerResponseError as e:
if e.response.status_code == HTTPStatus.CONFLICT:
if reset_dag_run:
log.info("DAG Run already exists; Resetting DAG Run.", dag_id=dag_id, run_id=run_id)
log.info("Dag Run already exists; Resetting Dag Run.", dag_id=dag_id, run_id=run_id)
return self.clear(run_id=run_id, dag_id=dag_id)

log.info("DAG Run already exists!", detail=e.detail, dag_id=dag_id, run_id=run_id)
log.info("Dag Run already exists!", detail=e.detail, dag_id=dag_id, run_id=run_id)
return ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS)
raise

return OKResponse(ok=True)

def clear(self, dag_id: str, run_id: str) -> OKResponse:
"""Clear a DAG run via the API server."""
"""Clear a Dag run via the API server."""
self.client.post(f"dag-runs/{dag_id}/{run_id}/clear")
# TODO: Error handling
return OKResponse(ok=True)

def get_state(self, dag_id: str, run_id: str) -> DagRunStateResponse:
"""Get the state of a DAG run via the API server."""
"""Get the state of a Dag run via the API server."""
resp = self.client.get(f"dag-runs/{dag_id}/{run_id}/state")
return DagRunStateResponse.model_validate_json(resp.read())

@@ -669,7 +669,7 @@ def get_count(
run_ids: list[str] | None = None,
states: list[str] | None = None,
) -> DRCount:
"""Get count of DAG runs matching the given criteria."""
"""Get count of Dag runs matching the given criteria."""
params = {
"dag_id": dag_id,
"logical_dates": [d.isoformat() for d in logical_dates] if logical_dates is not None else None,
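As a usage sketch of the dag-run operations above: the snippet assumes a configured `Client` from this module and a `dag_runs` accessor exposing these methods — the accessor name and constructor arguments are assumptions for illustration, since only the method bodies appear in this diff.

```python
# Hedged sketch: Client constructor args and the `dag_runs` accessor name are
# assumptions; only the dag-run methods themselves are shown in this diff.
from airflow.sdk.api.client import Client

client = Client(base_url="http://localhost:8080/execution", token="<token>")

# With reset_dag_run=True, an HTTP 409 ("run already exists") is handled by
# clearing the existing run via clear() instead of returning an error.
result = client.dag_runs.trigger(
    dag_id="example_dag",
    run_id="manual__2025-01-01",
    conf={"source": "example"},
    reset_dag_run=True,
)

# Without reset_dag_run, the same conflict would surface as an
# ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS).
state = client.dag_runs.get_state(dag_id="example_dag", run_id="manual__2025-01-01")
```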
4 changes: 2 additions & 2 deletions task-sdk/src/airflow/sdk/bases/decorator.py
@@ -113,7 +113,7 @@ def get_unique_task_id(
task_group: TaskGroup | None = None,
) -> str:
"""
-Generate unique task id given a DAG (or if run in a DAG context).
+Generate unique task id given a Dag (or if run in a Dag context).

IDs are generated by appending a unique number to the end of
the original task id.
@@ -645,7 +645,7 @@ def task_decorator_factory(
"""
Generate a wrapper that wraps a function into an Airflow operator.

-Can be reused in a single DAG.
+Can be reused in a single Dag.

:param python_callable: Function to decorate.
:param multiple_outputs: If set to True, the decorated function's return
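To make "reused in a single Dag" concrete, a minimal sketch with the Task SDK decorators; the `__<n>` id suffix in the comment reflects how `get_unique_task_id` typically de-duplicates and is an assumption here, not something this diff spells out.

```python
from airflow.sdk import dag, task


@task
def add_one(x: int) -> int:
    return x + 1


@dag(dag_id="reuse_example", schedule=None)
def reuse_example():
    first = add_one(1)  # task_id "add_one"
    add_one(first)      # task_id de-duplicated automatically, e.g. "add_one__1"


reuse_example()
```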
4 changes: 2 additions & 2 deletions task-sdk/src/airflow/sdk/bases/notifier.py
@@ -38,7 +38,7 @@ class BaseNotifier(LoggingMixin, Templater):
It can be used asynchronously (preferred) if `async_notify` is implemented and/or
synchronously if `notify` is implemented.

-Currently, the DAG/Task state change callbacks run on the DAG Processor and only support sync usage.
+Currently, the Dag/Task state change callbacks run on the Dag Processor and only support sync usage.

Usage::
# Asynchronous usage
@@ -109,7 +109,7 @@ def notify(self, context: Context) -> None:
"""
Send a notification (sync).

-Implementing this is a requirement for running this notifier in the DAG processor, which is where the
+Implementing this is a requirement for running this notifier in the Dag processor, which is where the
`on_success_callback` and `on_failure_callback` run.

:param context: The airflow context
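Since the docstring above says a sync `notify` is what the Dag processor callbacks require, a minimal synchronous notifier might look like this (a sketch; the template-rendering behaviour noted in the comment follows the base class's usual handling).

```python
from airflow.sdk.bases.notifier import BaseNotifier


class StdoutNotifier(BaseNotifier):
    """Toy synchronous notifier; `message` is rendered as a template field."""

    template_fields = ("message",)

    def __init__(self, message: str = "task {{ ti.task_id }} finished"):
        super().__init__()
        self.message = message

    def notify(self, context) -> None:
        # By the time notify() runs, template fields have been rendered
        # against `context` by the base class.
        print(self.message)
```

Passed as `on_failure_callback=StdoutNotifier(...)` on a Dag or task, this runs synchronously on the Dag processor, as the docstring requires.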
36 changes: 18 additions & 18 deletions task-sdk/src/airflow/sdk/bases/operator.py
@@ -315,7 +315,7 @@ def partial(
if task_group:
task_id = task_group.child_id(task_id)

-# Merge DAG and task group level defaults into user-supplied values.
+# Merge Dag and task group level defaults into user-supplied values.
dag_default_args, partial_params = get_merged_defaults(
dag=dag,
task_group=task_group,
@@ -331,7 +331,7 @@ def partial(
**kwargs,
}

-# Inject DAG-level default args into args provided to this function.
+# Inject Dag-level default args into args provided to this function.
# Most of the default args will be retrieved during unmapping; here we
# only ensure base properties are correctly set for the scheduler.
partial_kwargs.update(
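The merged defaults above are what let a mapped task inherit Dag-level settings. A short sketch of the caller's side (the BashOperator import path is an assumed provider location):

```python
# Sketch: Dag-level default_args are merged into the partial kwargs, so the
# mapped task below inherits retries=2 without restating it.
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator  # assumed path

with DAG(dag_id="partial_example", default_args={"retries": 2}):
    BashOperator.partial(task_id="echo").expand(
        bash_command=["echo a", "echo b"],  # two mapped task instances
    )
```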
@@ -613,8 +613,8 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
r"""
Abstract base class for all operators.

-Since operators create objects that become nodes in the DAG, BaseOperator
-contains many recursive methods for DAG crawling behavior. To derive from
+Since operators create objects that become nodes in the Dag, BaseOperator
+contains many recursive methods for Dag crawling behavior. To derive from
this class, you are expected to override the constructor and the 'execute'
method.

@@ -628,7 +628,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):

This class is abstract and shouldn't be instantiated. Instantiating a
class derived from this one results in the creation of a task object,
-which ultimately becomes a node in DAG objects. Task dependencies should
+which ultimately becomes a node in Dag objects. Task dependencies should
be set by using the set_upstream and/or set_downstream methods.

:param task_id: a unique, meaningful id for the task
@@ -654,7 +654,7 @@ class derived from this one results in the creation of a task object,
:param start_date: The ``start_date`` for the task, determines
the ``logical_date`` for the first task instance. The best practice
is to have the start_date rounded
-to your DAG's ``schedule_interval``. Daily jobs have their start_date
+to your Dag's ``schedule_interval``. Daily jobs have their start_date
some day at 00:00:00, hourly jobs have their start_date at 00:00
of a specific hour. Note that Airflow simply looks at the latest
``logical_date`` and adds the ``schedule_interval`` to determine
@@ -709,7 +709,7 @@ class derived from this one results in the creation of a task object,
you know exactly what priority weight each task should have.
Additionally, when set to ``absolute``, there is bonus effect of
significantly speeding up the task creation process as for very large
-DAGs. Options can be set as string or using the constants defined in
+Dags. Options can be set as string or using the constants defined in
the static class ``airflow.utils.WeightRule``.
Irrespective of the weight rule, resulting priority values are capped with 32-bit.
|experimental|
@@ -744,7 +744,7 @@ class derived from this one results in the creation of a task object,
:param on_skipped_callback: much like the ``on_failure_callback`` except
that it is executed when skipped occur; this callback will be called only if AirflowSkipException get raised.
Explicitly it is NOT called if a task is not started to be executed because of a preceding branching
-decision in the DAG or a trigger rule which causes execution to skip so that the task execution
+decision in the Dag or a trigger rule which causes execution to skip so that the task execution
is never scheduled.
:param pre_execute: a function to be called immediately before task
execution, receiving a context dictionary; raising an exception will
@@ -769,7 +769,7 @@ class derived from this one results in the creation of a task object,
:param max_active_tis_per_dag: When set, a task will be able to limit the concurrent
runs across logical_dates.
:param max_active_tis_per_dagrun: When set, a task will be able to limit the concurrent
-task instances per DAG run.
+task instances per Dag run.
:param executor: Which executor to target when running this task. NOT YET SUPPORTED
:param executor_config: Additional task-level configuration parameters that are
interpreted by a specific executor. Parameters are namespaced by the name of
@@ -1031,7 +1031,7 @@ def __init__(
allow_nested_operators: bool = True,
**kwargs: Any,
):
-# Note: Metaclass handles passing in the DAG/TaskGroup from active context manager, if any
+# Note: Metaclass handles passing in the Dag/TaskGroup from active context manager, if any

# Only apply task_group prefix if this operator was not created from a mapped operator
# Mapped operators already have the prefix applied during their creation
@@ -1289,14 +1289,14 @@ def get_dag(self) -> DAG | None:

@property
def dag(self) -> DAG:
"""Returns the Operator's DAG if set, otherwise raises an error."""
"""Returns the Operator's Dag if set, otherwise raises an error."""
if dag := self._dag:
return dag
raise RuntimeError(f"Operator {self} has not been assigned to a DAG yet")
raise RuntimeError(f"Operator {self} has not been assigned to a Dag yet")

@dag.setter
def dag(self, dag: DAG | None) -> None:
"""Operators can be assigned to one DAG, one time. Repeat assignments to that same DAG are ok."""
"""Operators can be assigned to one Dag, one time. Repeat assignments to that same Dag are ok."""
self._dag = dag

def _convert__dag(self, dag: DAG | None) -> DAG | None:
@@ -1307,12 +1307,12 @@ def _convert__dag(self, dag: DAG | None) -> DAG | None:
return dag

if not isinstance(dag, DAG):
raise TypeError(f"Expected DAG; received {dag.__class__.__name__}")
raise TypeError(f"Expected Dag; received {dag.__class__.__name__}")
if self._dag is not None and self._dag is not dag:
raise ValueError(f"The DAG assigned to {self} can not be changed.")
raise ValueError(f"The Dag assigned to {self} can not be changed.")

if self.__from_mapped:
-pass # Don't add to DAG -- the mapped task takes the place.
+pass # Don't add to Dag -- the mapped task takes the place.
elif dag.task_dict.get(self.task_id) is not self:
dag.add_task(self)
return dag
@@ -1370,7 +1370,7 @@ def task_display_name(self) -> str:
return self._task_display_name or self.task_id

def has_dag(self):
"""Return True if the Operator has been assigned to a DAG."""
"""Return True if the Operator has been assigned to a Dag."""
return self._dag is not None

def _set_xcomargs_dependencies(self) -> None:
@@ -1456,7 +1456,7 @@ def output(self) -> XComArg:

@classmethod
def get_serialized_fields(cls):
"""Stringified DAGs and operators contain exactly these fields."""
"""Stringified Dags and operators contain exactly these fields."""
if not cls.__serialized_fields:
from airflow.sdk.definitions._internal.contextmanager import DagContext

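A sketch of the assignment rules documented on the `dag` property above: one Dag per operator, and repeat assignment to the same Dag is a no-op.

```python
from airflow.sdk import DAG
from airflow.sdk.bases.operator import BaseOperator


class NoopOperator(BaseOperator):
    def execute(self, context):
        return None


with DAG(dag_id="dag_a") as dag_a:
    op = NoopOperator(task_id="noop")  # the metaclass wires in the active Dag context

op.dag = dag_a          # re-assigning the same Dag is allowed
assert op.has_dag()
# Assigning a different Dag instance would raise
# ValueError("The Dag assigned to ... can not be changed.")
```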
24 changes: 12 additions & 12 deletions task-sdk/src/airflow/sdk/bases/xcom.py
@@ -66,9 +66,9 @@ def set(

:param key: Key to store the XCom.
:param value: XCom value to store.
-:param dag_id: DAG ID.
+:param dag_id: Dag ID.
:param task_id: Task ID.
-:param run_id: DAG run ID for the task.
+:param run_id: Dag run ID for the task.
:param map_index: Optional map index to assign XCom for a mapped task.
The default is ``-1`` (set for a non-mapped task).
"""
@@ -111,9 +111,9 @@ def _set_xcom_in_db(

:param key: Key to store the XCom.
:param value: XCom value to store.
-:param dag_id: DAG ID.
+:param dag_id: Dag ID.
:param task_id: Task ID.
-:param run_id: DAG run ID for the task.
+:param run_id: Dag run ID for the task.
:param map_index: Optional map index to assign XCom for a mapped task.
The default is ``-1`` (set for a non-mapped task).
"""
@@ -180,8 +180,8 @@ def _get_xcom_db_ref(
.. seealso:: ``get_value()`` is a convenience function if you already
have a structured TaskInstance or TaskInstanceKey object available.

-:param run_id: DAG run ID for the task.
-:param dag_id: Only pull XCom from this DAG. Pass *None* (default) to
+:param run_id: Dag run ID for the task.
+:param dag_id: Only pull XCom from this Dag. Pass *None* (default) to
remove the filter.
:param task_id: Only XCom from task with matching ID will be pulled.
Pass *None* (default) to remove the filter.
@@ -230,8 +230,8 @@ def get_one(
.. seealso:: ``get_value()`` is a convenience function if you already
have a structured TaskInstance or TaskInstanceKey object available.

-:param run_id: DAG run ID for the task.
-:param dag_id: Only pull XCom from this DAG. Pass *None* (default) to
+:param run_id: Dag run ID for the task.
+:param dag_id: Only pull XCom from this Dag. Pass *None* (default) to
remove the filter.
:param task_id: Only XCom from task with matching ID will be pulled.
Pass *None* (default) to remove the filter.
@@ -240,7 +240,7 @@
:param key: A key for the XCom. If provided, only XCom with matching
keys will be returned. Pass *None* (default) to remove the filter.
:param include_prior_dates: If *False* (default), only XCom from the
-specified DAG run is returned. If *True*, the latest matching XCom is
+specified Dag run is returned. If *True*, the latest matching XCom is
returned regardless of the run it belongs to.
"""
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
@@ -291,11 +291,11 @@ def get_all(
indexes of a mapped task at once.

:param key: A key for the XCom. Only XComs with this key will be returned.
-:param run_id: DAG run ID for the task.
-:param dag_id: DAG ID to pull XComs from.
+:param run_id: Dag run ID for the task.
+:param dag_id: Dag ID to pull XComs from.
:param task_id: Task ID to pull XComs from.
:param include_prior_dates: If *False* (default), only XComs from the
-specified DAG run are returned. If *True*, the latest matching XComs are
+specified Dag run are returned. If *True*, the latest matching XComs are
returned regardless of the run they belong to.
:return: List of all XCom values if found.
"""
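A sketch of the identifiers these XCom docstrings refer to, via the usual task-level API; the assumption here is that the injected `ti` context object routes through these BaseXCom calls.

```python
from airflow.sdk import dag, task


@task
def producer(ti=None):
    # Stored under this Dag ID, run ID, and task ID (map_index -1: not mapped).
    ti.xcom_push(key="row_count", value=42)


@task
def consumer(ti=None):
    # Pulls from the same Dag run by default; include_prior_dates widens it.
    return ti.xcom_pull(task_ids="producer", key="row_count")


@dag(dag_id="xcom_example", schedule=None)
def xcom_example():
    producer() >> consumer()


xcom_example()
```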
task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
@@ -319,10 +319,10 @@ def _iter_all_mapped_downstreams(self) -> Iterator[MappedOperator | MappedTaskGroup]:
"""
Return mapped nodes that are direct dependencies of the current task.

-For now, this walks the entire DAG to find mapped nodes that has this
+For now, this walks the entire Dag to find mapped nodes that have this
current task as an upstream. We cannot use ``downstream_list`` since it
only contains operators, not task groups. In the future, we should
-provide a way to record an DAG node's all downstream nodes instead.
+provide a way to record a Dag node's all downstream nodes instead.

Note that this does not guarantee the returned tasks actually use the
current task for task mapping, but only checks those task are mapped
@@ -348,7 +348,7 @@ def _walk_group(group: TaskGroup) -> Iterable[tuple[str, DAGNode]]:

dag = self.get_dag()
if not dag:
raise RuntimeError("Cannot check for mapped dependants when not attached to a DAG")
raise RuntimeError("Cannot check for mapped dependants when not attached to a Dag")
for key, child in _walk_group(dag.task_group):
if key == self.node_id:
continue
@@ -361,10 +361,10 @@ def iter_mapped_dependants(self) -> Iterator[MappedOperator | MappedTaskGroup]:
"""
Return mapped nodes that depend on the current task the expansion.

-For now, this walks the entire DAG to find mapped nodes that has this
+For now, this walks the entire Dag to find mapped nodes that have this
current task as an upstream. We cannot use ``downstream_list`` since it
only contains operators, not task groups. In the future, we should
-provide a way to record an DAG node's all downstream nodes instead.
+provide a way to record a Dag node's all downstream nodes instead.
"""
return (
downstream
@@ -386,7 +386,7 @@ def iter_mapped_task_groups(self) -> Iterator[MappedTaskGroup]:

def get_closest_mapped_task_group(self) -> MappedTaskGroup | None:
"""
-Get the mapped task group "closest" to this task in the DAG.
+Get the mapped task group "closest" to this task in the Dag.

:meta private:
"""
@@ -408,7 +408,7 @@ def get_needs_expansion(self) -> bool:
@methodtools.lru_cache(maxsize=None)
def get_parse_time_mapped_ti_count(self) -> int:
"""
-Return the number of mapped task instances that can be created on DAG run creation.
+Return the number of mapped task instances that can be created on Dag run creation.

This only considers literal mapped arguments, and would return *None*
when any non-literal values are used for mapping.
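A sketch of the parse-time vs run-time distinction behind `get_parse_time_mapped_ti_count()`: literal expansions are countable when the Dag file is parsed, while expansions over another task's output are not.

```python
from airflow.sdk import dag, task


@task
def double(x: int) -> int:
    return 2 * x


@task
def make_numbers() -> list[int]:
    return [1, 2, 3]


@dag(dag_id="mapping_example", schedule=None)
def mapping_example():
    double.expand(x=[1, 2, 3])       # literal: 3 mapped task instances at parse time
    double.expand(x=make_numbers())  # run-time only: length unknown until execution


mapping_example()
```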
task-sdk/src/airflow/sdk/definitions/_internal/contextmanager.py
@@ -52,7 +52,7 @@ class ContextStackMeta(type):
_context: deque

# TODO: Task-SDK:
-# share_parent_context can go away once the DAG and TaskContext manager in airflow.models are removed and
+# share_parent_context can go away once the Dag and TaskContext manager in airflow.models are removed and
# everything uses sdk fully for definition/parsing
def __new__(cls, name, bases, namespace, share_parent_context: bool = False, **kwargs: Any):
if not share_parent_context:
@@ -89,9 +89,9 @@ def get_current(cls) -> T | None:

class DagContext(ContextStack[DAG]):
"""
-DAG context is used to keep the current DAG when DAG is used as ContextManager.
+Dag context is used to keep the current Dag when Dag is used as ContextManager.

-You can use DAG as context:
+You can use Dag as context:

.. code-block:: python

@@ -103,8 +103,8 @@ class DagContext(ContextStack[DAG]):
) as dag:
...

-If you do this the context stores the DAG and whenever new task is created, it will use
-such stored DAG as the parent DAG.
+If you do this the context stores the Dag and whenever new task is created, it will use
+such stored Dag as the parent Dag.

"""

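A self-contained version of the truncated example above: entering `with DAG(...)` pushes the Dag onto `DagContext`, and operators created inside adopt it (the EmptyOperator import path is an assumed provider location).

```python
import datetime

from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator  # assumed path

with DAG(
    dag_id="context_example",
    start_date=datetime.datetime(2025, 1, 1),
    schedule=None,
) as dag:
    noop = EmptyOperator(task_id="noop")

assert noop.dag is dag  # inherited from the active Dag context
```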
2 changes: 1 addition & 1 deletion task-sdk/src/airflow/sdk/definitions/_internal/mixins.py
@@ -131,7 +131,7 @@ def iter_references(self) -> Iterable[tuple[Operator, str]]:
"""
Find underlying XCom references this contains.

-This is used by the DAG parser to recursively find task dependencies.
+This is used by the Dag parser to recursively find task dependencies.

:meta private:
"""
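A sketch of what `iter_references()` enables: passing one task's output into another is enough for the Dag parser to infer the edge, with no explicit `set_downstream` call.

```python
from airflow.sdk import dag, task


@task
def produce() -> str:
    return "hello"


@task
def consume(value: str) -> None:
    print(value)


@dag(dag_id="refs_example", schedule=None)
def refs_example():
    consume(produce())  # the XComArg argument is discovered via iter_references


refs_example()
```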