Merged
23 commits
fe1c526
Re-format add_logger_if_needed docstring
uranusjr May 15, 2023
844a167
Split long DagOwnerAttributes docstring
uranusjr May 15, 2023
4d4c9d3
Improve _get_or_create_dagrun
uranusjr May 16, 2023
a955238
Re-word and format DagRun class docstring
uranusjr May 16, 2023
8f726f6
Rewrite Trigger.clean_unused docstring
uranusjr May 16, 2023
00e04f8
Format Branch*Operator docstrings
uranusjr May 16, 2023
bdf7413
Re-word airflow.operators.python.task
uranusjr May 16, 2023
db3bc13
Format _reset_dag_run_and_task_instances docstring
uranusjr May 16, 2023
89ec05d
Re-word get_components docstring
uranusjr May 16, 2023
cea6524
Re-word serialize_template_field docstring
uranusjr May 16, 2023
c49d81f
Re-format some methods on Variable
uranusjr May 16, 2023
1c7a775
SQLAlchemy is one word
uranusjr May 16, 2023
65fb18e
This shouldn't be a docstring but a comment
uranusjr May 16, 2023
a034aff
Clean up docstrings in cli_action_loggers
uranusjr May 16, 2023
0e7150b
Clean up docstrings in airflow.utils.dates
uranusjr May 16, 2023
02c6181
Format TimezoneAware docstrings
uranusjr May 16, 2023
8e176c4
Reformat set_new_process_group docstring
uranusjr May 16, 2023
a87215a
Reformat nulls_first docstring
uranusjr May 16, 2023
32bbeaa
Remove empty :return: declaratives
uranusjr May 16, 2023
a1090c1
Improve docstrings in init_appbuilder
uranusjr May 17, 2023
61852e8
Improve docstrings in fab_security
uranusjr May 17, 2023
44a239c
Remove stray string
uranusjr May 18, 2023
4747429
Merge branch 'main' into docstring-improve
uranusjr May 19, 2023
1 change: 0 additions & 1 deletion airflow/api/common/mark_tasks.py
@@ -256,7 +256,6 @@ def verify_dagruns(
:param state: state of the dag_run to set if commit is True
:param session: session to use
:param current_task: current task
:return:
"""
for dag_run in dag_runs:
dag_run.dag = current_task.subdag
44 changes: 23 additions & 21 deletions airflow/models/dag.py
@@ -2649,13 +2649,14 @@ def test(
"""

def add_logger_if_needed(ti: TaskInstance):
"""
Add a formatted logger to the taskinstance so all logs are surfaced to the command line instead
of into a task file. Since this is a local test run, it is much better for the user to see logs
in the command line, rather than needing to search for a log file.
Args:
ti: The taskinstance that will receive a logger.
"""Add a formatted logger to the task instance.

This allows all logs to surface to the command line, instead of into
a task file. Since this is a local test run, it is much better for
the user to see logs in the command line, rather than needing to
search for a log file.

:param ti: The task instance that will receive a logger.
"""
format = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
handler = logging.StreamHandler(sys.stdout)
@@ -3256,9 +3257,10 @@ def __repr__(self):


class DagOwnerAttributes(Base):
"""
Table defining different owner attributes. For example, a link for an owner that will be passed as
a hyperlink to the DAGs view.
"""Table defining different owner attributes.

For example, a link for an owner that will be passed as a hyperlink to the
"DAGs" view.
"""

__tablename__ = "dag_owner_attributes"
@@ -3835,17 +3837,17 @@ def _get_or_create_dagrun(
run_id: str,
session: Session,
) -> DagRun:
"""
Create a DAGRun, but only after clearing the previous instance of said dagrun to prevent collisions.
This function is only meant for the `dag.test` function as a helper function.
"""Create a DAG run, replacing an existing instance if needed to prevent collisions.

This function is only meant to be used by :meth:`DAG.test` as a helper function.

:param dag: DAG to be used to find run.
:param conf: Configuration to pass to newly created run.
:param start_date: Start date of new run.
:param execution_date: Logical date for finding an existing run.
:param run_id: Run ID for the new DAG run.

:param dag: Dag to be used to find dagrun
:param conf: configuration to pass to newly created dagrun
:param start_date: start date of new dagrun, defaults to execution_date
:param execution_date: execution_date for finding the dagrun
:param run_id: run_id to pass to new dagrun
:param session: sqlalchemy session
:return:
:return: The newly created DAG run.
"""
log.info("dagrun id: %s", dag.dag_id)
dr: DagRun = (
@@ -3862,7 +3864,7 @@ def _get_or_create_dagrun(
run_id=run_id,
start_date=start_date or execution_date,
session=session,
conf=conf, # type: ignore
conf=conf,
)
log.info("created dagrun " + str(dr))
log.info("created dagrun %s", dr)
return dr
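
Both helpers touched above exist to support local DAG testing via dag.test(). A minimal sketch of that entry point, assuming a current Airflow 2.x TaskFlow setup (the DAG and task names are illustrative):

    import pendulum
    from airflow.decorators import dag, task

    @dag(schedule=None, start_date=pendulum.datetime(2023, 5, 1), catchup=False)
    def docstring_demo():
        @task
        def hello():
            # With dag.test(), this output is streamed to stdout by the handler
            # that add_logger_if_needed attaches to each task instance.
            print("hello from a locally tested task")

        hello()

    if __name__ == "__main__":
        # dag.test() creates (or replaces) the backing DAG run via
        # _get_or_create_dagrun, then runs the tasks in-process without a scheduler.
        docstring_demo().test()
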
7 changes: 4 additions & 3 deletions airflow/models/dagrun.py
@@ -97,9 +97,10 @@ def _creator_note(val):


class DagRun(Base, LoggingMixin):
"""
DagRun describes an instance of a Dag. It can be created
by the scheduler (for regular runs) or by an external trigger.
"""Invocation instance of a DAG.

A DAG run can be created by the scheduler (i.e. scheduled runs), or by an
external trigger (i.e. manual runs).
"""

__tablename__ = "dag_run"
8 changes: 5 additions & 3 deletions airflow/models/trigger.py
@@ -112,9 +112,11 @@ def bulk_fetch(cls, ids: Iterable[int], session: Session = NEW_SESSION) -> dict[
@internal_api_call
@provide_session
def clean_unused(cls, session: Session = NEW_SESSION) -> None:
"""
Deletes all triggers that have no tasks/DAGs dependent on them
(triggers have a one-to-many relationship to both).
"""Deletes all triggers that have no tasks dependent on them.

Triggers have a one-to-many relationship to task instances, so we need
to clean those up first. Afterwards we can drop the triggers not
referenced by anyone.
"""
# Update all task instances with trigger IDs that are not DEFERRED to remove them
for attempt in run_with_db_retries():
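
A simplified sketch of the two-step cleanup the new docstring describes, written as illustrative SQLAlchemy queries rather than Airflow's actual implementation (the helper name is made up):

    from airflow.models import TaskInstance, Trigger
    from airflow.utils.state import TaskInstanceState

    def clean_unused_sketch(session):
        # Step 1: detach trigger references from task instances that are no
        # longer deferred, so their triggers stop counting as "in use".
        session.query(TaskInstance).filter(
            TaskInstance.trigger_id.isnot(None),
            TaskInstance.state != TaskInstanceState.DEFERRED,
        ).update({TaskInstance.trigger_id: None}, synchronize_session=False)

        # Step 2: drop every trigger that no task instance references any more.
        referenced = (
            session.query(TaskInstance.trigger_id)
            .filter(TaskInstance.trigger_id.isnot(None))
            .scalar_subquery()
        )
        session.query(Trigger).filter(Trigger.id.notin_(referenced)).delete(synchronize_session=False)
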
27 changes: 10 additions & 17 deletions airflow/models/variable.py
@@ -127,8 +127,7 @@ def get(
default_var: Any = __NO_DEFAULT_SENTINEL,
deserialize_json: bool = False,
) -> Any:
"""
Gets a value for an Airflow Variable Key.
"""Gets a value for an Airflow Variable Key

:param key: Variable Key
:param default_var: Default value of the Variable if the Variable doesn't exist
@@ -158,16 +157,15 @@ def set(
description: str | None = None,
serialize_json: bool = False,
session: Session = None,
):
"""
Sets a value for an Airflow Variable with a given Key.
This operation will overwrite an existing variable.
) -> None:
"""Sets a value for an Airflow Variable with a given Key.

This operation overwrites an existing variable.

:param key: Variable Key
:param value: Value to set for the Variable
:param description: Description of the Variable
:param serialize_json: Serialize the value to a JSON string
:param session: SQL Alchemy Sessions
"""
# check if the secret exists in the custom secrets' backend.
Variable.check_for_write_conflict(key)
@@ -188,14 +186,12 @@ def update(
value: Any,
serialize_json: bool = False,
session: Session = None,
):
"""
Updates a given Airflow Variable with the Provided value.
) -> None:
"""Updates a given Airflow Variable with the Provided value.

:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
:param session: SQL Alchemy Session
"""
Variable.check_for_write_conflict(key)

@@ -212,11 +208,9 @@ def update(
@provide_session
@internal_api_call
def delete(key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key.
"""Delete an Airflow Variable for a given key.

:param key: Variable Key
:param session: SQL Alchemy Sessions
:param key: Variable Keys
"""
return session.query(Variable).filter(Variable.key == key).delete()

@@ -228,8 +222,7 @@ def rotate_fernet_key(self):

@staticmethod
def check_for_write_conflict(key: str) -> None:
"""
Logs a warning if a variable exists outside of the metastore.
"""Logs a warning if a variable exists outside of the metastore.

If we try to write a variable to the metastore while the same key
exists in an environment variable or custom secrets backend, then
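
A short usage sketch of the Variable API whose docstrings are cleaned up above; the key and values are illustrative:

    from airflow.models import Variable

    # Store a JSON-serializable value; serialize_json persists it as a JSON string.
    Variable.set("demo_config", {"retries": 3}, serialize_json=True)

    # Read it back, falling back to a default if the key does not exist.
    config = Variable.get("demo_config", default_var={}, deserialize_json=True)

    # delete() returns the number of rows removed (0 or 1).
    Variable.delete("demo_config")
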
4 changes: 2 additions & 2 deletions airflow/operators/datetime.py
@@ -27,8 +27,8 @@


class BranchDateTimeOperator(BaseBranchOperator):
"""
Branches into one of two lists of tasks depending on the current datetime.
"""Branches into one of two lists of tasks depending on the current datetime.

For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BranchDateTimeOperator`.

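
For context on the reworded BranchDateTimeOperator docstring, a hedged example along the lines of the linked guide; the task ids and time window are illustrative:

    import datetime
    import pendulum
    from airflow import DAG
    from airflow.operators.datetime import BranchDateTimeOperator
    from airflow.operators.empty import EmptyOperator

    with DAG(
        dag_id="datetime_branch_demo",
        start_date=pendulum.datetime(2023, 5, 1),
        schedule=None,
        catchup=False,
    ):
        in_window = EmptyOperator(task_id="in_window")
        outside_window = EmptyOperator(task_id="outside_window")

        branch = BranchDateTimeOperator(
            task_id="datetime_branch",
            # Follow "in_window" when the logical time falls between 09:00 and 17:00.
            target_lower=datetime.time(9, 0),
            target_upper=datetime.time(17, 0),
            follow_task_ids_if_true="in_window",
            follow_task_ids_if_false="outside_window",
            use_task_logical_date=True,
        )
        branch >> [in_window, outside_window]
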
12 changes: 3 additions & 9 deletions airflow/operators/python.py
@@ -51,15 +51,10 @@


def task(python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs):
"""
Deprecated function.
Calls @task.python and allows users to turn a python function into
an Airflow task. Please use the following instead.

from airflow.decorators import task
"""Deprecated. Use :func:`airflow.decorators.task` instead.

@task
def my_task()
Calls ``@task.python`` and allows users to turn a Python function into
an Airflow task.

:param python_callable: A reference to an object that is callable
:param op_kwargs: a dictionary of keyword arguments that will get unpacked
Expand All @@ -69,7 +64,6 @@ def my_task()
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
Defaults to False.
:return:
"""
# To maintain backwards compatibility, we import the task object into this file
# This prevents breakages in dags that use `from airflow.operators.python import task`
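
The docstring above now simply points at the replacement; for reference, the recommended usage it mentions looks like this (task name illustrative):

    from airflow.decorators import task

    @task
    def my_task():
        # Decorating any Python function with @task turns it into an Airflow task,
        # replacing the deprecated airflow.operators.python.task helper.
        return "done"
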
15 changes: 7 additions & 8 deletions airflow/operators/subdag.py
@@ -127,15 +127,14 @@ def _get_dagrun(self, execution_date):
)
return dag_runs[0] if dag_runs else None

def _reset_dag_run_and_task_instances(self, dag_run, execution_date):
"""
Set task instance states to allow for execution.
Set the DagRun state to RUNNING and set the failed TaskInstances to None state
for scheduler to pick up.
def _reset_dag_run_and_task_instances(self, dag_run: DagRun, execution_date: datetime) -> None:
"""Set task instance states to allow for execution.

The state of the DAG run will be set to RUNNING, and failed task
instances to ``None`` for scheduler to pick up.

:param dag_run: DAG run
:param execution_date: Execution date
:return: None
:param dag_run: DAG run to reset.
:param execution_date: Execution date to select task instances.
"""
with create_session() as session:
dag_run.state = State.RUNNING
50 changes: 26 additions & 24 deletions airflow/operators/weekday.py
@@ -28,56 +28,58 @@


class BranchDayOfWeekOperator(BaseBranchOperator):
"""
Branches into one of two lists of tasks depending on the current day.
For more information on how to use this operator, take a look at the guide.
"""Branches into one of two lists of tasks depending on the current day.

For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BranchDayOfWeekOperator`

**Example** (with single day): ::
**Example** (with single day):

.. code-block:: python

from airflow.operators.empty import EmptyOperator

monday = EmptyOperator(task_id='monday')
other_day = EmptyOperator(task_id='other_day')
monday = EmptyOperator(task_id="monday")
other_day = EmptyOperator(task_id="other_day")

monday_check = DayOfWeekSensor(
task_id='monday_check',
week_day='Monday',
task_id="monday_check",
week_day="Monday",
use_task_logical_date=True,
follow_task_ids_if_true='monday',
follow_task_ids_if_false='other_day',
dag=dag)
follow_task_ids_if_true="monday",
follow_task_ids_if_false="other_day",
)
monday_check >> [monday, other_day]

**Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): ::
**Example** (with :class:`~airflow.utils.weekday.WeekDay` enum):

.. code-block:: python

# import WeekDay Enum
from airflow.utils.weekday import WeekDay
from airflow.operators.empty import EmptyOperator

workday = EmptyOperator(task_id='workday')
weekend = EmptyOperator(task_id='weekend')
workday = EmptyOperator(task_id="workday")
weekend = EmptyOperator(task_id="weekend")
weekend_check = BranchDayOfWeekOperator(
task_id='weekend_check',
task_id="weekend_check",
week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
use_task_logical_date=True,
follow_task_ids_if_true='weekend',
follow_task_ids_if_false='workday',
dag=dag)
follow_task_ids_if_true="weekend",
follow_task_ids_if_false="workday",
)
# add downstream dependencies as you would do with any branch operator
weekend_check >> [workday, weekend]

:param follow_task_ids_if_true: task id or task ids to follow if criteria met
:param follow_task_ids_if_false: task id or task ids to follow if criteria does not met
:param week_day: Day of the week to check (full name). Optionally, a set
of days can also be provided using a set.
Example values:
of days can also be provided using a set. Example values:

* ``"MONDAY"``,
* ``{"Saturday", "Sunday"}``
* ``{WeekDay.TUESDAY}``
* ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
* ``"MONDAY"``,
* ``{"Saturday", "Sunday"}``
* ``{WeekDay.TUESDAY}``
* ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``

To use `WeekDay` enum, import it from `airflow.utils.weekday`
