diff --git a/airflow-core/src/airflow/api_fastapi/app.py b/airflow-core/src/airflow/api_fastapi/app.py
index 3e76c4f8f2df3..58cfb157083ce 100644
--- a/airflow-core/src/airflow/api_fastapi/app.py
+++ b/airflow-core/src/airflow/api_fastapi/app.py
@@ -159,8 +159,6 @@ def init_auth_manager(app: FastAPI | None = None) -> BaseAuthManager:
 
 def get_auth_manager() -> BaseAuthManager:
     """Return the auth manager, provided it's been initialized before."""
-    global auth_manager
-
     if auth_manager is None:
         raise RuntimeError(
             "Auth Manager has not been initialized yet. "
diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py
index 18b376ccba520..1bfbbcf62b445 100644
--- a/airflow-core/src/airflow/configuration.py
+++ b/airflow-core/src/airflow/configuration.py
@@ -1844,7 +1844,7 @@ def load_test_config(self):
         """
         # We need those globals before we run "get_all_expansion_variables" because this is where
         # the variables are expanded from in the configuration
-        global FERNET_KEY, AIRFLOW_HOME, JWT_SECRET_KEY
+        global FERNET_KEY, JWT_SECRET_KEY
         from cryptography.fernet import Fernet
 
         unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py
index 0875ac9be9092..fb29b155c9417 100644
--- a/airflow-core/src/airflow/logging_config.py
+++ b/airflow-core/src/airflow/logging_config.py
@@ -38,7 +38,6 @@
 
 
 def __getattr__(name: str):
     if name == "REMOTE_TASK_LOG":
-        global REMOTE_TASK_LOG
         load_logging_config()
         return REMOTE_TASK_LOG
diff --git a/airflow-core/src/airflow/plugins_manager.py b/airflow-core/src/airflow/plugins_manager.py
index aa228342c1d80..5a70dbcd8f099 100644
--- a/airflow-core/src/airflow/plugins_manager.py
+++ b/airflow-core/src/airflow/plugins_manager.py
@@ -214,8 +214,6 @@ def is_valid_plugin(plugin_obj):
 
     :return: Whether or not the obj is a valid subclass of AirflowPlugin
     """
-    global plugins
-
     if (
         inspect.isclass(plugin_obj)
         and issubclass(plugin_obj, AirflowPlugin)
@@ -234,8 +232,6 @@ def register_plugin(plugin_instance):
 
     :param plugin_instance: subclass of AirflowPlugin
     """
-    global plugins
-
     if plugin_instance.name in loaded_plugins:
         return
 
@@ -250,8 +246,6 @@ def load_entrypoint_plugins():
 
     The entry_point group should be 'airflow.plugins'.
""" - global import_errors - log.debug("Loading plugins from entrypoints") for entry_point, dist in entry_points_with_dist("airflow.plugins"): @@ -271,7 +265,6 @@ def load_entrypoint_plugins(): def load_plugins_from_plugin_directory(): """Load and register Airflow Plugins from plugins directory.""" - global import_errors log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER) files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore") plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)] @@ -373,7 +366,6 @@ def ensure_plugins_loaded(): def initialize_ui_plugins(): """Collect extension points for the UI.""" - global plugins global external_views global react_apps @@ -456,7 +448,6 @@ def _remove_list_item(lst, item): def initialize_flask_plugins(): """Collect flask extension points for WEB UI (legacy).""" - global plugins global flask_blueprints global flask_appbuilder_views global flask_appbuilder_menu_links @@ -496,7 +487,6 @@ def initialize_flask_plugins(): def initialize_fastapi_plugins(): """Collect extension points for the API.""" - global plugins global fastapi_apps global fastapi_root_middlewares @@ -593,7 +583,6 @@ def initialize_hook_lineage_readers_plugins(): def integrate_macros_plugins() -> None: """Integrates macro plugins.""" - global plugins global macros_modules from airflow.sdk.execution_time import macros @@ -626,8 +615,6 @@ def integrate_macros_plugins() -> None: def integrate_listener_plugins(listener_manager: ListenerManager) -> None: """Add listeners from plugins.""" - global plugins - ensure_plugins_loaded() if plugins: diff --git a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py index 720403e3c642d..13dacf5d8c50e 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py @@ -182,7 +182,6 @@ def docker_compose_up(tmp_path_factory): def docker_compose_down(): """Tear down Docker Compose environment.""" - global docker_client if docker_client: docker_client.compose.down(remove_orphans=True, volumes=True, quiet=True) diff --git a/dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py b/dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py index a7bc466e7aac6..e1d1a6ce85d12 100644 --- a/dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py +++ b/dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py @@ -831,7 +831,6 @@ def update_release_notes( return with_breaking_changes, maybe_with_new_features, False change_table_len = len(list_of_list_of_changes[0]) table_iter = 0 - global SHORT_HASH_TO_TYPE_DICT type_of_current_package_changes: list[TypeOfChange] = [] while table_iter < change_table_len: get_console().print() diff --git a/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py b/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py index 1e8e08eccefcf..363b101faebb4 100644 --- a/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py +++ b/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py @@ -461,7 +461,10 @@ def test_task_retry_on_api_failure(self, _, mock_executor, caplog): mock_executor.attempt_submit_jobs() mock_executor.sync_running_jobs() for i in range(2): - assert f"Airflow task {airflow_keys[i]} has failed a maximum of {mock_executor.MAX_SUBMIT_JOB_ATTEMPTS} times. 
Marking as failed" + assert ( + f"Airflow task {airflow_keys[i]} has failed a maximum of {mock_executor.MAX_SUBMIT_JOB_ATTEMPTS} times. Marking as failed" + in caplog.text + ) @mock.patch("airflow.providers.amazon.aws.executors.batch.batch_executor.exponential_backoff_retry") def test_sync_unhealthy_boto_connection(self, mock_exponentional_backoff_retry, mock_executor): diff --git a/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py index 2987796dcd362..da69f22cfa34a 100644 --- a/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py @@ -554,7 +554,10 @@ def test_inject_openlineage_simple_config_wrong_transport_to_spark( operator.hook.TERMINAL_STATES = [BatchState.SUCCESS] operator.execute(MagicMock()) - assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." + assert ( + "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." + in caplog.text + ) assert operator.spark_params["conf"] == {} diff --git a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py index 7acffb447d07c..c7010a0fb835f 100644 --- a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py +++ b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py @@ -467,7 +467,10 @@ def test_inject_openlineage_simple_config_wrong_transport_to_spark( ) operator.execute(MagicMock()) - assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." + assert ( + "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." + in caplog.text + ) assert operator.conf == { "parquet.compression": "SNAPPY", } diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py index ad68c5f410d28..9408ea04ac994 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py @@ -2528,6 +2528,7 @@ def test_async_get_logs_should_execute_successfully( @patch(KUB_OP_PATH.format("extract_xcom")) @patch(HOOK_CLASS) @patch(KUB_OP_PATH.format("pod_manager")) + @pytest.mark.xfail def test_async_write_logs_should_execute_successfully( self, mock_manager, mocked_hook, mock_extract_xcom, post_complete_action, get_logs ): @@ -2548,8 +2549,12 @@ def test_async_write_logs_should_execute_successfully( self.run_pod_async(k) if get_logs: - assert f"Container logs: {test_logs}" + # Note: the test below is broken and failing. Either the mock is wrong + # or the mocked container is not in a state that logging methods are called at-all. 
+            # See https://github.com/apache/airflow/issues/57515
+            assert f"Container logs: {test_logs}"  # noqa: PLW0129
             post_complete_action.assert_called_once()
+            mock_manager.return_value.read_pod_logs.assert_called()
         else:
             mock_manager.return_value.read_pod_logs.assert_not_called()
 
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index 66fae2524d618..13271854da129 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -141,14 +141,14 @@ async def test_run_loop_return_waiting_event(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.WAITING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@@ -160,14 +160,14 @@ async def test_run_loop_return_running_event(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.RUNNING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@@ -235,7 +235,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(
 
         generator = trigger.run()
         await generator.asend(None)
-        assert "Container logs:"
+        assert "Waiting until 120s to get the POD scheduled..." in caplog.text
 
     @pytest.mark.asyncio
     @pytest.mark.parametrize(
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py b/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py
index 9ce383c53c901..cba7cb58993b4 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py
@@ -153,5 +153,8 @@ async def test_run_loop_is_still_running(self, mock_hook, trigger, caplog):
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current job status is: {TransferState.RUNNING}"
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert (
+            f"Current state is {TransferState.RUNNING}" in caplog.text
+            or "Current state is TransferState.RUNNING" in caplog.text
+        )
+        assert f"Waiting for {POLL_INTERVAL} seconds" in caplog.text
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py b/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py
index 1312a1dfb2aca..bf8cfd05d6041 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py
@@ -229,8 +229,8 @@ async def test_run_loop_is_still_running(self, mock_job_status, template_job_sta
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current job status is: {JobState.JOB_STATE_RUNNING}"
-        assert f"Sleeping for {POLL_SLEEP} seconds."
+        assert "Current job status is: JOB_STATE_RUNNING" in caplog.text
+        assert f"Sleeping for {POLL_SLEEP} seconds." in caplog.text
 
         # cancel the task to suppress test warnings
         task.cancel()
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py b/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py
index 27ee12bb43629..084d1175f8d49 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py
@@ -132,4 +132,4 @@ async def test_async_dataplex_job_run_loop_is_still_running(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current state is: {DataScanJob.State.RUNNING}, sleeping for {TEST_POLL_INTERVAL} seconds."
+        assert f"Current state is: RUNNING, sleeping for {TEST_POLL_INTERVAL} seconds." in caplog.text
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py b/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py
index 3704c5c320624..05e194a3f9735 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py
@@ -200,14 +200,14 @@ async def test_run_loop_return_waiting_event_should_execute_successfully(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.WAITING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_POD_PATH}._wait_for_pod_start")
@@ -219,14 +219,14 @@ async def test_run_loop_return_running_event_should_execute_successfully(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.RUNNING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_POD_PATH}._wait_for_pod_start")
@@ -265,7 +265,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(
 
         generator = trigger.run()
         await generator.asend(None)
-        assert "Container logs:"
+        assert "Waiting until 120s to get the POD scheduled..." in caplog.text
 
     @pytest.mark.parametrize(
         "container_state, expected_state",
@@ -447,8 +447,8 @@ async def test_run_loop_return_waiting_event_pending_status(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Operation is still running."
-        assert f"Sleeping for {POLL_INTERVAL}s..."
+        assert "Operation is still running." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL}s..." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._get_hook")
@@ -470,8 +470,8 @@ async def test_run_loop_return_waiting_event_running_status(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Operation is still running."
-        assert f"Sleeping for {POLL_INTERVAL}s..."
+        assert "Operation is still running." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL}s..." in caplog.text
 
 
 class TestGKEStartJobTrigger:
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py b/providers/standard/tests/unit/standard/operators/test_python.py
index f025d99a8525b..e9af810fc71e2 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -955,7 +955,6 @@ def f():
 
     def test_string_args(self):
         def f():
-            global virtualenv_string_args
             print(virtualenv_string_args)
             if virtualenv_string_args[0] != virtualenv_string_args[2]:
                 raise RuntimeError
diff --git a/pyproject.toml b/pyproject.toml
index 5fa1e3fbdc646..5f4c6d9bfbfe2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -589,6 +589,12 @@ extend-select = [
     # Warning (PLW) re-implemented in ruff from Pylint
     "PLW0120", # else clause on loop without a break statement; remove the else and dedent its contents
     "PLW0127", # Self-assignment of variable
+    "PLW0128", # Redeclared variable {name} in assignment
+    "PLW0129", # Asserting on an empty string literal will never pass
+    "PLW0133", # Missing raise statement on exception
+    "PLW0245", # super call is missing parentheses
+    "PLW0406", # Module {name} imports itself
+    "PLW0602", # Using global for {name} but no assignment is done
     # Per rule enables
     "RUF006", # Checks for asyncio dangling task
     "RUF015", # Checks for unnecessary iterable allocation for first element
@@ -621,7 +627,6 @@ extend-select = [
     "RET506", # Unnecessary {branch} after raise statement
     "RET507", # Unnecessary {branch} after continue statement
     "RET508", # Unnecessary {branch} after break statement
-    "PLW0133", # Missing raise statement on exception
 ]
 ignore = [
     "D100", # Unwanted; Docstring at the top of every file.
diff --git a/scripts/ci/prek/check_providers_subpackages_all_have_init.py b/scripts/ci/prek/check_providers_subpackages_all_have_init.py
index 3a935a963b65c..85bf55626bc73 100755
--- a/scripts/ci/prek/check_providers_subpackages_all_have_init.py
+++ b/scripts/ci/prek/check_providers_subpackages_all_have_init.py
@@ -105,7 +105,6 @@ def _determine_init_py_action(need_path_extension: bool, root_path: Path):
 
 
 def check_dir_init_test_folders(folders: list[Path]) -> None:
-    global should_fail
     folders = list(folders)
     for root_distribution_path in folders:
         # We need init folders for all folders and for the common ones we need path extension
@@ -121,7 +120,6 @@ def check_dir_init_test_folders(folders: list[Path]) -> None:
 
 
 def check_dir_init_src_folders(folders: list[Path]) -> None:
-    global should_fail
     folders = list(folders)
     for root_distribution_path in folders:
         # We need init folders for all folders and for the common ones we need path extension