Merged
5 changes: 1 addition & 4 deletions airflow/api/auth/backend/kerberos_auth.py
@@ -96,10 +96,7 @@ def init_app(app):
 
 
 def _unauthorized():
-    """
-    Indicate that authorization is required.
-    :return:
-    """
+    """Indicate that authorization is required."""
     return Response("Unauthorized", 401, {"WWW-Authenticate": "Negotiate"})
 
 
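(Aside: `_unauthorized` above is the 401-challenge half of SPNEGO/Kerberos negotiation. A minimal sketch of how such a helper is typically consumed -- an assumed shape for illustration, not the actual contents of kerberos_auth.py:)

```python
# Hypothetical sketch -- not the real Airflow backend code.
from functools import wraps

from flask import Response, request


def _unauthorized():
    """Indicate that authorization is required."""
    return Response("Unauthorized", 401, {"WWW-Authenticate": "Negotiate"})


def requires_authentication(function):
    """Send the Negotiate challenge when no Kerberos token is presented."""

    @wraps(function)
    def decorated(*args, **kwargs):
        header = request.headers.get("Authorization")
        if header is None or not header.startswith("Negotiate "):
            return _unauthorized()
        return function(*args, **kwargs)

    return decorated
```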
1 change: 1 addition & 0 deletions airflow/api/client/api_client.py
@@ -79,6 +79,7 @@ def delete_pool(self, name):
     def get_lineage(self, dag_id: str, execution_date: str):
         """
         Return the lineage information for the dag on this execution date.
+
         :param dag_id:
         :param execution_date:
         :return:
4 changes: 3 additions & 1 deletion airflow/cli/commands/dag_command.py
@@ -269,6 +269,7 @@ def _save_dot_to_file(dot: Dot, filename: str) -> None:
 def dag_state(args, session: Session = NEW_SESSION) -> None:
     """
     Returns the state (and conf if exists) of a DagRun at the command line.
+
     >>> airflow dags state tutorial 2015-01-01T00:00:00.000000
     running
     >>> airflow dags state a_dag_with_conf_passed 2015-01-01T00:00:00.000000
@@ -290,6 +291,7 @@ def dag_state(args, session: Session = NEW_SESSION) -> None:
 def dag_next_execution(args) -> None:
     """
     Returns the next execution datetime of a DAG at the command line.
+
     >>> airflow dags next-execution tutorial
     2018-08-31 10:38:00
     """
@@ -350,7 +352,7 @@ def dag_list_dags(args) -> None:
 @suppress_logs_and_warning
 @provide_session
 def dag_details(args, session=NEW_SESSION):
-    """Get DAG details given a DAG id"""
+    """Get DAG details given a DAG id."""
     dag = DagModel.get_dagmodel(args.dag_id, session=session)
     if not dag:
         raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
4 changes: 2 additions & 2 deletions airflow/cli/commands/internal_api_command.py
@@ -201,7 +201,7 @@ def monitor_gunicorn(gunicorn_master_pid: int):
 
 
 def create_app(config=None, testing=False):
-    """Create a new instance of Airflow Internal API app"""
+    """Create a new instance of Airflow Internal API app."""
     flask_app = Flask(__name__)
 
     flask_app.config["APP_NAME"] = "Airflow Internal API"
@@ -255,7 +255,7 @@ def create_app(config=None, testing=False):
 
 
 def cached_app(config=None, testing=False):
-    """Return cached instance of Airflow Internal API app"""
+    """Return cached instance of Airflow Internal API app."""
     global app
     if not app:
         app = create_app(config=config, testing=testing)
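(The `cached_app` hunk above shows the module-level memoization pattern used for the Flask app; a self-contained sketch of the same pattern, assuming nothing beyond Flask itself:)

```python
# Illustrative sketch of the cached-app pattern, not the Airflow module.
from __future__ import annotations

from flask import Flask

app: Flask | None = None


def create_app(config=None, testing=False) -> Flask:
    """Build a fresh Flask app (sketch)."""
    flask_app = Flask(__name__)
    flask_app.config["TESTING"] = testing
    if config:
        flask_app.config.update(config)
    return flask_app


def cached_app(config=None, testing=False) -> Flask:
    """Return the cached app, creating it on first call (sketch)."""
    global app
    if not app:
        app = create_app(config=config, testing=testing)
    return app
```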
1 change: 1 addition & 0 deletions airflow/cli/commands/task_command.py
@@ -467,6 +467,7 @@ def task_failed_deps(args) -> None:
 def task_state(args) -> None:
     """
     Returns the state of a TaskInstance at the command line.
+
     >>> airflow tasks state tutorial sleep 2015-01-01
     success
     """
2 changes: 1 addition & 1 deletion airflow/cli/commands/triggerer_command.py
@@ -37,7 +37,7 @@
 
 @contextmanager
 def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]:
-    """Starts serve_logs sub-process"""
+    """Starts serve_logs sub-process."""
     sub_proc = None
     if skip_serve_logs is False:
         port = conf.getint("logging", "trigger_log_server_port", fallback=8794)
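(`_serve_logs` above is a generator-based context manager wrapping a sub-process; a simplified, self-contained sketch of that shape -- the `serve_logs` target here is a stand-in, not Airflow's:)

```python
# Simplified sketch of the _serve_logs pattern shown above.
from __future__ import annotations

import multiprocessing
import time
from contextlib import contextmanager
from typing import Generator


def serve_logs(port: int) -> None:
    """Stand-in for the real log-serving target."""
    while True:
        time.sleep(1)


@contextmanager
def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]:
    """Start the serve_logs sub-process; terminate it when the block exits."""
    sub_proc = None
    if skip_serve_logs is False:
        sub_proc = multiprocessing.Process(target=serve_logs, args=(8794,))
        sub_proc.start()
    try:
        yield
    finally:
        if sub_proc:
            sub_proc.terminate()
```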
10 changes: 5 additions & 5 deletions airflow/dag_processing/manager.py
@@ -519,7 +519,7 @@ def deactivate_stale_dags(
     ):
         """
         Detects DAGs which are no longer present in files.
-        Deactivate them and remove them in the serialized_dag table
+        Deactivate them and remove them in the serialized_dag table.
         """
         to_deactivate = set()
         query = session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).filter(
@@ -891,7 +891,7 @@ def _log_file_processing_stats(self, known_file_paths):
     def get_pid(self, file_path) -> int | None:
         """
         Retrieve the PID of the process processing the given file or None if the file is not being processed.
-        :param file_path: the path to the file that's being processed
+        :param file_path: the path to the file that's being processed.
         """
         if file_path in self._processors:
             return self._processors[file_path].pid
@@ -950,7 +950,7 @@ def get_start_time(self, file_path) -> datetime | None:
         Retrieve the last start time for processing a specific path.
         :param file_path: the path to the file that's being processed
         :return: the start time of the process that's processing the
-            specified file or None if the file is not currently being processed
+            specified file or None if the file is not currently being processed.
         """
         if file_path in self._processors:
             return self._processors[file_path].start_time
@@ -959,7 +959,7 @@ def get_run_count(self, file_path) -> int:
     def get_run_count(self, file_path) -> int:
         """
         The number of times the given file has been parsed.
-        :param file_path: the path to the file that's being processed
+        :param file_path: the path to the file that's being processed.
         """
         stat = self._file_stats.get(file_path)
         return stat.run_count if stat else 0
@@ -1233,7 +1233,7 @@ def _kill_timed_out_processors(self):
                 self._processors.pop(proc)
 
     def _add_paths_to_queue(self, file_paths_to_enqueue: list[str], add_at_front: bool):
-        """Adds stuff to the back or front of the file queue, unless it's already present"""
+        """Adds stuff to the back or front of the file queue, unless it's already present."""
         new_file_paths = list(p for p in file_paths_to_enqueue if p not in self._file_path_queue)
         if add_at_front:
             self._file_path_queue.extendleft(new_file_paths)
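(One behavior worth knowing about `_add_paths_to_queue` above: `deque.extendleft` pushes items one at a time, so a batch added at the front ends up reversed. A quick demonstration:)

```python
from collections import deque

queue = deque(["c.py", "d.py"])
new_paths = ["a.py", "b.py"]

# extendleft inserts one by one, so b.py lands in front of a.py.
queue.extendleft(p for p in new_paths if p not in queue)
print(queue)  # deque(['b.py', 'a.py', 'c.py', 'd.py'])
```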
2 changes: 1 addition & 1 deletion airflow/exceptions.py
@@ -208,7 +208,7 @@ def __init__(self, *args, **kwargs):
 
 
 class DagInvalidTriggerRule(AirflowException):
-    """Raise when a dag has 'fail_stop' enabled yet has a non-default trigger rule"""
+    """Raise when a dag has 'fail_stop' enabled yet has a non-default trigger rule."""
 
     @classmethod
     def check(cls, dag: DAG | None, trigger_rule: str):
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
@@ -78,7 +78,7 @@ class RunningRetryAttemptType:
 
     @property
     def elapsed(self):
-        """Seconds since first attempt"""
+        """Seconds since first attempt."""
         return (pendulum.now("UTC") - self.first_attempt_time).total_seconds()
 
     def can_try_again(self):
2 changes: 1 addition & 1 deletion airflow/executors/celery_kubernetes_executor.py
@@ -149,7 +149,7 @@ def queue_task_instance(
         )
 
     def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
-        """Fetch task log from Kubernetes executor"""
+        """Fetch task log from Kubernetes executor."""
         if ti.queue == self.kubernetes_executor.kubernetes_queue:
             return self.kubernetes_executor.get_task_log(ti=ti, try_number=try_number)
         return [], []
4 changes: 2 additions & 2 deletions airflow/executors/kubernetes_executor.py
@@ -401,7 +401,7 @@ def delete_pod(self, pod_name: str, namespace: str) -> None:
             raise
 
     def patch_pod_executor_done(self, *, pod_name: str, namespace: str):
-        """Add a "done" annotation to ensure we don't continually adopt pods"""
+        """Add a "done" annotation to ensure we don't continually adopt pods."""
         self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_name, namespace)
         try:
             self.kube_client.patch_namespaced_pod(
@@ -1004,7 +1004,7 @@ def _flush_result_queue(self) -> None:
                 break
 
     def end(self) -> None:
-        """Called when the executor shuts down"""
+        """Called when the executor shuts down."""
         if TYPE_CHECKING:
             assert self.task_queue
             assert self.result_queue
5 changes: 1 addition & 4 deletions airflow/executors/local_executor.py
@@ -389,10 +389,7 @@ def sync(self) -> None:
         self.impl.sync()
 
     def end(self) -> None:
-        """
-        Ends the executor.
-        :return:
-        """
+        """Ends the executor."""
         if TYPE_CHECKING:
             assert self.impl
             assert self.manager
2 changes: 1 addition & 1 deletion airflow/executors/local_kubernetes_executor.py
@@ -149,7 +149,7 @@ def queue_task_instance(
         )
 
     def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
-        """Fetch task log from kubernetes executor"""
+        """Fetch task log from kubernetes executor."""
         if ti.queue == self.kubernetes_executor.kubernetes_queue:
             return self.kubernetes_executor.get_task_log(ti=ti, try_number=try_number)
         return [], []
6 changes: 4 additions & 2 deletions airflow/jobs/job.py
@@ -269,8 +269,10 @@ def run_job(
     job: Job | JobPydantic, execute_callable: Callable[[], int | None], session: Session = NEW_SESSION
 ) -> int | None:
     """
-    Runs the job. The Job is always an ORM object and setting the state is happening within the
-    same DB session and the session is kept open throughout the whole execution
+    Runs the job.
+
+    The Job is always an ORM object and setting the state is happening within the
+    same DB session and the session is kept open throughout the whole execution.
 
     :meta private:
 
2 changes: 1 addition & 1 deletion airflow/jobs/local_task_job_runner.py
@@ -117,7 +117,7 @@ def signal_handler(signum, frame):
             self.handle_task_exit(128 + signum)
 
         def segfault_signal_handler(signum, frame):
-            """Setting sigmentation violation signal handler"""
+            """Setting segmentation violation signal handler."""
             self.log.critical(SIGSEGV_MESSAGE)
             self.task_runner.terminate()
             self.handle_task_exit(128 + signum)
8 changes: 4 additions & 4 deletions airflow/jobs/scheduler_job_runner.py
@@ -88,7 +88,7 @@
 @dataclass
 class ConcurrencyMap:
     """
-    Dataclass to represent concurrency maps
+    Dataclass to represent concurrency maps.
 
     It contains a map from (dag_id, task_id) to # of task instances, a map from (dag_id, task_id)
     to # of task instances in the given state list and a map from (dag_id, run_id, task_id)
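(A rough sketch of the three counters the `ConcurrencyMap` docstring above describes; the field names here are assumptions for illustration, not the class's real attributes:)

```python
# Assumed shape of the maps described in the ConcurrencyMap docstring.
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class ConcurrencyMapSketch:
    # (dag_id, task_id) -> number of task instances
    task_concurrency: Counter = field(default_factory=Counter)
    # (dag_id, task_id) -> number of task instances in the given state list
    task_state_concurrency: Counter = field(default_factory=Counter)
    # (dag_id, run_id, task_id) -> per-run task instance counts
    task_dagrun_concurrency: Counter = field(default_factory=Counter)
```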
@@ -1122,7 +1122,7 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio
     def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
         """
         Unconditionally create a DAG run for the given DAG, and update the dag_model's fields to control
-        if/when the next DAGRun should be created
+        if/when the next DAGRun should be created.
         """
         # Bulk Fetch DagRuns with dag_id and execution_date same
         # as DagModel.dag_id and DagModel.next_dagrun
@@ -1343,7 +1343,7 @@ def _schedule_all_dag_runs(
         dag_runs: Iterable[DagRun],
         session: Session,
     ) -> list[tuple[DagRun, DagCallbackRequest | None]]:
-        """Makes scheduling decisions for all `dag_runs`"""
+        """Makes scheduling decisions for all `dag_runs`."""
         callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
         guard.commit()
         return callback_tuples
@@ -1739,7 +1739,7 @@ def _set_orphaned(self, dataset: DatasetModel) -> int:
     def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
         """
         Detects datasets that are no longer referenced in any DAG schedule parameters or task outlets and
-        sets the dataset is_orphaned flag to True
+        sets the dataset is_orphaned flag to True.
         """
         orphaned_dataset_query = (
             session.query(DatasetModel)
4 changes: 2 additions & 2 deletions airflow/jobs/triggerer_job_runner.py
@@ -299,7 +299,7 @@ def is_needed(cls, session) -> bool:
     def on_kill(self):
         """
         Called when there is an external kill command (via the heartbeat
-        mechanism, for example)
+        mechanism, for example).
         """
         self.trigger_runner.stop = True
 
@@ -677,7 +677,7 @@ def update_triggers(self, requested_trigger_ids: set[int]):
 
     def set_trigger_logging_metadata(self, ti: TaskInstance, trigger_id, trigger):
         """
-        Set up logging for triggers
+        Set up logging for triggers.
 
         We want to ensure that each trigger logs to its own file and that the log messages are not
         propagated to parent loggers.
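(The goal stated in the `set_trigger_logging_metadata` docstring -- one log file per trigger, with no propagation to parent loggers -- maps onto the standard `logging` module like this; a hedged sketch with assumed names, not Airflow's implementation:)

```python
# Sketch of per-trigger, non-propagating logging (illustrative only).
import logging
import os


def make_trigger_logger(trigger_id: int, log_dir: str = "/tmp/triggers") -> logging.Logger:
    os.makedirs(log_dir, exist_ok=True)
    logger = logging.getLogger(f"trigger.{trigger_id}")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep messages out of parent loggers
    # Each trigger writes to its own file.
    logger.addHandler(logging.FileHandler(os.path.join(log_dir, f"trigger_{trigger_id}.log")))
    return logger
```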
4 changes: 2 additions & 2 deletions airflow/kubernetes/pod_generator.py
@@ -449,7 +449,7 @@ def build_selector_for_k8s_executor_pod(
     airflow_worker=None,
 ):
     """
-    Generate selector for kubernetes executor pod
+    Generate selector for kubernetes executor pod.
 
     :meta private:
     """
@@ -481,7 +481,7 @@ def build_labels_for_k8s_executor_pod(
     run_id=None,
 ):
     """
-    Generate labels for kubernetes executor pod
+    Generate labels for kubernetes executor pod.
 
     :meta private:
     """
2 changes: 1 addition & 1 deletion airflow/metrics/validators.py
@@ -107,7 +107,7 @@ def __subclasshook__(cls, subclass: Callable[[str], str]) -> bool:
 
     @abc.abstractmethod
     def test(self, name: str) -> bool:
-        """Test if name is allowed"""
+        """Test if name is allowed."""
         raise NotImplementedError
 
 
2 changes: 1 addition & 1 deletion airflow/models/__init__.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Airflow models"""
+"""Airflow models."""
 from __future__ import annotations
 
 # Do not add new models to this -- this is for compat only
13 changes: 8 additions & 5 deletions airflow/models/abstractoperator.py
@@ -138,7 +138,7 @@ def inherits_from_empty_operator(self) -> bool:
 
     @property
    def dag_id(self) -> str:
-        """Returns dag id if it has one or an adhoc + owner"""
+        """Returns dag id if it has one or an adhoc + owner."""
        dag = self.get_dag()
        if dag:
            return dag.dag_id
@@ -243,7 +243,7 @@ def iter_mapped_dependants(self) -> Iterator[MappedOperator | MappedTaskGroup]:
     def iter_mapped_task_groups(self) -> Iterator[MappedTaskGroup]:
         """Return mapped task groups this task belongs to.
 
-        Groups are returned from the closest to the outmost.
+        Groups are returned from the innermost to the outmost.
 
         :meta private:
         """
@@ -254,7 +254,10 @@ def iter_mapped_task_groups(self) -> Iterator[MappedTaskGroup]:
             parent = parent.task_group
 
     def get_closest_mapped_task_group(self) -> MappedTaskGroup | None:
-        """:meta private:"""
+        """Get the mapped task group "closest" to this task in the DAG.
+
+        :meta private:
+        """
         return next(self.iter_mapped_task_groups(), None)
 
     def unmap(self, resolve: None | dict[str, Any] | tuple[Context, Session]) -> BaseOperator:
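(The reworded docstring -- groups yielded "from the innermost to the outmost" -- describes a walk up the parent chain. A generic sketch of that traversal, with simplified types that are not Airflow's:)

```python
# Generic sketch of an innermost-to-outermost ancestor walk.
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator


@dataclass
class Group:
    name: str
    parent: Group | None = None
    is_mapped: bool = False


def iter_mapped_groups(group: Group | None) -> Iterator[Group]:
    """Yield mapped groups starting from the closest ancestor."""
    while group is not None:
        if group.is_mapped:
            yield group
        group = group.parent


outer = Group("outer", is_mapped=True)
inner = Group("inner", parent=outer, is_mapped=True)
print([g.name for g in iter_mapped_groups(inner)])  # ['inner', 'outer']
```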
@@ -296,7 +299,7 @@ def priority_weight_total(self) -> int:
 
     @cached_property
     def operator_extra_link_dict(self) -> dict[str, Any]:
-        """Returns dictionary of all extra links for the operator"""
+        """Returns dictionary of all extra links for the operator."""
         op_extra_links_from_plugin: dict[str, Any] = {}
         from airflow import plugins_manager
 
@@ -315,7 +318,7 @@
 
     @cached_property
     def global_operator_extra_link_dict(self) -> dict[str, Any]:
-        """Returns dictionary of all global extra links"""
+        """Returns dictionary of all global extra links."""
         from airflow import plugins_manager
 
         plugins_manager.initialize_extra_operators_links_plugins()
2 changes: 1 addition & 1 deletion airflow/models/base.py
@@ -52,7 +52,7 @@ def _get_schema():
 
 
 def get_id_collation_args():
-    """Get SQLAlchemy args to use for COLLATION"""
+    """Get SQLAlchemy args to use for COLLATION."""
     collation = conf.get("database", "sql_engine_collation_for_ids", fallback=None)
     if collation:
         return {"collation": collation}
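(For context, the dict returned by `get_id_collation_args` is splatted into SQLAlchemy `String` columns; roughly like this -- the collation value here is an example, not a recommendation:)

```python
# Sketch: how collation kwargs feed a SQLAlchemy String column.
from sqlalchemy import Column, String

COLLATION_ARGS = {"collation": "utf8mb3_bin"}  # example value for MySQL

dag_id_column = Column("dag_id", String(250, **COLLATION_ARGS))
```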