From ca3ce78fbaa4a04c061116acf2c86a311fdab30d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= <6774676+eumiro@users.noreply.github.com>
Date: Thu, 28 Sep 2023 14:46:03 +0000
Subject: [PATCH] Refactor import from collections (#34406)

---
 .../api/common/experimental/get_lineage.py    |  4 +-
 .../managers/fab/cli_commands/role_command.py |  6 +--
 airflow/cli/cli_parser.py                     |  4 +-
 airflow/dag_processing/manager.py             |  9 ++--
 airflow/models/baseoperator.py                |  1 -
 airflow/models/dag.py                         |  1 -
 airflow/models/mappedoperator.py              |  1 -
 .../providers/amazon/aws/hooks/sagemaker.py   |  5 +-
 airflow/ti_deps/deps/trigger_rule_dep.py      |  6 +--
 airflow/utils/entry_points.py                 |  4 +-
 airflow/utils/serve_logs.py                   |  4 +-
 airflow/www/views.py                          | 14 +++---
 .../prepare_provider_packages.py              |  4 +-
 tests/dag_processing/test_job_runner.py       | 46 ++++++++-----------
 tests/jobs/test_scheduler_job.py              |  6 +--
 15 files changed, 51 insertions(+), 64 deletions(-)

diff --git a/airflow/api/common/experimental/get_lineage.py b/airflow/api/common/experimental/get_lineage.py
index 969d33454555..49665eba3915 100644
--- a/airflow/api/common/experimental/get_lineage.py
+++ b/airflow/api/common/experimental/get_lineage.py
@@ -18,7 +18,7 @@
 """Lineage APIs."""
 from __future__ import annotations

-import collections
+from collections import defaultdict
 from typing import TYPE_CHECKING, Any

 from airflow.api.common.experimental import check_and_get_dag, check_and_get_dagrun
@@ -43,7 +43,7 @@ def get_lineage(
     inlets = XCom.get_many(dag_ids=dag_id, run_id=dagrun.run_id, key=PIPELINE_INLETS, session=session)
     outlets = XCom.get_many(dag_ids=dag_id, run_id=dagrun.run_id, key=PIPELINE_OUTLETS, session=session)

-    lineage: dict[str, dict[str, Any]] = collections.defaultdict(dict)
+    lineage: dict[str, dict[str, Any]] = defaultdict(dict)
     for meta in inlets:
         lineage[meta.task_id]["inlets"] = meta.value
     for meta in outlets:
diff --git a/airflow/auth/managers/fab/cli_commands/role_command.py b/airflow/auth/managers/fab/cli_commands/role_command.py
index fc828ede771f..59ced49852e3 100644
--- a/airflow/auth/managers/fab/cli_commands/role_command.py
+++ b/airflow/auth/managers/fab/cli_commands/role_command.py
@@ -18,10 +18,10 @@
 """Roles sub-commands."""
 from __future__ import annotations

-import collections
 import itertools
 import json
 import os
+from collections import defaultdict
 from typing import TYPE_CHECKING

 from airflow.auth.managers.fab.cli_commands.utils import get_application_builder
@@ -48,7 +48,7 @@ def roles_list(args):
         )
         return

-    permission_map: dict[tuple[str, str], list[str]] = collections.defaultdict(list)
+    permission_map: dict[tuple[str, str], list[str]] = defaultdict(list)
     for role in roles:
         for permission in role.permissions:
             permission_map[(role.name, permission.resource.name)].append(permission.action.name)
@@ -92,7 +92,7 @@ def __roles_add_or_remove_permissions(args):
         is_add: bool = args.subcommand.startswith("add")

         role_map = {}
-        perm_map: dict[tuple[str, str], set[str]] = collections.defaultdict(set)
+        perm_map: dict[tuple[str, str], set[str]] = defaultdict(set)
         asm = appbuilder.sm
         for name in args.role:
             role: Role | None = asm.find_role(name)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 35051964b6e7..b5991b44ae20 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -24,9 +24,9 @@
 from __future__ import annotations

 import argparse
-import collections
 import logging
 from argparse import Action
+from collections import Counter
 from functools import lru_cache
 from typing import TYPE_CHECKING, Iterable
@@ -82,7 +82,7 @@

 # Check if sub-commands are defined twice, which could be an issue.
 if len(ALL_COMMANDS_DICT) < len(airflow_commands):
-    dup = {k for k, v in collections.Counter([c.name for c in airflow_commands]).items() if v > 1}
+    dup = {k for k, v in Counter([c.name for c in airflow_commands]).items() if v > 1}
     raise CliConflictError(
         f"The following CLI {len(dup)} command(s) are defined more than once: {sorted(dup)}\n"
         f"This can be due to the executor '{ExecutorLoader.get_default_executor_name()}' "
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index ab93a21026d5..3d93daf66b8e 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -18,7 +18,6 @@
 """Processes DAGs."""
 from __future__ import annotations

-import collections
 import enum
 import importlib
 import inspect
@@ -30,7 +29,7 @@
 import sys
 import time
 import zipfile
-from collections import defaultdict
+from collections import defaultdict, deque
 from datetime import datetime, timedelta
 from importlib import import_module
 from pathlib import Path
@@ -386,7 +385,7 @@ def __init__(
         super().__init__()
         # known files; this will be updated every `dag_dir_list_interval` and stuff added/removed accordingly
         self._file_paths: list[str] = []
-        self._file_path_queue: collections.deque[str] = collections.deque()
+        self._file_path_queue: deque[str] = deque()
         self._max_runs = max_runs
         # signal_conn is None for dag_processor_standalone mode.
         self._direct_scheduler_conn = signal_conn
@@ -743,7 +742,7 @@ def _add_callback_to_queue(self, request: CallbackRequest):
             # Remove file paths matching request.full_filepath from self._file_path_queue
             # Since we are already going to use that filepath to run callback,
             # there is no need to have same file path again in the queue
-            self._file_path_queue = collections.deque(
+            self._file_path_queue = deque(
                 file_path for file_path in self._file_path_queue if file_path != request.full_filepath
             )
             self._add_paths_to_queue([request.full_filepath], True)
@@ -986,7 +985,7 @@ def set_file_paths(self, new_file_paths):
         self._file_paths = new_file_paths

         # clean up the queues; remove anything queued which no longer in the list, including callbacks
-        self._file_path_queue = collections.deque(x for x in self._file_path_queue if x in new_file_paths)
+        self._file_path_queue = deque(x for x in self._file_path_queue if x in new_file_paths)
         Stats.gauge("dag_processing.file_path_queue_size", len(self._file_path_queue))

         callback_paths_to_del = [x for x in self._callback_to_execute if x not in new_file_paths]
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index a5713dbdf687..5993551f9356 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -19,7 +19,6 @@
 from __future__ import annotations

 import abc
-import collections
 import collections.abc
 import contextlib
 import copy
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index fdb69aea9628..6f018fe8da44 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations

-import collections
 import collections.abc
 import copy
 import functools
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 621a21e53ebb..bba45968f01f 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations

-import collections
 import collections.abc
 import contextlib
 import copy
diff --git a/airflow/providers/amazon/aws/hooks/sagemaker.py b/airflow/providers/amazon/aws/hooks/sagemaker.py
index 2911ddd86f73..c2e671d4272a 100644
--- a/airflow/providers/amazon/aws/hooks/sagemaker.py
+++ b/airflow/providers/amazon/aws/hooks/sagemaker.py
@@ -17,14 +17,13 @@
 # under the License.
 from __future__ import annotations

-import collections
 import os
 import re
 import tarfile
 import tempfile
 import time
 import warnings
-from collections import Counter
+from collections import Counter, namedtuple
 from datetime import datetime
 from functools import partial
 from typing import Any, Callable, Generator, cast
@@ -54,7 +53,7 @@ class LogState:

 # Position is a tuple that includes the last read timestamp and the number of items that were read
 # at that time. This is used to figure out which event to start with on the next read.
-Position = collections.namedtuple("Position", ["timestamp", "skip"])
+Position = namedtuple("Position", ["timestamp", "skip"])


 def argmin(arr, f: Callable) -> int | None:
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 670dcd5bff12..08ddc93b5172 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -17,9 +17,9 @@
 # under the License.
 from __future__ import annotations

-import collections
 import collections.abc
 import functools
+from collections import Counter
 from typing import TYPE_CHECKING, Iterator, KeysView, NamedTuple

 from sqlalchemy import and_, func, or_, select
@@ -64,8 +64,8 @@ def calculate(cls, finished_upstreams: Iterator[TaskInstance]) -> _UpstreamTIStates:
         :param ti: the ti that we want to calculate deps for
         :param finished_tis: all the finished tasks of the dag_run
         """
-        counter: dict[str, int] = collections.Counter()
-        setup_counter: dict[str, int] = collections.Counter()
+        counter: dict[str, int] = Counter()
+        setup_counter: dict[str, int] = Counter()
         for ti in finished_upstreams:
             curr_state = {ti.state: 1}
             counter.update(curr_state)
diff --git a/airflow/utils/entry_points.py b/airflow/utils/entry_points.py
index b3f145110fec..aff4c0a9f933 100644
--- a/airflow/utils/entry_points.py
+++ b/airflow/utils/entry_points.py
@@ -16,9 +16,9 @@
 # under the License.
 from __future__ import annotations

-import collections
 import functools
 import logging
+from collections import defaultdict
 from typing import Iterator, Tuple

 try:
@@ -33,7 +33,7 @@

 @functools.lru_cache(maxsize=None)
 def _get_grouped_entry_points() -> dict[str, list[EPnD]]:
-    mapping: dict[str, list[EPnD]] = collections.defaultdict(list)
+    mapping: dict[str, list[EPnD]] = defaultdict(list)
     for dist in metadata.distributions():
         try:
             for e in dist.entry_points:
diff --git a/airflow/utils/serve_logs.py b/airflow/utils/serve_logs.py
index 0e926ccd8920..6b120878b918 100644
--- a/airflow/utils/serve_logs.py
+++ b/airflow/utils/serve_logs.py
@@ -17,10 +17,10 @@
 """Serve logs process."""
 from __future__ import annotations

-import collections
 import logging
 import os
 import socket
+from collections import namedtuple

 import gunicorn.app.base
 from flask import Flask, abort, request, send_from_directory
@@ -134,7 +134,7 @@ def serve_logs_view(filename):
     return flask_app


-GunicornOption = collections.namedtuple("GunicornOption", ["key", "value"])
+GunicornOption = namedtuple("GunicornOption", ["key", "value"])


 class StandaloneGunicornApplication(gunicorn.app.base.BaseApplication):
diff --git a/airflow/www/views.py b/airflow/www/views.py
index bd641e33f099..c38c4f900b2c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations

-import collections
+import collections.abc
 import copy
 import datetime
 import itertools
@@ -477,8 +477,8 @@ def get_mapped_group_summaries():
                 .order_by(TaskInstance.task_id, TaskInstance.run_id)
             )
             # Group tis by run_id, and then map_index.
-            mapped_tis: Mapping[str, Mapping[int, list[TaskInstance]]] = collections.defaultdict(
-                lambda: collections.defaultdict(list),
+            mapped_tis: Mapping[str, Mapping[int, list[TaskInstance]]] = defaultdict(
+                lambda: defaultdict(list)
             )
             for ti in mapped_ti_query:
                 mapped_tis[ti.run_id][ti.map_index].append(ti)
@@ -499,7 +499,7 @@ def get_mapped_group_summary(run_id: str, mapped_instances: Mapping[int, list[TaskInstance]]):
             # TODO: This assumes TI map index has a one-to-one mapping to
             # its parent mapped task group, which will not be true when we
             # allow nested mapping in the future.
-            mapped_states: MutableMapping[str, int] = collections.defaultdict(int)
+            mapped_states: MutableMapping[str, int] = defaultdict(int)
             for mis in mapped_instances.values():
                 child_states = {mi.state for mi in mis}
                 state = next(s for s in wwwutils.priority if s in child_states)
@@ -1222,7 +1222,7 @@ def task_stats(self, session: Session = NEW_SESSION):
             )
         )
         data = get_task_stats_from_query(qry)
-        payload: dict[str, list[dict[str, Any]]] = collections.defaultdict(list)
+        payload: dict[str, list[dict[str, Any]]] = defaultdict(list)
         for dag_id, state in itertools.product(filter_dag_ids, State.task_states):
             payload[dag_id].append({"state": state, "count": data.get(dag_id, {}).get(state, 0)})
         return flask.json.jsonify(payload)
@@ -3260,8 +3260,8 @@ def landing_times(self, dag_id: str, session: Session = NEW_SESSION):
             chart_attr=self.line_chart_attr,
         )

-        y_points: dict[str, list[float]] = collections.defaultdict(list)
-        x_points: dict[str, list[tuple[int]]] = collections.defaultdict(list)
+        y_points: dict[str, list[float]] = defaultdict(list)
+        x_points: dict[str, list[tuple[int]]] = defaultdict(list)
         for task in dag.tasks:
             task_id = task.task_id
             for ti in tis:
diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py
index 1aab7bc536e4..a62581101b56 100755
--- a/dev/provider_packages/prepare_provider_packages.py
+++ b/dev/provider_packages/prepare_provider_packages.py
@@ -20,7 +20,6 @@
 """Setup.py for the Provider packages of Airflow project."""
 from __future__ import annotations

-import collections
 import difflib
 import glob
 import json
@@ -33,6 +32,7 @@
 import sys
 import tempfile
 import textwrap
+from collections import namedtuple
 from contextlib import contextmanager
 from copy import deepcopy
 from datetime import datetime, timedelta
@@ -605,7 +605,7 @@ def convert_cross_package_dependencies_to_table(

 """
 Keeps information about historical releases.
 """
-ReleaseInfo = collections.namedtuple(
+ReleaseInfo = namedtuple(
     "ReleaseInfo", "release_version release_version_no_leading_zeros last_commit_hash content file_name"
 )
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index 51187d71539a..d5135afd5c7b 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations

-import collections
 import contextlib
 import itertools
 import logging
@@ -30,6 +29,7 @@
 import textwrap
 import threading
 import time
+from collections import deque
 from datetime import datetime, timedelta
 from logging.config import dictConfig
 from unittest import mock
@@ -231,7 +231,7 @@ def test_start_new_processes_with_same_filepath(self, _):
         file_1 = "file_1.py"
         file_2 = "file_2.py"
         file_3 = "file_3.py"
-        manager.processor._file_path_queue = collections.deque([file_1, file_2, file_3])
+        manager.processor._file_path_queue = deque([file_1, file_2, file_3])

         # Mock that only one processor exists. This processor runs with 'file_1'
         manager.processor._processors[file_1] = MagicMock()
@@ -246,7 +246,7 @@ def test_start_new_processes_with_same_filepath(self, _):

         assert file_1 in manager.processor._processors.keys()
         assert file_2 in manager.processor._processors.keys()
-        assert collections.deque([file_3]) == manager.processor._file_path_queue
+        assert deque([file_3]) == manager.processor._file_path_queue

     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         manager = DagProcessorJobRunner(
@@ -322,9 +322,9 @@ def test_file_paths_in_queue_sorted_alphabetically(
         )

         manager.processor.set_file_paths(dag_files)
-        assert manager.processor._file_path_queue == collections.deque()
+        assert manager.processor._file_path_queue == deque()
         manager.processor.prepare_file_path_queue()
-        assert manager.processor._file_path_queue == collections.deque(
+        assert manager.processor._file_path_queue == deque(
             ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
         )

@@ -354,10 +354,10 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host(
         )

         manager.processor.set_file_paths(dag_files)
-        assert manager.processor._file_path_queue == collections.deque()
+        assert manager.processor._file_path_queue == deque()
         manager.processor.prepare_file_path_queue()

-        expected_order = collections.deque(dag_files)
+        expected_order = deque(dag_files)
         random.Random(get_hostname()).shuffle(expected_order)
         assert manager.processor._file_path_queue == expected_order

@@ -419,9 +419,9 @@ def test_file_paths_in_queue_sorted_by_modified_time(
         )

         manager.processor.set_file_paths(dag_files)
-        assert manager.processor._file_path_queue == collections.deque()
+        assert manager.processor._file_path_queue == deque()
         manager.processor.prepare_file_path_queue()
-        assert manager.processor._file_path_queue == collections.deque(
+        assert manager.processor._file_path_queue == deque(
             ["file_4.py", "file_1.py", "file_3.py", "file_2.py"]
         )

@@ -460,7 +460,7 @@ def test_file_paths_in_queue_excludes_missing_file(

         manager.processor.set_file_paths(dag_files)
         manager.processor.prepare_file_path_queue()
-        assert manager.processor._file_path_queue == collections.deque(["file_2.py", "file_3.py"])
+        assert manager.processor._file_path_queue == deque(["file_2.py", "file_3.py"])

     @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -497,13 +497,11 @@ def test_add_new_file_to_parsing_queue(

         manager.processor.set_file_paths(dag_files)
         manager.processor.prepare_file_path_queue()
-        assert manager.processor._file_path_queue == collections.deque(
-            ["file_3.py", "file_2.py", "file_1.py"]
-        )
+        assert manager.processor._file_path_queue == deque(["file_3.py", "file_2.py", "file_1.py"])

         manager.processor.set_file_paths([*dag_files, "file_4.py"])
         manager.processor.add_new_file_path_to_queue()
-        assert manager.processor._file_path_queue == collections.deque(
+        assert manager.processor._file_path_queue == deque(
             ["file_4.py", "file_3.py", "file_2.py", "file_1.py"]
         )

@@ -552,10 +550,10 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
         }
         with time_machine.travel(freezed_base_time):
             manager.processor.set_file_paths(dag_files)
-            assert manager.processor._file_path_queue == collections.deque()
+            assert manager.processor._file_path_queue == deque()
             # File Path Queue will be empty as the "modified time" < "last finish time"
             manager.processor.prepare_file_path_queue()
-            assert manager.processor._file_path_queue == collections.deque()
+            assert manager.processor._file_path_queue == deque()

         # Simulate the DAG modification by using modified_time which is greater
         # than the last_parse_time but still less than now - min_file_process_interval
@@ -563,12 +561,12 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
         file_1_new_mtime_ts = file_1_new_mtime.timestamp()
         with time_machine.travel(freezed_base_time):
             manager.processor.set_file_paths(dag_files)
-            assert manager.processor._file_path_queue == collections.deque()
+            assert manager.processor._file_path_queue == deque()
             # File Path Queue will be empty as the "modified time" < "last finish time"
             mock_getmtime.side_effect = [file_1_new_mtime_ts]
             manager.processor.prepare_file_path_queue()
             # Check that file is added to the queue even though file was just recently passed
-            assert manager.processor._file_path_queue == collections.deque(["file_1.py"])
+            assert manager.processor._file_path_queue == deque(["file_1.py"])
             assert last_finish_time < file_1_new_mtime
             assert (
                 manager.processor._file_process_interval
@@ -1278,9 +1276,7 @@ def test_callback_queue(self, tmp_path):
         manager.processor._add_callback_to_queue(dag2_req1)

         # then - requests should be in manager's queue, with dag2 ahead of dag1 (because it was added last)
-        assert manager.processor._file_path_queue == collections.deque(
-            [dag2_req1.full_filepath, dag1_req1.full_filepath]
-        )
+        assert manager.processor._file_path_queue == deque([dag2_req1.full_filepath, dag1_req1.full_filepath])
         assert set(manager.processor._callback_to_execute.keys()) == {
             dag1_req1.full_filepath,
             dag2_req1.full_filepath,
@@ -1294,9 +1290,7 @@ def test_callback_queue(self, tmp_path):

         # then - since sla2 == sla1, should not have brought dag1 to the fore, and an SLA on dag3 doesn't
         # update the queue, although the callback is registered
-        assert manager.processor._file_path_queue == collections.deque(
-            [dag2_req1.full_filepath, dag1_req1.full_filepath]
-        )
+        assert manager.processor._file_path_queue == deque([dag2_req1.full_filepath, dag1_req1.full_filepath])
         assert manager.processor._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
         assert manager.processor._callback_to_execute[dag3_sla1.full_filepath] == [dag3_sla1]

@@ -1304,9 +1298,7 @@ def test_callback_queue(self, tmp_path):
         manager.processor._add_callback_to_queue(dag1_req2)

         # then - non-sla callback should have brought dag1 to the fore
-        assert manager.processor._file_path_queue == collections.deque(
-            [dag1_req1.full_filepath, dag2_req1.full_filepath]
-        )
+        assert manager.processor._file_path_queue == deque([dag1_req1.full_filepath, dag2_req1.full_filepath])
         assert manager.processor._callback_to_execute[dag1_req1.full_filepath] == [
             dag1_req1,
             dag1_sla1,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 58b4fcfb216f..6f3085bbe2b8 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -17,11 +17,11 @@
 # under the License.
 from __future__ import annotations

-import collections
 import contextlib
 import datetime
 import logging
 import os
+from collections import deque
 from datetime import timedelta
 from typing import Generator
 from unittest import mock
@@ -4810,8 +4810,8 @@ def spy(*args, **kwargs):

             return spy

-        num_queued_tis: collections.deque[int] = collections.deque([], 3)
-        num_finished_events: collections.deque[int] = collections.deque([], 3)
+        num_queued_tis: deque[int] = deque([], 3)
+        num_finished_events: deque[int] = deque([], 3)

         do_scheduling_spy = mock.patch.object(
             job_runner,