diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index 83030efb4874b..df6b87d5c2bbc 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -256,7 +256,6 @@ def verify_dagruns(
     :param state: state of the dag_run to set if commit is True
     :param session: session to use
     :param current_task: current task
-    :return:
     """
     for dag_run in dag_runs:
         dag_run.dag = current_task.subdag
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index b536510f7c441..4009893fee77b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2649,13 +2649,14 @@ def test(
         """

         def add_logger_if_needed(ti: TaskInstance):
-            """
-            Add a formatted logger to the taskinstance so all logs are surfaced to the command line instead
-            of into a task file. Since this is a local test run, it is much better for the user to see logs
-            in the command line, rather than needing to search for a log file.
-            Args:
-                ti: The taskinstance that will receive a logger.
+            """Add a formatted logger to the task instance.
+
+            This allows all logs to surface to the command line, instead of into
+            a task file. Since this is a local test run, it is much better for
+            the user to see logs in the command line, rather than needing to
+            search for a log file.
+
+            :param ti: The task instance that will receive a logger.
             """
             format = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
             handler = logging.StreamHandler(sys.stdout)
@@ -3256,9 +3257,10 @@ def __repr__(self):

 class DagOwnerAttributes(Base):
-    """
-    Table defining different owner attributes. For example, a link for an owner that will be passed as
-    a hyperlink to the DAGs view.
+    """Table defining different owner attributes.
+
+    For example, a link for an owner that will be passed as a hyperlink to the
+    "DAGs" view.
     """

     __tablename__ = "dag_owner_attributes"
@@ -3835,17 +3837,17 @@ def _get_or_create_dagrun(
     run_id: str,
     session: Session,
 ) -> DagRun:
-    """
-    Create a DAGRun, but only after clearing the previous instance of said dagrun to prevent collisions.
-    This function is only meant for the `dag.test` function as a helper function.
+    """Create a DAG run, replacing an existing instance if needed to prevent collisions.
+
+    This function is only meant to be used by :meth:`DAG.test` as a helper function.
+
+    :param dag: DAG to be used to find run.
+    :param conf: Configuration to pass to newly created run.
+    :param start_date: Start date of new run.
+    :param execution_date: Logical date for finding an existing run.
+    :param run_id: Run ID for the new DAG run.

-    :param dag: Dag to be used to find dagrun
-    :param conf: configuration to pass to newly created dagrun
-    :param start_date: start date of new dagrun, defaults to execution_date
-    :param execution_date: execution_date for finding the dagrun
-    :param run_id: run_id to pass to new dagrun
-    :param session: sqlalchemy session
-    :return:
+    :return: The newly created DAG run.
""" log.info("dagrun id: %s", dag.dag_id) dr: DagRun = ( @@ -3862,7 +3864,7 @@ def _get_or_create_dagrun( run_id=run_id, start_date=start_date or execution_date, session=session, - conf=conf, # type: ignore + conf=conf, ) - log.info("created dagrun " + str(dr)) + log.info("created dagrun %s", dr) return dr diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 5647b3410d21e..42845b34bdd0e 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -97,9 +97,10 @@ def _creator_note(val): class DagRun(Base, LoggingMixin): - """ - DagRun describes an instance of a Dag. It can be created - by the scheduler (for regular runs) or by an external trigger. + """Invocation instance of a DAG. + + A DAG run can be created by the scheduler (i.e. scheduled runs), or by an + external trigger (i.e. manual runs). """ __tablename__ = "dag_run" diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py index a2791f0c27d6d..8a02d053578e3 100644 --- a/airflow/models/trigger.py +++ b/airflow/models/trigger.py @@ -112,9 +112,11 @@ def bulk_fetch(cls, ids: Iterable[int], session: Session = NEW_SESSION) -> dict[ @internal_api_call @provide_session def clean_unused(cls, session: Session = NEW_SESSION) -> None: - """ - Deletes all triggers that have no tasks/DAGs dependent on them - (triggers have a one-to-many relationship to both). + """Deletes all triggers that have no tasks dependent on them. + + Triggers have a one-to-many relationship to task instances, so we need + to clean those up first. Afterwards we can drop the triggers not + referenced by anyone. """ # Update all task instances with trigger IDs that are not DEFERRED to remove them for attempt in run_with_db_retries(): diff --git a/airflow/models/variable.py b/airflow/models/variable.py index 265959255769d..843f60f9039ba 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -127,8 +127,7 @@ def get( default_var: Any = __NO_DEFAULT_SENTINEL, deserialize_json: bool = False, ) -> Any: - """ - Gets a value for an Airflow Variable Key. + """Gets a value for an Airflow Variable Key :param key: Variable Key :param default_var: Default value of the Variable if the Variable doesn't exist @@ -158,16 +157,15 @@ def set( description: str | None = None, serialize_json: bool = False, session: Session = None, - ): - """ - Sets a value for an Airflow Variable with a given Key. - This operation will overwrite an existing variable. + ) -> None: + """Sets a value for an Airflow Variable with a given Key. + + This operation overwrites an existing variable. :param key: Variable Key :param value: Value to set for the Variable :param description: Description of the Variable :param serialize_json: Serialize the value to a JSON string - :param session: SQL Alchemy Sessions """ # check if the secret exists in the custom secrets' backend. Variable.check_for_write_conflict(key) @@ -188,14 +186,12 @@ def update( value: Any, serialize_json: bool = False, session: Session = None, - ): - """ - Updates a given Airflow Variable with the Provided value. + ) -> None: + """Updates a given Airflow Variable with the Provided value. :param key: Variable Key :param value: Value to set for the Variable :param serialize_json: Serialize the value to a JSON string - :param session: SQL Alchemy Session """ Variable.check_for_write_conflict(key) @@ -212,11 +208,9 @@ def update( @provide_session @internal_api_call def delete(key: str, session: Session = None) -> int: - """ - Delete an Airflow Variable for a given key. 
+ """Delete an Airflow Variable for a given key. - :param key: Variable Key - :param session: SQL Alchemy Sessions + :param key: Variable Keys """ return session.query(Variable).filter(Variable.key == key).delete() @@ -228,8 +222,7 @@ def rotate_fernet_key(self): @staticmethod def check_for_write_conflict(key: str) -> None: - """ - Logs a warning if a variable exists outside of the metastore. + """Logs a warning if a variable exists outside of the metastore. If we try to write a variable to the metastore while the same key exists in an environment variable or custom secrets backend, then diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py index f56fc2d3d398a..7c3c648130c15 100644 --- a/airflow/operators/datetime.py +++ b/airflow/operators/datetime.py @@ -27,8 +27,8 @@ class BranchDateTimeOperator(BaseBranchOperator): - """ - Branches into one of two lists of tasks depending on the current datetime. + """Branches into one of two lists of tasks depending on the current datetime. + For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:BranchDateTimeOperator`. diff --git a/airflow/operators/python.py b/airflow/operators/python.py index 7a632114c9da9..9300d6ca903a2 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -51,15 +51,10 @@ def task(python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs): - """ - Deprecated function. - Calls @task.python and allows users to turn a python function into - an Airflow task. Please use the following instead. - - from airflow.decorators import task + """Deprecated. Use :func:`airflow.decorators.task` instead. - @task - def my_task() + Calls ``@task.python`` and allows users to turn a Python function into + an Airflow task. :param python_callable: A reference to an object that is callable :param op_kwargs: a dictionary of keyword arguments that will get unpacked @@ -69,7 +64,6 @@ def my_task() :param multiple_outputs: if set, function return value will be unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. Defaults to False. - :return: """ # To maintain backwards compatibility, we import the task object into this file # This prevents breakages in dags that use `from airflow.operators.python import task` diff --git a/airflow/operators/subdag.py b/airflow/operators/subdag.py index 52f4b274032f3..9ac595afabd34 100644 --- a/airflow/operators/subdag.py +++ b/airflow/operators/subdag.py @@ -127,15 +127,14 @@ def _get_dagrun(self, execution_date): ) return dag_runs[0] if dag_runs else None - def _reset_dag_run_and_task_instances(self, dag_run, execution_date): - """ - Set task instance states to allow for execution. - Set the DagRun state to RUNNING and set the failed TaskInstances to None state - for scheduler to pick up. + def _reset_dag_run_and_task_instances(self, dag_run: DagRun, execution_date: datetime) -> None: + """Set task instance states to allow for execution. + + The state of the DAG run will be set to RUNNING, and failed task + instances to ``None`` for scheduler to pick up. - :param dag_run: DAG run - :param execution_date: Execution date - :return: None + :param dag_run: DAG run to reset. + :param execution_date: Execution date to select task instances. 
""" with create_session() as session: dag_run.state = State.RUNNING diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py index d35204d31cbdf..ea57ac65bdb93 100644 --- a/airflow/operators/weekday.py +++ b/airflow/operators/weekday.py @@ -28,56 +28,58 @@ class BranchDayOfWeekOperator(BaseBranchOperator): - """ - Branches into one of two lists of tasks depending on the current day. - For more information on how to use this operator, take a look at the guide. + """Branches into one of two lists of tasks depending on the current day. + For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:BranchDayOfWeekOperator` - **Example** (with single day): :: + **Example** (with single day): + + .. code-block:: python from airflow.operators.empty import EmptyOperator - monday = EmptyOperator(task_id='monday') - other_day = EmptyOperator(task_id='other_day') + monday = EmptyOperator(task_id="monday") + other_day = EmptyOperator(task_id="other_day") monday_check = DayOfWeekSensor( - task_id='monday_check', - week_day='Monday', + task_id="monday_check", + week_day="Monday", use_task_logical_date=True, - follow_task_ids_if_true='monday', - follow_task_ids_if_false='other_day', - dag=dag) + follow_task_ids_if_true="monday", + follow_task_ids_if_false="other_day", + ) monday_check >> [monday, other_day] - **Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): :: + **Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): + + .. code-block:: python # import WeekDay Enum from airflow.utils.weekday import WeekDay from airflow.operators.empty import EmptyOperator - workday = EmptyOperator(task_id='workday') - weekend = EmptyOperator(task_id='weekend') + workday = EmptyOperator(task_id="workday") + weekend = EmptyOperator(task_id="weekend") weekend_check = BranchDayOfWeekOperator( - task_id='weekend_check', + task_id="weekend_check", week_day={WeekDay.SATURDAY, WeekDay.SUNDAY}, use_task_logical_date=True, - follow_task_ids_if_true='weekend', - follow_task_ids_if_false='workday', - dag=dag) + follow_task_ids_if_true="weekend", + follow_task_ids_if_false="workday", + ) # add downstream dependencies as you would do with any branch operator weekend_check >> [workday, weekend] :param follow_task_ids_if_true: task id or task ids to follow if criteria met :param follow_task_ids_if_false: task id or task ids to follow if criteria does not met :param week_day: Day of the week to check (full name). Optionally, a set - of days can also be provided using a set. - Example values: + of days can also be provided using a set. 
-            * ``"MONDAY"``,
-            * ``{"Saturday", "Sunday"}``
-            * ``{WeekDay.TUESDAY}``
-            * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
+        * ``"MONDAY"``,
+        * ``{"Saturday", "Sunday"}``
+        * ``{WeekDay.TUESDAY}``
+        * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``

     To use `WeekDay` enum, import it from `airflow.utils.weekday`
diff --git a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
index 9d37da983e519..91257ced2d939 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
@@ -43,41 +43,33 @@ def _convert_from_dict(obj, new_class):


 def convert_volume(volume) -> k8s.V1Volume:
-    """
-    Converts an airflow Volume object into a k8s.V1Volume
+    """Converts an airflow Volume object into a k8s.V1Volume.

     :param volume:
-    :return: k8s.V1Volume
     """
     return _convert_kube_model_object(volume, k8s.V1Volume)


 def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
-    """
-    Converts an airflow VolumeMount object into a k8s.V1VolumeMount
+    """Converts an airflow VolumeMount object into a k8s.V1VolumeMount.

     :param volume_mount:
-    :return: k8s.V1VolumeMount
     """
     return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount)


 def convert_port(port) -> k8s.V1ContainerPort:
-    """
-    Converts an airflow Port object into a k8s.V1ContainerPort
+    """Converts an airflow Port object into a k8s.V1ContainerPort.

     :param port:
-    :return: k8s.V1ContainerPort
     """
     return _convert_kube_model_object(port, k8s.V1ContainerPort)


 def convert_env_vars(env_vars) -> list[k8s.V1EnvVar]:
-    """
-    Converts a dictionary into a list of env_vars
+    """Converts a dictionary into a list of env_vars.

     :param env_vars:
-    :return:
     """
     if isinstance(env_vars, dict):
         res = []
@@ -91,21 +83,17 @@ def convert_env_vars(env_vars) -> list[k8s.V1EnvVar]:


 def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar:
-    """
-    Converts a PodRuntimeInfoEnv into an k8s.V1EnvVar
+    """Converts a PodRuntimeInfoEnv into a k8s.V1EnvVar.

     :param pod_runtime_info_envs:
-    :return:
     """
     return _convert_kube_model_object(pod_runtime_info_envs, k8s.V1EnvVar)


 def convert_image_pull_secrets(image_pull_secrets) -> list[k8s.V1LocalObjectReference]:
-    """
-    Converts a PodRuntimeInfoEnv into an k8s.V1EnvVar
+    """Converts image pull secrets into a list of k8s.V1LocalObjectReference.

     :param image_pull_secrets:
-    :return:
     """
     if isinstance(image_pull_secrets, str):
         secrets = image_pull_secrets.split(",")
@@ -115,11 +103,9 @@ def convert_image_pull_secrets(image_pull_secrets) -> list[k8s.V1LocalObjectRefe


 def convert_configmap(configmaps) -> k8s.V1EnvFromSource:
-    """
-    Converts a str into an k8s.V1EnvFromSource
+    """Converts a str into a k8s.V1EnvFromSource.

     :param configmaps:
-    :return:
     """
     return k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmaps))
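Of the converters above, ``convert_env_vars`` is the only one with real branching; a sketch of the dict path implied by its docstring and the visible ``isinstance`` check (the ``models as k8s`` import alias is an assumption):

.. code-block:: python

    from kubernetes.client import models as k8s

    # A plain mapping becomes a list of k8s.V1EnvVar objects.
    env = {"LOG_LEVEL": "DEBUG", "ENV": "prod"}
    env_vars = [k8s.V1EnvVar(name=name, value=value) for name, value in env.items()]
    # This mirrors what convert_env_vars(env) is expected to return.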
diff --git a/airflow/providers/google/go_module_utils.py b/airflow/providers/google/go_module_utils.py
index a05590dd0a7f0..8e1f608191c3d 100644
--- a/airflow/providers/google/go_module_utils.py
+++ b/airflow/providers/google/go_module_utils.py
@@ -24,12 +24,12 @@


 def init_module(go_module_name: str, go_module_path: str) -> None:
-    """Initialize a Go module. If a ``go.mod`` file already exists, this function
-    will do nothing.
+    """Initialize a Go module.
+
+    If a ``go.mod`` file already exists, this function will do nothing.

     :param go_module_name: The name of the Go module to initialize.
     :param go_module_path: The path to the directory containing the Go module.
-    :return:
     """
     if os.path.isfile(os.path.join(go_module_path, "go.mod")):
         return
@@ -41,7 +41,6 @@ def install_dependencies(go_module_path: str) -> None:
     """Install dependencies for a Go module.

     :param go_module_path: The path to the directory containing the Go module.
-    :return:
     """
     go_mod_tidy = ["go", "mod", "tidy"]
     execute_in_subprocess(go_mod_tidy, cwd=go_module_path)
diff --git a/airflow/security/utils.py b/airflow/security/utils.py
index 46c31c2d74ac3..1ffe0e0d29bac 100644
--- a/airflow/security/utils.py
+++ b/airflow/security/utils.py
@@ -41,11 +41,11 @@


 def get_components(principal) -> list[str] | None:
-    """
-    Returns components retrieved from the kerberos principal.
-    -> (short name, instance (FQDN), realm).
+    """Split the kerberos principal string into parts.

-    ``principal`` .
+    :return: *None* if the principal is empty. Otherwise split the value into
+        parts. Assuming the principal string is valid, the return value should
+        contain three components: short name, instance (FQDN), and realm.
     """
     if not principal:
         return None
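For ``get_components``, a sketch matching the documented three-part contract (only the empty-input branch is shown in the hunk; the split on ``/`` and ``@`` is an assumption based on the docstring):

.. code-block:: python

    import re

    def get_components(principal):
        if not principal:
            return None
        # "shortname/fqdn@REALM" -> ["shortname", "fqdn", "REALM"]
        return re.split(r"[/@]", str(principal))

    assert get_components("hdfs/namenode.example.com@EXAMPLE.COM") == [
        "hdfs",
        "namenode.example.com",
        "EXAMPLE.COM",
    ]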
""" def is_jsonable(x): diff --git a/airflow/serialization/pydantic/dag_run.py b/airflow/serialization/pydantic/dag_run.py index b7ebceb82670d..a5ae6b4a22b52 100644 --- a/airflow/serialization/pydantic/dag_run.py +++ b/airflow/serialization/pydantic/dag_run.py @@ -46,6 +46,6 @@ class DagRunPydantic(BaseModelPydantic): consumed_dataset_events: List[DatasetEventPydantic] class Config: - """Make sure it deals automatically with ORM classes of SQL Alchemy.""" + """Make sure it deals automatically with SQLAlchemy ORM classes.""" orm_mode = True diff --git a/airflow/serialization/pydantic/dataset.py b/airflow/serialization/pydantic/dataset.py index f3752f4ade1f6..4ffcb6dc02ce8 100644 --- a/airflow/serialization/pydantic/dataset.py +++ b/airflow/serialization/pydantic/dataset.py @@ -32,7 +32,7 @@ class DagScheduleDatasetReferencePydantic(BaseModelPydantic): updated_at: datetime class Config: - """Make sure it deals automatically with ORM classes of SQL Alchemy.""" + """Make sure it deals automatically with SQLAlchemy ORM classes.""" orm_mode = True @@ -50,7 +50,7 @@ class TaskOutletDatasetReferencePydantic(BaseModelPydantic): updated_at = datetime class Config: - """Make sure it deals automatically with ORM classes of SQL Alchemy.""" + """Make sure it deals automatically with SQLAlchemy ORM classes.""" orm_mode = True @@ -69,7 +69,7 @@ class DatasetPydantic(BaseModelPydantic): producing_tasks: List[TaskOutletDatasetReferencePydantic] class Config: - """Make sure it deals automatically with ORM classes of SQL Alchemy.""" + """Make sure it deals automatically with SQLAlchemy ORM classes.""" orm_mode = True @@ -87,6 +87,6 @@ class DatasetEventPydantic(BaseModelPydantic): dataset: DatasetPydantic class Config: - """Make sure it deals automatically with ORM classes of SQL Alchemy.""" + """Make sure it deals automatically with SQLAlchemy ORM classes.""" orm_mode = True diff --git a/airflow/serialization/pydantic/job.py b/airflow/serialization/pydantic/job.py index 99253e3b7af79..b36a9826eb25f 100644 --- a/airflow/serialization/pydantic/job.py +++ b/airflow/serialization/pydantic/job.py @@ -47,6 +47,6 @@ class JobPydantic(BaseModelPydantic): max_tis_per_query: Optional[int] class Config: - """Make sure it deals automatically with ORM classes of SQL Alchemy.""" + """Make sure it deals automatically with SQLAlchemy ORM classes.""" orm_mode = True diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py index 92a282b6bd397..236c42c26061b 100644 --- a/airflow/serialization/pydantic/taskinstance.py +++ b/airflow/serialization/pydantic/taskinstance.py @@ -58,7 +58,7 @@ class TaskInstancePydantic(BaseModelPydantic): run_as_user: Optional[str] class Config: - """Make sure it deals automatically with ORM classes of SQL Alchemy.""" + """Make sure it deals automatically with SQLAlchemy ORM classes.""" orm_mode = True diff --git a/airflow/serialization/serializers/kubernetes.py b/airflow/serialization/serializers/kubernetes.py index 900d219da5bdb..d38836affc4a8 100644 --- a/airflow/serialization/serializers/kubernetes.py +++ b/airflow/serialization/serializers/kubernetes.py @@ -46,11 +46,9 @@ def serialize(o: object) -> tuple[U, str, int, bool]: if isinstance(o, (k8s.V1Pod, k8s.V1ResourceRequirements)): from airflow.kubernetes.pod_generator import PodGenerator + # We're running this in an except block, so we don't want it to fail + # under any circumstances, e.g. accessing a non-existing attribute. 
diff --git a/airflow/serialization/serializers/kubernetes.py b/airflow/serialization/serializers/kubernetes.py
index 900d219da5bdb..d38836affc4a8 100644
--- a/airflow/serialization/serializers/kubernetes.py
+++ b/airflow/serialization/serializers/kubernetes.py
@@ -46,11 +46,9 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
     if isinstance(o, (k8s.V1Pod, k8s.V1ResourceRequirements)):
         from airflow.kubernetes.pod_generator import PodGenerator

+        # We're running this in an except block, so we don't want it to fail
+        # under any circumstances, e.g. accessing a non-existing attribute.
         def safe_get_name(pod):
-            """
-            We're running this in an except block, so we don't want it to
-            fail under any circumstances, e.g. by accessing an attribute that isn't there.
-            """
             try:
                 return pod.metadata.name
             except Exception:
diff --git a/airflow/utils/cli_action_loggers.py b/airflow/utils/cli_action_loggers.py
index 8962e352215ae..02c311d40ab37 100644
--- a/airflow/utils/cli_action_loggers.py
+++ b/airflow/utils/cli_action_loggers.py
@@ -27,8 +27,7 @@


 def register_pre_exec_callback(action_logger):
-    """
-    Registers more action_logger function callback for pre-execution.
+    """Registers another action_logger function callback for pre-execution.

     This function callback is expected to be called with keyword args.
     For more about the arguments that is being passed to the callback,
@@ -42,8 +41,7 @@ def register_pre_exec_callback(action_logger):


 def register_post_exec_callback(action_logger):
-    """
-    Registers more action_logger function callback for post-execution.
+    """Registers another action_logger function callback for post-execution.

     This function callback is expected to be called with keyword args.
     For more about the arguments that is being passed to the callback,
@@ -57,8 +55,8 @@ def register_post_exec_callback(action_logger):


 def on_pre_execution(**kwargs):
-    """
-    Calls callbacks before execution.
+    """Calls callbacks before execution.
+
     Note that any exception from callback will be logged but won't be propagated.

     :param kwargs:
@@ -73,8 +71,8 @@ def on_pre_execution(**kwargs):


 def on_post_execution(**kwargs):
-    """
-    Calls callbacks after execution.
+    """Calls callbacks after execution.
+
     As it's being called after execution, it can capture status of execution,
     duration, etc. Note that any exception from callback will be logged but
     won't be propagated.
@@ -91,13 +89,10 @@ def on_post_execution(**kwargs):


 def default_action_log(sub_command, user, task_id, dag_id, execution_date, host_name, full_command, **_):
-    """
-    A default action logger callback that behave same as www.utils.action_logging
-    which uses global session and pushes log ORM object.
+    """Default action logger callback that behaves similarly to ``action_logging``.

-    :param log: An log ORM instance
-    :param **_: other keyword arguments that is not being used by this function
-    :return: None
+    The difference is this function uses the global ORM session, and pushes a
+    ``Log`` row into the database instead of actually logging.
     """
     from sqlalchemy.exc import OperationalError, ProgrammingError
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 820ace78b2a48..a4f70c8dfec05 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -19,11 +19,13 @@

 import warnings
 from datetime import datetime, timedelta
+from typing import Collection

 from croniter import croniter
 from dateutil.relativedelta import relativedelta  # for doctest

 from airflow.exceptions import RemovedInAirflow3Warning
+from airflow.typing_compat import Literal
 from airflow.utils import timezone

 cron_presets: dict[str, str] = {
@@ -42,10 +44,7 @@ def date_range(
     num: int | None = None,
     delta: str | timedelta | relativedelta | None = None,
 ) -> list[datetime]:
-    """
-    Get a set of dates as a list based on a start, end and delta, delta
-    can be something that can be added to `datetime.datetime`
-    or a cron expression as a `str`.
+    """Get a list of dates in the specified range, separated by delta.

     .. code-block:: pycon

         >>> from airflow.utils.dates import date_range
@@ -136,11 +135,12 @@ def date_range(
     return sorted(dates)


-def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
-    """
-    Returns the datetime of the form start_date + i * delta
-    which is closest to dt for any non-negative integer i.
-    Note that delta may be a datetime.timedelta or a dateutil.relativedelta.
+def round_time(
+    dt: datetime,
+    delta: str | timedelta | relativedelta,
+    start_date: datetime = timezone.make_aware(datetime.min),
+):
+    """Returns ``start_date + i * delta`` closest to ``dt`` for any non-negative integer ``i``.

     .. code-block:: pycon
@@ -219,11 +219,13 @@ def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
     # and this function returns start_date.


-def infer_time_unit(time_seconds_arr):
-    """
-    Determine the most appropriate time unit for an array of time durations
-    specified in seconds.
-    e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours'.
+TimeUnit = Literal["days", "hours", "minutes", "seconds"]
+
+
+def infer_time_unit(time_seconds_arr: Collection[float]) -> TimeUnit:
+    """Determine the most appropriate time unit for given durations (in seconds).
+
+    e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours'
     """
     if len(time_seconds_arr) == 0:
         return "hours"
@@ -238,7 +240,7 @@ def infer_time_unit(time_seconds_arr):
     return "days"


-def scale_time_units(time_seconds_arr, unit):
+def scale_time_units(time_seconds_arr: Collection[float], unit: TimeUnit) -> Collection[float]:
     """Convert an array of time durations in seconds to the specified time unit."""
     if unit == "minutes":
         return list(map(lambda x: x / 60, time_seconds_arr))
@@ -250,9 +252,9 @@ def scale_time_units(time_seconds_arr, unit):


 def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
-    """
-    Get a datetime object representing `n` days ago. By default the time is
-    set to midnight.
+    """Get a datetime object representing *n* days ago.
+
+    By default the time is set to midnight.
     """
     warnings.warn(
         "Function `days_ago` is deprecated and will be removed in Airflow 3.0. "
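The new type hints pin down how ``infer_time_unit`` and ``scale_time_units`` compose; a small check using the docstring's own example value:

.. code-block:: python

    from airflow.utils.dates import infer_time_unit, scale_time_units

    durations = [5400.0]                        # seconds; the docstring maps this to "minutes"
    unit = infer_time_unit(durations)           # expected: "minutes"
    scaled = scale_time_units(durations, unit)  # [90.0] - seconds divided by 60, per the hunk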
""" dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone()) if datefmt: diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py index 50dc669efeb8e..663f0f496c7bb 100644 --- a/airflow/utils/process_utils.py +++ b/airflow/utils/process_utils.py @@ -324,11 +324,13 @@ def check_if_pidfile_process_is_running(pid_file: str, process_name: str): def set_new_process_group() -> None: - """ - Try to set current process to a new process group. + """Try to set current process to a new process group. + That makes it easy to kill all sub-process of this at the OS-level, rather than having to iterate the child processes. - If current process spawn by system call ``exec()`` than keep current process group. + + If current process was spawned by system call ``exec()``, the current + process group is kept. """ if os.getpid() == os.getsid(0): # If PID = SID than process a session leader, and it is not possible to change process group diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index 2e47290f6ecff..f12f6f44c81b0 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -396,10 +396,11 @@ def nowait(session: Session) -> dict[str, Any]: def nulls_first(col, session: Session) -> dict[str, Any]: - """ - Adds a nullsfirst construct to the column ordering. Currently only Postgres supports it. - In MySQL & Sqlite NULL values are considered lower than any non-NULL value, therefore, NULL values - appear first when the order is ASC (ascending). + """Specify *NULLS FIRST* to the column ordering. + + This is only done to Postgres, currently the only backend that supports it. + Other databases do not need it since NULL values are considered lower than + any other values, and appear first when the order is ASC (ascending). """ if session.bind.dialect.name == "postgresql": return nullsfirst(col) diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index 5d134697f9341..2b65055f38d5b 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -29,33 +29,31 @@ def is_localized(value): - """ - Determine if a given datetime.datetime is aware. - The concept is defined in Python's docs: - http://docs.python.org/library/datetime.html#datetime.tzinfo - Assuming value.tzinfo is either None or a proper datetime.tzinfo, - value.utcoffset() implements the appropriate logic. + """Determine if a given datetime.datetime is aware. + + The concept is defined in Python documentation. Assuming the tzinfo is + either None or a proper ``datetime.tzinfo`` instance, ``value.utcoffset()`` + implements the appropriate logic. + + .. seealso:: http://docs.python.org/library/datetime.html#datetime.tzinfo """ return value.utcoffset() is not None def is_naive(value): - """ - Determine if a given datetime.datetime is naive. - The concept is defined in Python's docs: - http://docs.python.org/library/datetime.html#datetime.tzinfo - Assuming value.tzinfo is either None or a proper datetime.tzinfo, - value.utcoffset() implements the appropriate logic. + """Determine if a given datetime.datetime is naive. + + The concept is defined in Python documentation. Assuming the tzinfo is + either None or a proper ``datetime.tzinfo`` instance, ``value.utcoffset()`` + implements the appropriate logic. + + .. seealso:: http://docs.python.org/library/datetime.html#datetime.tzinfo """ return value.utcoffset() is None def utcnow() -> dt.datetime: - """ - Get the current date and time in UTC. 
-
-    :return:
-    """
+    """Get the current date and time in UTC."""
     # pendulum utcnow() is not used as that sets a TimezoneInfo object
     # instead of a Timezone. This is not picklable and also creates issues
     # when using replace()
@@ -66,11 +64,7 @@ def utc_epoch() -> dt.datetime:
-    """
-    Gets the epoch in the users timezone.
-
-    :return:
-    """
+    """Gets the epoch in the user's timezone."""
     # pendulum utcnow() is not used as that sets a TimezoneInfo object
     # instead of a Timezone. This is not picklable and also creates issues
     # when using replace()
@@ -91,9 +85,7 @@ def convert_to_utc(value: dt.datetime) -> DateTime:


 def convert_to_utc(value: dt.datetime | None) -> DateTime | None:
-    """
-    Returns the datetime with the default timezone added if timezone
-    information was not associated.
+    """Creates a datetime with the default timezone added if none is associated.

     :param value: datetime
     :return: datetime with tzinfo
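A quick check of the aware/naive distinction these helpers encode (values invented; the assertions restate the ``utcoffset()`` logic the docstrings describe):

.. code-block:: python

    import datetime as dt

    naive = dt.datetime(2023, 6, 1, 12, 0)
    aware = dt.datetime(2023, 6, 1, 12, 0, tzinfo=dt.timezone.utc)

    assert naive.utcoffset() is None      # is_naive(naive) is True
    assert aware.utcoffset() is not None  # is_localized(aware) is True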
diff --git a/airflow/www/extensions/init_appbuilder.py b/airflow/www/extensions/init_appbuilder.py
index b1c70daf47192..ac9d2c9107df4 100644
--- a/airflow/www/extensions/init_appbuilder.py
+++ b/airflow/www/extensions/init_appbuilder.py
@@ -408,7 +408,9 @@ def add_view(
             then this link will be a part of the menu. Otherwise, it will not be included
             in the menu items. Defaults to :code:`None`, meaning the item will always be present.

+        Examples::
+
             appbuilder = AppBuilder(app, db)
             # Register a view, rendering a top menu without icon.
             appbuilder.add_view(MyModelView(), "My View")
@@ -478,8 +480,8 @@ def add_link(
         baseview=None,
         cond=None,
     ):
-        """
-        Add your own links to menu using this method
+        """Add your own links to the menu using this method.
+
         :param name:
             The string name that identifies the menu.
         :param href:
@@ -524,8 +526,8 @@ def add_link(
         self._add_permissions_menu(category)

     def add_separator(self, category, cond=None):
-        """
-        Add a separator to the menu, you will sequentially create the menu
+        """Add a separator to the menu; you will sequentially create the menu.
+
         :param category:
             The menu category where the separator will be included.
         :param cond:
@@ -558,28 +560,29 @@ def add_view_no_menu(self, baseview, endpoint=None, static_folder=None):
         return baseview

     def security_cleanup(self):
-        """
-        This method is useful if you have changed
-        the name of your menus or classes,
-        changing them will leave behind permissions
-        that are not associated with anything.
-        You can use it always or just sometimes to
-        perform a security cleanup. Warning this will delete any permission
-        that is no longer part of any registered view or menu.
-        Remember invoke ONLY AFTER YOU HAVE REGISTERED ALL VIEWS.
+        """Clean up security.
+
+        This method is useful if you have changed the name of your menus or
+        classes. Changing them leaves behind permissions that are not associated
+        with anything. You can use it always or just sometimes to perform a
+        security cleanup.
+
+        .. warning::
+
+            This deletes any permission that is no longer part of any registered
+            view or menu. Only invoke AFTER YOU HAVE REGISTERED ALL VIEWS.
         """
         self.sm.security_cleanup(self.baseviews, self.menu)

     def security_converge(self, dry=False) -> dict:
-        """
-        Migrates all permissions to the new names on all the Roles.
+        """Migrates all permissions to the new names on all the Roles.
+
         This method is useful when you use:

-        - `class_permission_name`
-        - `previous_class_permission_name`
-        - `method_permission_name`
-        - `previous_method_permission_name`
+        - ``class_permission_name``
+        - ``previous_class_permission_name``
+        - ``method_permission_name``
+        - ``previous_method_permission_name``

         :param dry: If True will not change DB
         :return: Dict with all computed necessary operations
diff --git a/airflow/www/extensions/init_manifest_files.py b/airflow/www/extensions/init_manifest_files.py
index 79b244e3b61f4..2ce60194a60a9 100644
--- a/airflow/www/extensions/init_manifest_files.py
+++ b/airflow/www/extensions/init_manifest_files.py
@@ -24,11 +24,9 @@


 def configure_manifest_files(app):
-    """
-    Loads the manifest file and register the `url_for_asset_` template tag.
+    """Loads the manifest file and registers the `url_for_asset_` template tag.

     :param app:
-    :return:
     """
     manifest = {}
@@ -52,11 +50,10 @@ def get_asset_url(filename):

     @app.context_processor
     def get_url_for_asset():
-        """
-        Template tag to return the asset URL.
-        WebPack renders the assets after minification and modification
-        under the static/dist folder.
-        This template tag reads the asset name in manifest.json and returns
-        the appropriate file.
+        """Template tag to return the asset URL.
+
+        WebPack renders the assets after minification and modification under the
+        static/dist folder. This template tag reads the asset name in
+        ``manifest.json`` and returns the appropriate file.
         """
         return dict(url_for_asset=get_asset_url)
diff --git a/airflow/www/fab_security/manager.py b/airflow/www/fab_security/manager.py
index 380e3ef9e3033..a26b5977fbcbb 100644
--- a/airflow/www/fab_security/manager.py
+++ b/airflow/www/fab_security/manager.py
@@ -146,6 +146,7 @@ class BaseSecurityManager:
     @staticmethod
     def oauth_tokengetter(token=None):
         """Authentication (OAuth) token getter function.
+
         Override to implement your own token getter method.
         """
         return _oauth_tokengetter(token)
@@ -585,9 +586,10 @@ def get_oauth_token_key_name(self, provider):
                 return _provider.get("token_key", "oauth_token")

     def get_oauth_token_secret_name(self, provider):
-        """
-        Returns the token_secret name for the oauth provider if none is configured defaults to oauth_secret.
-        This is configured using OAUTH_PROVIDERS and token_secret.
+        """Get the ``token_secret`` name for the oauth provider.
+
+        If none is configured, defaults to ``oauth_secret``. This is configured
+        using ``OAUTH_PROVIDERS`` and ``token_secret``.
         """
         for _provider in self.oauth_providers:
             if _provider["name"] == provider:
@@ -606,9 +608,9 @@ def set_oauth_session(self, provider, oauth_response):
         session["oauth_provider"] = provider

     def get_oauth_user_info(self, provider, resp):
-        """
-        Since there are different OAuth API's with different ways to
-        retrieve user info.
+        """Get the OAuth user information from different OAuth APIs.
+
+        All providers have different ways to retrieve user info.
         """
         # for GITHUB
         if provider == "github" or provider == "githublocal":
@@ -866,9 +868,10 @@ def reset_password(self, userid, password):
         self.update_user(user)

     def update_user_auth_stat(self, user, success=True):
-        """
-        Update user authentication stats upon successful/unsuccessful
-        authentication attempts.
+        """Update user authentication stats.
+
+        This is done upon successful/unsuccessful authentication attempts.
+
         :param user: The identified (but possibly not successfully authenticated)
             user model
@@ -890,9 +893,10 @@ def update_user_auth_stat(self, user, success=True):
         self.update_user(user)

     def _rotate_session_id(self):
-        """
-        Upon successful authentication when using the database session backend,
-        we need to rotate the session id.
+        """Rotate the session ID.
+
+        We need to do this upon successful authentication when using the
+        database session backend.
         """
         if conf.get("webserver", "SESSION_BACKEND") == "database":
             session.sid = str(uuid4())
@@ -1379,10 +1383,10 @@ def _has_access_builtin_roles(self, role, action_name: str, resource_name: str)
     def _get_user_permission_resources(
         self, user: User | None, action_name: str, resource_names: list[str] | None = None
     ) -> set[str]:
-        """
-        Return a set of resource names with a certain action name that a user has access to.
-        Mainly used to fetch all menu permissions on a single db call, will also check public permissions
-        and builtin roles.
+        """Get resource names with a certain action name that a user has access to.
+
+        Mainly used to fetch all menu permissions on a single db call, will also
+        check public permissions and builtin roles.
         """
         if not resource_names:
             resource_names = []
diff --git a/dev/breeze/src/airflow_breeze/utils/confirm.py b/dev/breeze/src/airflow_breeze/utils/confirm.py
index 0b64697c19f97..225a04bce2dca 100644
--- a/dev/breeze/src/airflow_breeze/utils/confirm.py
+++ b/dev/breeze/src/airflow_breeze/utils/confirm.py
@@ -37,14 +37,12 @@ def user_confirm(
     default_answer: Answer | None = Answer.NO,
     quit_allowed: bool = True,
 ) -> Answer:
-    """
-    Ask the user for confirmation.
+    """Ask the user for confirmation.

     :param message: message to display to the user (should end with the question mark)
     :param timeout: time given user to answer
     :param default_answer: default value returned on timeout. If no default
         is set, the timeout is ignored.
     :param quit_allowed: whether quit answer is allowed
-    :return:
     """
     from inputimeout import TimeoutOccurred, inputimeout
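``user_confirm`` is easy to misread around timeouts; a hedged usage sketch based on the signature shown above (the import path is an assumption from the file location):

.. code-block:: python

    from airflow_breeze.utils.confirm import Answer, user_confirm

    answer = user_confirm(
        message="Rebuild the CI image?",
        timeout=10,                # seconds to wait for input
        default_answer=Answer.NO,  # returned on timeout; timeout ignored if None
        quit_allowed=True,
    )
    if answer == Answer.YES:
        ...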
- + """Checks if the docker compose version is as expected. + This includes specific modifications done by some vendors such as Microsoft. + They might have modified version of docker-compose/docker in their cloud. In + the case the docker compose version is wrong, we continue but print a + warning for the user. """ version_pattern = re.compile(r"(\d+)\.(\d+)\.(\d+)") docker_compose_version_command = ["docker-compose", "--version"] @@ -363,10 +363,7 @@ def check_docker_compose_version(): def check_docker_context(): - """ - Checks whether Docker is using the expected context - - """ + """Checks whether Docker is using the expected context.""" expected_docker_context = "default" response = run_command( ["docker", "info", "--format", "{{json .ClientInfo.Context}}"], @@ -576,12 +573,11 @@ def make_sure_builder_configured(params: CommonBuildParams): def set_value_to_default_if_not_set(env: dict[str, str], name: str, default: str): - """ - Set value of name parameter to default (indexed by name) if not set. + """Set value of name parameter to default (indexed by name) if not set. + :param env: dictionary where to set the parameter :param name: name of parameter :param default: default value - :return: """ if env.get(name) is None: env[name] = os.environ.get(name, default) diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py index 5f994e943292b..a2b7787f490ca 100755 --- a/dev/provider_packages/prepare_provider_packages.py +++ b/dev/provider_packages/prepare_provider_packages.py @@ -623,13 +623,15 @@ def strip_leading_zeros(version: str) -> str: def get_previous_release_info( previous_release_version: str | None, past_releases: list[ReleaseInfo], current_release_version: str ) -> str | None: - """ - Find previous release. In case we are re-running current release we assume that last release was - the previous one. This is needed so that we can generate list of changes since the previous release. + """Find previous release. + + In case we are re-running current release, we assume that last release was + the previous one. This is needed so that we can generate list of changes + since the previous release. + :param previous_release_version: known last release version :param past_releases: list of past releases :param current_release_version: release that we are working on currently - :return: """ previous_release = None if previous_release_version == current_release_version: @@ -645,8 +647,8 @@ def check_if_release_version_ok( past_releases: list[ReleaseInfo], current_release_version: str, ) -> tuple[str, str | None]: - """ - Check if the release version passed is not later than the last release version + """Check if the release version passed is not later than the last release version. + :param past_releases: all past releases (if there are any) :param current_release_version: release version to check :return: Tuple of current/previous_release (previous might be None if there are no releases) @@ -668,8 +670,8 @@ def check_if_release_version_ok( def get_cross_provider_dependent_packages(provider_package_id: str) -> list[str]: - """ - Returns cross-provider dependencies for the package. + """Returns cross-provider dependencies for the package. + :param provider_package_id: package id :return: list of cross-provider dependencies """ @@ -677,18 +679,17 @@ def get_cross_provider_dependent_packages(provider_package_id: str) -> list[str] def make_current_directory_safe(verbose: bool): - """ - Makes current directory safe for Git. 
+ """Makes current directory safe for Git. - New git checks if git ownership for the folder is not manipulated with. We are running this command - only inside the container where the directory is mounted from "regular" user to "root" user which is - used inside the container, so this is quite ok to assume the directory it is used is safe. + New git checks if git ownership for the folder is not manipulated with. We + are running this command only inside the container where the directory is + mounted from "regular" user to "root" user which is used inside the + container, so this is quite ok to assume the directory it is used is safe. - It's also ok to leave it as safe - it is a global option inside the container so it will disappear - when we exit. + It's also ok to leave it as safe - it is a global option inside the + container so it will disappear when we exit. :param verbose: whether to print commands being executed - :return: """ safe_dir_remove_command = ["git", "config", "--global", "--unset-all", "safe.directory"] if verbose: @@ -702,17 +703,17 @@ def make_current_directory_safe(verbose: bool): def make_sure_remote_apache_exists_and_fetch(git_update: bool, verbose: bool): - """ - Make sure that apache remote exist in git. We need to take a log from the apache - repository - not locally. + """Make sure that apache remote exist in git. - Also, the local repo might be shallow, so we need to un-shallow it. + We need to take a log from the apache repository - not locally. Also, the + local repo might be shallow, so we need to un-shallow it. This will: * mark current directory as safe for ownership (it is run in the container) * check if the remote exists and add if it does not * check if the local repo is shallow, mark it to un-shallow in this case - * fetch from the remote including all tags and overriding local tags in case they are set differently + * fetch from the remote including all tags and overriding local tags in case + they are set differently :param git_update: If the git remote already exists, should we try to update it :param verbose: print verbose messages while fetching @@ -782,8 +783,10 @@ def make_sure_remote_apache_exists_and_fetch(git_update: bool, verbose: bool): def get_git_log_command( verbose: bool, from_commit: str | None = None, to_commit: str | None = None ) -> list[str]: - """ - Get git command to run for the current repo from the current folder (which is the package folder). + """Get git command to run for the current repo from the current folder. + + The current directory should always be the package folder. + :param verbose: whether to print verbose info while getting the command :param from_commit: if present - base commit from which to start the log from :param to_commit: if present - final commit which should be the start of the log @@ -806,8 +809,8 @@ def get_git_log_command( def get_git_tag_check_command(tag: str) -> list[str]: - """ - Get git command to check if tag exits. + """Get git command to check if tag exits. + :param tag: Tag to check :return: git command to run """ @@ -819,8 +822,8 @@ def get_git_tag_check_command(tag: str) -> list[str]: def get_source_package_path(provider_package_id: str) -> str: - """ - Retrieves source package path from package id. + """Retrieves source package path from package id. 
+
     :param provider_package_id: id of the package
     :return: path of the providers folder
     """
@@ -828,8 +831,8 @@


 def get_documentation_package_path(provider_package_id: str) -> str:
-    """
-    Retrieves documentation package path from package id.
+    """Retrieves documentation package path from package id.
+
     :param provider_package_id: id of the package
     :return: path of the documentation folder
     """
@@ -839,8 +842,8 @@


 def get_generated_package_path(provider_package_id: str) -> str:
-    """
-    Retrieves generated package path from package id.
+    """Retrieves generated package path from package id.
+
     :param provider_package_id: id of the package
     :return: path of the providers folder
     """
@@ -849,8 +852,7 @@


 def get_additional_package_info(provider_package_path: str) -> str:
-    """
-    Returns additional info for the package.
+    """Returns additional info for the package.

     :param provider_package_path: path for the package
     :return: additional information for the path (empty string if missing)
@@ -878,10 +880,10 @@ def get_package_pip_name(provider_package_id: str):


 def validate_provider_info_with_runtime_schema(provider_info: dict[str, Any]) -> None:
-    """
-    Validates provider info against the runtime schema. This way we check if the provider info in the
-    packages is future-compatible. The Runtime Schema should only change when there is a major version
-    change.
+    """Validates provider info against the runtime schema.
+
+    This way we check if the provider info in the packages is future-compatible.
+    The Runtime Schema should only change when there is a major version change.

     :param provider_info: provider info to validate
     """
@@ -900,10 +902,13 @@ def validate_provider_info_with_runtime_schema(provider_info: dict[str, Any]) ->


 def get_provider_yaml(provider_package_id: str) -> dict[str, Any]:
-    """
-    Retrieves provider info from the provider yaml file. The provider yaml file contains more information
-    than provider_info that is used at runtime. This method converts the full provider yaml file into
-    stripped-down provider info and validates it against deprecated 2.0.0 schema and runtime schema.
+    """Retrieves provider info from the provider YAML file.
+
+    The provider yaml file contains more information than provider_info that is
+    used at runtime. This method converts the full provider yaml file into
+    stripped-down provider info and validates it against deprecated 2.0.0 schema
+    and runtime schema.
+
    :param provider_package_id: package id to retrieve provider.yaml from
    :return: provider_info dictionary
    """
@@ -916,8 +921,8 @@


 def get_provider_info_from_provider_yaml(provider_package_id: str) -> dict[str, Any]:
-    """
-    Retrieves provider info from the provider yaml file.
+    """Retrieves provider info from the provider yaml file.
+
     :param provider_package_id: package id to retrieve provider.yaml from
     :return: provider_info dictionary
     """
@@ -942,12 +947,11 @@ def get_all_changes_for_package(
     verbose: bool,
     base_branch: str,
 ) -> tuple[bool, list[list[Change]] | Change | None, str]:
-    """
-    Retrieves all changes for the package.
+    """Retrieves all changes for the package.
+
     :param provider_package_id: provider package id
     :param base_branch: base branch to check changes in apache remote for changes
     :param verbose: whether to print verbose messages
-
     """
     provider_details = get_provider_details(provider_package_id)
     current_version = provider_details.versions[0]
@@ -1171,11 +1175,12 @@ def prepare_readme_file(context):


 def confirm(message: str, answer: str | None = None) -> bool:
-    """
-    Ask user to confirm (case-insensitive).
+    """Ask user to confirm (case-insensitive).
+
     :param message: message to display
     :param answer: force answer if set
-    :return: True if the answer is any form of y/yes. Exits with 65 exit code if any form of q/quit is chosen.
+    :return: True if the answer is any form of y/yes. Exits with 65 exit code if
+        any form of q/quit is chosen.
     """
     given_answer = answer.lower() if answer is not None else ""
     while given_answer not in ["y", "n", "q", "yes", "no", "quit"]:
@@ -1199,8 +1204,8 @@ class TypeOfChange(Enum):


 def get_type_of_changes(answer: str | None) -> TypeOfChange:
-    """
-    Ask user to specify type of changes (case-insensitive).
+    """Ask user to specify type of changes (case-insensitive).
+
     :return: Type of change.
     """
     given_answer = ""
@@ -1271,8 +1276,9 @@ def update_release_notes(
     answer: str | None,
     base_branch: str,
 ) -> bool:
-    """
-    Updates generated files (readme, changes and/or setup.cfg/setup.py/manifest.in/provider_info)
+    """Updates generated files.
+
+    This includes the readme, changes, and/or setup.cfg/setup.py/manifest.in/provider_info.

     :param provider_package_id: id of the package
     :param version_suffix: version suffix corresponding to the version in the code
@@ -1352,8 +1358,7 @@ def update_setup_files(
     provider_package_id: str,
     version_suffix: str,
 ):
-    """
-    Updates generated setup.cfg/setup.py/manifest.in/provider_info for packages
+    """Updates generated setup.cfg/setup.py/manifest.in/provider_info for packages.

     :param provider_package_id: id of the package
     :param version_suffix: version suffix corresponding to the version in the code
@@ -1545,18 +1550,17 @@ def prepare_manifest_in_file(context):


 def get_all_providers() -> list[str]:
-    """
-    Returns all providers for regular packages.
+    """Returns all providers for regular packages.
+
     :return: list of providers that are considered for provider packages
     """
     return list(ALL_PROVIDERS)


 def verify_provider_package(provider_package_id: str) -> None:
-    """
-    Verifies if the provider package is good.
+    """Verifies if the provider package is good.
+
     :param provider_package_id: package id to verify
-    :return: None
     """
     if provider_package_id not in get_all_providers():
         console.print(f"[red]Wrong package name: {provider_package_id}[/]")
@@ -1618,10 +1622,9 @@ def update_package_documentation(
     verbose: bool,
     base_branch: str,
 ):
-    """
-    Updates package documentation.
+    """Updates package documentation.

-    See `list-providers-packages` subcommand for the possible PACKAGE_ID values
+    See `list-providers-packages` subcommand for the possible PACKAGE_ID values.
     """
     provider_package_id = package_id
     verify_provider_package(provider_package_id)
@@ -1670,10 +1673,9 @@ def tag_exists_for_version(provider_package_id: str, current_tag: str, verbose:
 def generate_setup_files(
     version_suffix: str, git_update: bool, package_id: str, verbose: bool, skip_tag_check: bool
 ):
-    """
-    Generates setup files for the package.
+    """Generates setup files for the package.
-    See `list-providers-packages` subcommand for the possible PACKAGE_ID values
+    See `list-providers-packages` subcommand for the possible PACKAGE_ID values.
     """
     provider_package_id = package_id
     with with_group(f"Generate setup files for '{provider_package_id}'"):
@@ -1740,10 +1742,9 @@ def build_provider_packages(
     verbose: bool,
     skip_tag_check: bool,
 ):
-    """
-    Builds provider package.
+    """Builds provider package.

-    See `list-providers-packages` subcommand for the possible PACKAGE_ID values
+    See `list-providers-packages` subcommand for the possible PACKAGE_ID values.
     """
     import tempfile
@@ -1792,13 +1793,14 @@ def build_provider_packages(


 def find_insertion_index_for_version(content: list[str], version: str) -> tuple[int, bool]:
-    """
-    Finds insertion index for the specified version from the .rst changelog content.
+    """Finds insertion index for the specified version from the .rst changelog content.

     :param content: changelog split into separate lines
     :param version: version to look for
-    :return: Tuple : insertion_index, append (whether to append or insert the changelog)
+    :return: A 2-tuple. The first item indicates the insertion index, while the
+        second is a boolean indicating whether to append (False) or insert (True)
+        to the changelog.
     """
     changelog_found = False
     skip_next_line = False
@@ -1824,10 +1826,11 @@ class ClassifiedChanges(NamedTuple):


 def get_changes_classified(changes: list[Change]) -> ClassifiedChanges:
-    """
-    Pre-classifies changes based on commit message, it's wildly guessing now,
-    but if we switch to semantic commits, it could be automated. This list is supposed to be manually
-    reviewed and re-classified by release manager anyway.
+    """Pre-classifies changes based on commit message; it is wild guessing for now.
+
+    However, if we switch to semantic commits, it could be automated. This list
+    is supposed to be manually reviewed and re-classified by release manager
+    anyway.

     :param changes: list of changes
     :return: list of changes classified semi-automatically to the fix/feature/breaking/other buckets
@@ -1856,8 +1859,8 @@ def update_changelog(package_id: str, base_branch: str, verbose: bool):


 def _update_changelog(package_id: str, base_branch: str, verbose: bool) -> bool:
-    """
-    Internal update changelog method
+    """Internal update changelog method.
+
     :param package_id: package id
     :param base_branch: base branch to check changes in apache remote for changes
     :param verbose: verbose flag
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
index 45fcb1eaca8e4..0b7ef9a472e24 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
@@ -101,7 +101,7 @@ the example below.
 .. code-block:: bash

     $ airflow info
-    ...
+
     Apache Airflow version | 2.7.0.dev0
     executor               | LocalExecutor
diff --git a/docs/exts/docs_build/code_utils.py b/docs/exts/docs_build/code_utils.py
index a7e3a5a666ff2..1f520553b84b6 100644
--- a/docs/exts/docs_build/code_utils.py
+++ b/docs/exts/docs_build/code_utils.py
@@ -36,12 +36,11 @@


 def prepare_code_snippet(file_path: str, line_no: int, context_lines_count: int = 5) -> str:
-    """
-    Prepares code snippet.
+    """Prepares code snippet.
+
     :param file_path: file path
     :param line_no: line number
     :param context_lines_count: number of lines of context.
-    :return:
     """

     def guess_lexer_for_filename(filename):
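A sketch of what the ``prepare_code_snippet`` contract above implies (file and line number invented):

.. code-block:: python

    # Show five lines of context around line 42 of the given file.
    snippet = prepare_code_snippet("airflow/models/dag.py", line_no=42, context_lines_count=5)
    print(snippet)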
@@ -1824,10 +1826,11 @@ class ClassifiedChanges(NamedTuple):


 def get_changes_classified(changes: list[Change]) -> ClassifiedChanges:
-    """
-    Pre-classifies changes based on commit message, it's wildly guessing now,
-    but if we switch to semantic commits, it could be automated. This list is supposed to be manually
-    reviewed and re-classified by release manager anyway.
+    """Pre-classifies changes based on commit message. It is wild guessing for now.
+
+    However, if we switch to semantic commits, it could be automated. This list
+    is supposed to be manually reviewed and re-classified by the release manager
+    anyway.

     :param changes: list of changes
     :return: list of changes classified semi-automatically to the fix/feature/breaking/other buckets
@@ -1856,8 +1859,8 @@ def update_changelog(package_id: str, base_branch: str, verbose: bool):


 def _update_changelog(package_id: str, base_branch: str, verbose: bool) -> bool:
-    """
-    Internal update changelog method
+    """Internal update changelog method.
+
     :param package_id: package id
     :param base_branch: base branch to check changes in apache remote for changes
     :param verbose: verbose flag
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
index 45fcb1eaca8e4..0b7ef9a472e24 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
@@ -101,7 +101,7 @@ the example below.

 .. code-block:: bash

     $ airflow info
-    ...
+
     Apache Airflow version | 2.7.0.dev0
     executor               | LocalExecutor
diff --git a/docs/exts/docs_build/code_utils.py b/docs/exts/docs_build/code_utils.py
index a7e3a5a666ff2..1f520553b84b6 100644
--- a/docs/exts/docs_build/code_utils.py
+++ b/docs/exts/docs_build/code_utils.py
@@ -36,12 +36,11 @@


 def prepare_code_snippet(file_path: str, line_no: int, context_lines_count: int = 5) -> str:
-    """
-    Prepares code snippet.
+    """Prepares code snippet.
+
     :param file_path: file path
     :param line_no: line number
     :param context_lines_count: number of lines of context.
-    :return:
     """

     def guess_lexer_for_filename(filename):
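Editorial note: the ``prepare_code_snippet`` docstring describes the contract but not the mechanics. A hedged sketch of the same idea follows (return ``line_no`` with ``context_lines_count`` lines of surrounding context); the real helper additionally colourises the snippet via a guessed pygments lexer, which is omitted here:

.. code-block:: python

    from pathlib import Path


    def prepare_code_snippet(file_path: str, line_no: int, context_lines_count: int = 5) -> str:
        lines = Path(file_path).read_text().splitlines()
        start = max(line_no - context_lines_count - 1, 0)
        end = min(line_no + context_lines_count, len(lines))
        # Mark the requested line with ">" and number everything 1-based.
        return "\n".join(
            f"{'>' if start + i + 1 == line_no else ' '} {start + i + 1:4} | {line}"
            for i, line in enumerate(lines[start:end])
        )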
diff --git a/scripts/in_container/verify_providers.py b/scripts/in_container/verify_providers.py
index ab0a977aa6c96..5a5838351b4c1 100755
--- a/scripts/in_container/verify_providers.py
+++ b/scripts/in_container/verify_providers.py
@@ -208,8 +208,8 @@ def filter_known_common_deprecated_messages(warn: warnings.WarningMessage) -> bo


 def get_all_providers() -> list[str]:
-    """
-    Returns all providers for regular packages.
+    """Returns all providers for regular packages.
+
     :return: list of providers that are considered for provider packages
     """
     from setup import ALL_PROVIDERS
@@ -224,13 +224,13 @@ def import_all_classes(
     print_imports: bool = False,
     print_skips: bool = False,
 ) -> tuple[list[str], list[WarningMessage], list[str]]:
-    """
-    Imports all classes in providers packages. This method loads and imports
-    all the classes found in providers, so that we can find all the subclasses
-    of operators/sensors etc.
+    """Imports all classes in providers packages.
+
+    This method loads and imports all the classes found in providers, so that we
+    can find all the subclasses of operators/sensors etc.

-    :param walkable_paths_and_prefixes: dict of paths with accompanying prefixes to look the provider
-        packages in
+    :param walkable_paths_and_prefixes: dict of paths with accompanying prefixes
+        to look the provider packages in
     :param prefix: prefix to add
     :param provider_ids - provider ids that should be loaded.
     :param print_imports - if imported class should also be printed in output
@@ -330,8 +330,7 @@ def onerror(_):


 def is_imported_from_same_module(the_class: str, imported_name: str) -> bool:
-    """
-    Is the class imported from another module?
+    """Is the class imported from another module?

     :param the_class: the class object itself
     :param imported_name: name of the imported class
@@ -341,8 +340,7 @@ def is_imported_from_same_module(the_class: str, imported_name: str) -> bool:


 def is_example_dag(imported_name: str) -> bool:
-    """
-    Is the class an example_dag class?
+    """Is the class an example_dag class?

     :param imported_name: name where the class is imported from
     :return: true if it is an example_dags class
@@ -351,18 +349,17 @@


 def is_from_the_expected_base_package(the_class: type, expected_package: str) -> bool:
-    """
-    Returns true if the class is from the package expected.
+    """Returns true if the class is from the package expected.
+
     :param the_class: the class object
     :param expected_package: package expected for the class
-    :return:
     """
     return the_class.__module__.startswith(expected_package)


 def inherits_from(the_class: type, expected_ancestor: type | None = None) -> bool:
-    """
-    Returns true if the class inherits (directly or indirectly) from the class specified.
+    """Returns true if the class inherits (directly or indirectly) from the class specified.
+
     :param the_class: The class to check
     :param expected_ancestor: expected class to inherit from
     :return: true is the class inherits from the class expected
@@ -376,8 +373,8 @@


 def is_class(the_class: type) -> bool:
-    """
-    Returns true if the object passed is a class
+    """Returns true if the object passed is a class.
+
     :param the_class: the class to pass
     :return: true if it is a class
     """
@@ -387,9 +384,8 @@ def is_class(the_class: type) -> bool:


 def package_name_matches(the_class: type, expected_pattern: str | None = None) -> bool:
-    """
-    In case expected_pattern is set, it checks if the package name matches the pattern.
-    .
+    """In case expected_pattern is set, it checks if the package name matches the pattern.
+
     :param the_class: imported class
     :param expected_pattern: the pattern that should match the package
     :return: true if the expected_pattern is None or the pattern matches the package
@@ -398,8 +394,7 @@


 def convert_classes_to_table(entity_type: EntityType, entities: list[str], full_package_name: str) -> str:
-    """
-    Converts new entities to a Markdown table.
+    """Converts new entities to a Markdown table.

     :param entity_type: entity type to convert to markup
     :param entities: list of entities
@@ -419,14 +414,12 @@ def get_details_about_classes(
     wrong_entities: list[tuple[type, str]],
     full_package_name: str,
 ) -> EntityTypeSummary:
-    """
-    Get details about entities.
+    """Get details about entities.

     :param entity_type: type of entity (Operators, Hooks etc.)
     :param entities: set of entities found
     :param wrong_entities: wrong entities found for that type
     :param full_package_name: full package name
-    :return:
     """
     all_entities = list(entities)
     all_entities.sort()
@@ -443,9 +436,7 @@ def get_details_about_classes(


 def strip_package_from_class(base_package: str, class_name: str) -> str:
-    """
-    Strips base package name from the class (if it starts with the package name).
-    """
+    """Strips base package name from the class (if it starts with the package name)."""
     if class_name.startswith(base_package):
         return class_name[len(base_package) + 1 :]
     else:
@@ -453,8 +444,7 @@ def strip_package_from_class(base_package: str, class_name: str) -> str:


 def convert_class_name_to_url(base_url: str, class_name) -> str:
-    """
-    Converts the class name to URL that the class can be reached
+    """Converts the class name to URL that the class can be reached.

     :param base_url: base URL to use
     :param class_name: name of the class
@@ -464,8 +454,7 @@ def convert_class_name_to_url(base_url: str, class_name) -> str:


 def get_class_code_link(base_package: str, class_name: str, git_tag: str) -> str:
-    """
-    Provides a Markdown link for the class passed as parameter.
+    """Provides a Markdown link for the class passed as parameter.

     :param base_package: base package to strip from most names
     :param class_name: name of the class
@@ -480,8 +469,8 @@ def get_class_code_link(base_package: str, class_name: str, git_tag: str) -> str


 def print_wrong_naming(entity_type: EntityType, wrong_classes: list[tuple[type, str]]):
-    """
-    Prints wrong entities of a given entity type if there are any
+    """Prints wrong entities of a given entity type if there are any.
+
     :param entity_type: type of the class to print
     :param wrong_classes: list of wrong entities
     """
@@ -501,8 +490,7 @@ def find_all_entities(
     exclude_class_type: type | None = None,
     false_positive_class_names: set[str] | None = None,
 ) -> VerifiedEntities:
-    """
-    Returns set of entities containing all subclasses in package specified.
+    """Returns set of entities containing all subclasses in package specified.

     :param imported_classes: entities imported from providers
     :param base_package: base package name where to start looking for the entities
@@ -557,11 +545,12 @@ def find_all_entities(


 def get_package_class_summary(
     full_package_name: str, imported_classes: list[str]
 ) -> dict[EntityType, EntityTypeSummary]:
-    """
-    Gets summary of the package in the form of dictionary containing all types of entities
+    """Gets summary of the package in the form of dictionary containing all types of entities.
+
     :param full_package_name: full package name
     :param imported_classes: entities imported_from providers
-    :return: dictionary of objects usable as context for JINJA2 templates - or None if there are some errors
+    :return: dictionary of objects usable as context for JINJA2 templates, or
+        None if there are some errors
     """
     from airflow.hooks.base import BaseHook
     from airflow.models.baseoperator import BaseOperator
@@ -637,8 +626,8 @@ def get_package_class_summary(


 def is_camel_case_with_acronyms(s: str):
-    """
-    Checks if the string passed is Camel Case (with capitalised acronyms allowed).
+    """Checks if the string passed is Camel Case (with capitalised acronyms allowed).
+
     :param s: string to check
     :return: true if the name looks cool as Class name.
     """
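Editorial note: for readers wondering what "Camel Case with capitalised acronyms allowed" means in practice, one plausible implementation of such a predicate is shown below. This is an assumption for illustration; the actual body in ``verify_providers.py`` may differ:

.. code-block:: python

    def is_camel_case_with_acronyms(s: str) -> bool:
        # First character capitalised, no snake_case underscores, and mixed
        # case overall, so "AWSBatchOperator" passes while "my_operator",
        # "myOperator" and "HTTP" are all rejected.
        return s[:1].isupper() and "_" not in s and s != s.lower() and s != s.upper()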
@@ -648,9 +637,9 @@


 def check_if_classes_are_properly_named(
     entity_summary: dict[EntityType, EntityTypeSummary]
 ) -> tuple[int, int]:
-    """
-    Check if all entities in the dictionary are named properly. It prints names at the output
-    and returns the status of class names.
+    """Check if all entities in the dictionary are named properly.
+
+    It prints names at the output and returns the status of class names.

     :param entity_summary: dictionary of class names to check, grouped by types.
     :return: Tuple of 2 ints = total number of entities and number of badly named entities
@@ -773,9 +762,10 @@ def get_providers_paths() -> list[str]:


 def add_all_namespaced_packages(
     walkable_paths_and_prefixes: dict[str, str], provider_path: str, provider_prefix: str
 ):
-    """
-    We need to find namespace packages ourselves as "walk_packages" does not support namespaced packages
-    # and PEP420
+    """Find namespace packages.
+
+    This needs to be done manually as ``walk_packages`` does not support
+    namespaced packages and PEP 420.

     :param walkable_paths_and_prefixes: pats
     :param provider_path:
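Editorial note: the rewritten docstring states *why* manual discovery is needed (``walk_packages`` ignores PEP 420 packages) but not *how*. A sketch of the usual approach follows, under the assumption that every directory without an ``__init__.py`` is a namespace package; the actual implementation may differ in details:

.. code-block:: python

    from pathlib import Path


    def add_all_namespaced_packages(
        walkable_paths_and_prefixes: dict[str, str],
        provider_path: str,
        provider_prefix: str,
    ) -> None:
        root = Path(provider_path)
        for candidate in root.rglob("*"):
            if not candidate.is_dir() or candidate.name == "__pycache__":
                continue
            if not (candidate / "__init__.py").exists():
                # PEP 420 namespace package: record it with its import prefix
                # so the import loop can walk it explicitly.
                subpackage = ".".join(candidate.relative_to(root).parts)
                walkable_paths_and_prefixes[str(candidate)] = f"{provider_prefix}{subpackage}."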
- """ + """Displays queries from the tests to console.""" trace_sql_option = request.config.getoption("trace_sql") if not trace_sql_option: yield @@ -157,9 +148,7 @@ def pytest_print(text): def pytest_addoption(parser): - """ - Add options parser for custom plugins - """ + """Add options parser for custom plugins.""" group = parser.getgroup("airflow") group.addoption( "--with-db-init", @@ -233,10 +222,7 @@ def initial_db_init(): @pytest.fixture(autouse=True, scope="session") def initialize_airflow_tests(request): - """ - Helper that setups Airflow testing environment. - """ - + """Helper that setups Airflow testing environment.""" print(" AIRFLOW ".center(60, "=")) # Setup test environment for breeze @@ -294,7 +280,7 @@ def pytest_configure(config): def pytest_unconfigure(config): - os.environ["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] + del os.environ["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] def skip_if_not_marked_with_integration(selected_integrations, item): @@ -423,21 +409,24 @@ def pytest_runtest_setup(item): @pytest.fixture def frozen_sleep(monkeypatch): - """ - Use time-machine to "stub" sleep, so that it takes no time, but that - ``datetime.now()`` appears to move forwards + """Use time-machine to "stub" sleep. - If your module under test does ``import time`` and then ``time.sleep``:: + This means the ``sleep()`` takes no time, but ``datetime.now()`` appears to move forwards. + + If your module under test does ``import time`` and then ``time.sleep``: + + .. code-block:: python def test_something(frozen_sleep): my_mod.fn_under_test() - If your module under test does ``from time import sleep`` then you will - have to mock that sleep function directly:: + have to mock that sleep function directly: + + .. code-block:: python def test_something(frozen_sleep, monkeypatch): - monkeypatch.setattr('my_mod.sleep', frozen_sleep) + monkeypatch.setattr("my_mod.sleep", frozen_sleep) my_mod.fn_under_test() """ traveller = None @@ -469,8 +458,7 @@ def app(): @pytest.fixture def dag_maker(request): - """ - The dag_maker helps us to create DAG, DagModel, and SerializedDAG automatically. + """Fixture to help create DAG, DagModel, and SerializedDAG automatically. You have to use the dag_maker as a context manager and it takes the same argument as DAG:: @@ -492,10 +480,11 @@ def dag_maker(request): The dag_maker.create_dagrun takes the same arguments as dag.create_dagrun - If you want to operate on serialized DAGs, then either pass ``serialized=True` to the ``dag_maker()`` - call, or you can mark your test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of - these cases the ``dag`` returned by the context manager will be a lazily-evaluated proxy object to the - SerializedDAG. + If you want to operate on serialized DAGs, then either pass + ``serialized=True`` to the ``dag_maker()`` call, or you can mark your + test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of + these cases the ``dag`` returned by the context manager will be a + lazily-evaluated proxy object to the SerializedDAG. """ import lazy_object_proxy @@ -703,8 +692,8 @@ def cleanup(self): @pytest.fixture def create_dummy_dag(dag_maker): - """ - This fixture creates a `DAG` with a single `EmptyOperator` task. + """Create a `DAG` with a single `EmptyOperator` task. + DagRun and DagModel is also created. 
@@ -469,8 +458,7 @@ def app():


 @pytest.fixture
 def dag_maker(request):
-    """
-    The dag_maker helps us to create DAG, DagModel, and SerializedDAG automatically.
+    """Fixture to help create DAG, DagModel, and SerializedDAG automatically.

     You have to use the dag_maker as a context manager and it takes the same argument as DAG::
@@ -492,10 +480,11 @@ def dag_maker(request):

     The dag_maker.create_dagrun takes the same arguments as dag.create_dagrun

-    If you want to operate on serialized DAGs, then either pass ``serialized=True` to the ``dag_maker()``
-    call, or you can mark your test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of
-    these cases the ``dag`` returned by the context manager will be a lazily-evaluated proxy object to the
-    SerializedDAG.
+    If you want to operate on serialized DAGs, then either pass
+    ``serialized=True`` to the ``dag_maker()`` call, or you can mark your
+    test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of
+    these cases the ``dag`` returned by the context manager will be a
+    lazily-evaluated proxy object to the SerializedDAG.
     """
     import lazy_object_proxy
@@ -703,8 +692,8 @@ def cleanup(self):


 @pytest.fixture
 def create_dummy_dag(dag_maker):
-    """
-    This fixture creates a `DAG` with a single `EmptyOperator` task.
+    """Create a `DAG` with a single `EmptyOperator` task.
+
     DagRun and DagModel is also created.

     Apart from the already existing arguments, any other argument in kwargs
@@ -760,8 +749,7 @@ def create_dag(


 @pytest.fixture
 def create_task_instance(dag_maker, create_dummy_dag):
-    """
-    Create a TaskInstance, and associated DB rows (DagRun, DagModel, etc)
+    """Create a TaskInstance, and associated DB rows (DagRun, DagModel, etc).

     Uses ``create_dummy_dag`` to create the dag structure.
     """
diff --git a/tests/dags/test_logging_in_dag.py b/tests/dags/test_logging_in_dag.py
index 9cb68e1807a53..4420df012b6b4 100644
--- a/tests/dags/test_logging_in_dag.py
+++ b/tests/dags/test_logging_in_dag.py
@@ -27,10 +27,9 @@


 def test_logging_fn(**kwargs):
-    """
-    Tests DAG logging.
+    """Tests DAG logging.
+
     :param kwargs:
-    :return:
     """
     logger.info("Log from DAG Logger")
     kwargs["ti"].log.info("Log from TI Logger")
diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
index 126bc6f48e4a7..99ee65fc5e47d 100644
--- a/tests/utils/test_cli_util.py
+++ b/tests/utils/test_cli_util.py
@@ -172,10 +172,7 @@ def test_setup_locations_none_pid_path(self):

 @contextmanager
 def fail_action_logger_callback():
-    """
-    Adding failing callback and revert it back when closed.
-    :return:
-    """
+    """Add a failing callback and revert it back when closed."""
     tmp = cli_action_loggers.__pre_exec_callbacks[:]

     def fail_callback(**_):
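Editorial note: the ``fail_action_logger_callback`` hunk ends just before the callback body. The complete pattern the (now one-line) docstring describes is roughly the following sketch, which completes the lines visible above; the exception type raised by the callback is an assumption:

.. code-block:: python

    from contextlib import contextmanager

    from airflow.utils import cli_action_loggers


    @contextmanager
    def fail_action_logger_callback():
        # Snapshot the registered callbacks so they can be restored on exit.
        tmp = cli_action_loggers.__pre_exec_callbacks[:]

        def fail_callback(**_):
            raise NotImplementedError("deliberate failure for the test")

        cli_action_loggers.register_pre_exec_callback(fail_callback)
        try:
            yield
        finally:
            cli_action_loggers.__pre_exec_callbacks = tmp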