2 changes: 0 additions & 2 deletions airflow-core/src/airflow/api_fastapi/app.py
@@ -159,8 +159,6 @@ def init_auth_manager(app: FastAPI | None = None) -> BaseAuthManager:
 
 def get_auth_manager() -> BaseAuthManager:
     """Return the auth manager, provided it's been initialized before."""
-    global auth_manager
-
     if auth_manager is None:
         raise RuntimeError(
             "Auth Manager has not been initialized yet. "
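The deletions in this file, and most of those below, are instances of ruff's PLW0602: a `global` declaration for a name the function only ever reads. Python needs `global` solely to rebind a module-level name; plain reads already resolve to module scope, so the removed declarations were dead code. A minimal sketch of the distinction, using hypothetical names rather than the real module:

auth_manager = None

def get_manager():
    # Read-only access: the name lookup falls through to module scope,
    # so a `global auth_manager` here would be redundant (PLW0602).
    return auth_manager

def init_manager():
    # Rebinding the module-level name is what actually requires `global`.
    global auth_manager
    auth_manager = object()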
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/configuration.py
@@ -1844,7 +1844,7 @@ def load_test_config(self):
         """
         # We need those globals before we run "get_all_expansion_variables" because this is where
         # the variables are expanded from in the configuration
-        global FERNET_KEY, AIRFLOW_HOME, JWT_SECRET_KEY
+        global FERNET_KEY, JWT_SECRET_KEY
         from cryptography.fernet import Fernet
 
         unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
1 change: 0 additions & 1 deletion airflow-core/src/airflow/logging_config.py
@@ -38,7 +38,6 @@
 
 def __getattr__(name: str):
     if name == "REMOTE_TASK_LOG":
-        global REMOTE_TASK_LOG
         load_logging_config()
         return REMOTE_TASK_LOG
 
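For context, this module relies on PEP 562's module-level `__getattr__`: `load_logging_config()` is what assigns `REMOTE_TASK_LOG` into the module namespace, so the hook itself only reads the name and the `global` was never needed. A runnable sketch of the same lazy-attribute pattern, with hypothetical names:

# lazy_settings.py: a sketch of lazy module attributes via PEP 562
def _load():
    # The assignment happens here, written straight into the module namespace.
    globals()["EXPENSIVE_SETTING"] = "computed once"

def __getattr__(name: str):
    if name == "EXPENSIVE_SETTING":
        _load()  # populates the module attribute on first access
        return EXPENSIVE_SETTING  # plain read, so no `global` declaration needed
    raise AttributeError(name)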
13 changes: 0 additions & 13 deletions airflow-core/src/airflow/plugins_manager.py
@@ -214,8 +214,6 @@ def is_valid_plugin(plugin_obj):
     :return: Whether or not the obj is a valid subclass of
         AirflowPlugin
     """
-    global plugins
-
     if (
         inspect.isclass(plugin_obj)
         and issubclass(plugin_obj, AirflowPlugin)
@@ -234,8 +232,6 @@ def register_plugin(plugin_instance):
 
     :param plugin_instance: subclass of AirflowPlugin
     """
-    global plugins
-
     if plugin_instance.name in loaded_plugins:
         return
 
@@ -250,8 +246,6 @@ def load_entrypoint_plugins():
 
     The entry_point group should be 'airflow.plugins'.
     """
-    global import_errors
-
     log.debug("Loading plugins from entrypoints")
 
     for entry_point, dist in entry_points_with_dist("airflow.plugins"):
@@ -271,7 +265,6 @@ def load_entrypoint_plugins():
 
 def load_plugins_from_plugin_directory():
     """Load and register Airflow Plugins from plugins directory."""
-    global import_errors
     log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
     files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore")
     plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)]
@@ -373,7 +366,6 @@ def ensure_plugins_loaded():
 
 def initialize_ui_plugins():
     """Collect extension points for the UI."""
-    global plugins
     global external_views
     global react_apps
 
@@ -456,7 +448,6 @@ def _remove_list_item(lst, item):
 
 def initialize_flask_plugins():
     """Collect flask extension points for WEB UI (legacy)."""
-    global plugins
     global flask_blueprints
     global flask_appbuilder_views
     global flask_appbuilder_menu_links
@@ -496,7 +487,6 @@ def initialize_flask_plugins():
 
 def initialize_fastapi_plugins():
     """Collect extension points for the API."""
-    global plugins
     global fastapi_apps
     global fastapi_root_middlewares
 
@@ -593,7 +583,6 @@ def initialize_hook_lineage_readers_plugins():
 
 def integrate_macros_plugins() -> None:
     """Integrates macro plugins."""
-    global plugins
     global macros_modules
 
     from airflow.sdk.execution_time import macros
@@ -626,8 +615,6 @@ def integrate_macros_plugins() -> None:
 
 def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
     """Add listeners from plugins."""
-    global plugins
-
     ensure_plugins_loaded()
 
     if plugins:
1 change: 0 additions & 1 deletion airflow-ctl-tests/tests/airflowctl_tests/conftest.py
@@ -182,7 +182,6 @@ def docker_compose_up(tmp_path_factory):
 
 def docker_compose_down():
     """Tear down Docker Compose environment."""
-    global docker_client
     if docker_client:
         docker_client.compose.down(remove_orphans=True, volumes=True, quiet=True)
 
@@ -831,7 +831,6 @@ def update_release_notes(
         return with_breaking_changes, maybe_with_new_features, False
     change_table_len = len(list_of_list_of_changes[0])
     table_iter = 0
-    global SHORT_HASH_TO_TYPE_DICT
     type_of_current_package_changes: list[TypeOfChange] = []
     while table_iter < change_table_len:
         get_console().print()
@@ -461,7 +461,10 @@ def test_task_retry_on_api_failure(self, _, mock_executor, caplog):
         mock_executor.attempt_submit_jobs()
         mock_executor.sync_running_jobs()
         for i in range(2):
-            assert f"Airflow task {airflow_keys[i]} has failed a maximum of {mock_executor.MAX_SUBMIT_JOB_ATTEMPTS} times. Marking as failed"
+            assert (
+                f"Airflow task {airflow_keys[i]} has failed a maximum of {mock_executor.MAX_SUBMIT_JOB_ATTEMPTS} times. Marking as failed"
+                in caplog.text
+            )
 
     @mock.patch("airflow.providers.amazon.aws.executors.batch.batch_executor.exponential_backoff_retry")
     def test_sync_unhealthy_boto_connection(self, mock_exponentional_backoff_retry, mock_executor):
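The pre-change assertions in this file and the test files below are what the newly enabled PLW0129 catches: `assert` on a non-empty string literal (or f-string) tests the string object itself, which is always truthy, so the assertion can never fail no matter what was logged. The fix checks membership in pytest's captured log text instead. A small runnable sketch under pytest, with a hypothetical logger and message:

import logging

def do_work():
    logging.getLogger("app").warning("job is still running")

def test_do_work_logs(caplog):
    caplog.set_level(logging.WARNING)
    do_work()
    # assert "job is still running"  # always passes; PLW0129 flags this
    assert "job is still running" in caplog.text  # verifies the captured output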
@@ -554,7 +554,10 @@ def test_inject_openlineage_simple_config_wrong_transport_to_spark(
         operator.hook.TERMINAL_STATES = [BatchState.SUCCESS]
         operator.execute(MagicMock())
 
-        assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+        assert (
+            "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+            in caplog.text
+        )
         assert operator.spark_params["conf"] == {}
 
@@ -467,7 +467,10 @@ def test_inject_openlineage_simple_config_wrong_transport_to_spark(
         )
         operator.execute(MagicMock())
 
-        assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+        assert (
+            "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
+            in caplog.text
+        )
         assert operator.conf == {
             "parquet.compression": "SNAPPY",
         }
@@ -2528,6 +2528,7 @@ def test_async_get_logs_should_execute_successfully(
     @patch(KUB_OP_PATH.format("extract_xcom"))
     @patch(HOOK_CLASS)
     @patch(KUB_OP_PATH.format("pod_manager"))
+    @pytest.mark.xfail
     def test_async_write_logs_should_execute_successfully(
         self, mock_manager, mocked_hook, mock_extract_xcom, post_complete_action, get_logs
     ):
@@ -2548,8 +2549,12 @@ def test_async_write_logs_should_execute_successfully(
         self.run_pod_async(k)
 
         if get_logs:
-            assert f"Container logs: {test_logs}"
+            # Note: the test below is broken and failing. Either the mock is wrong
+            # or the mocked container is not in a state that logging methods are called at-all.
+            # See https://github.com/apache/airflow/issues/57515
+            assert f"Container logs: {test_logs}"  # noqa: PLW0129
             post_complete_action.assert_called_once()
             mock_manager.return_value.read_pod_logs.assert_called()
         else:
             mock_manager.return_value.read_pod_logs.assert_not_called()
 
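In this one spot the always-true assertion is kept on purpose: the rule is silenced with `# noqa: PLW0129` and the test is marked `@pytest.mark.xfail` so CI tolerates the failure until the linked issue is fixed. A minimal sketch of that quarantine pattern, with hypothetical names and a hypothetical reason string:

import pytest

@pytest.mark.xfail(reason="mock never drives the container into a logging state")
def test_known_broken():
    captured_logs = ""  # stand-in for output the broken mock fails to produce
    assert "Container logs:" in captured_logs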
@@ -141,14 +141,14 @@ async def test_run_loop_return_waiting_event(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.WAITING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@@ -160,14 +160,14 @@ async def test_run_loop_return_running_event(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.RUNNING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@@ -235,7 +235,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(
 
         generator = trigger.run()
         await generator.asend(None)
-        assert "Container logs:"
+        assert "Waiting until 120s to get the POD scheduled..." in caplog.text
 
     @pytest.mark.asyncio
     @pytest.mark.parametrize(
@@ -153,5 +153,8 @@ async def test_run_loop_is_still_running(self, mock_hook, trigger, caplog):
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current job status is: {TransferState.RUNNING}"
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert (
+            f"Current state is {TransferState.RUNNING}" in caplog.text
+            or "Current state is TransferState.RUNNING" in caplog.text
+        )
+        assert f"Waiting for {POLL_INTERVAL} seconds" in caplog.text
@@ -229,8 +229,8 @@ async def test_run_loop_is_still_running(self, mock_job_status, template_job_sta
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current job status is: {JobState.JOB_STATE_RUNNING}"
-        assert f"Sleeping for {POLL_SLEEP} seconds."
+        assert "Current job status is: JOB_STATE_RUNNING" in caplog.text
+        assert f"Sleeping for {POLL_SLEEP} seconds." in caplog.text
         # cancel the task to suppress test warnings
         task.cancel()
 
@@ -132,4 +132,4 @@ async def test_async_dataplex_job_run_loop_is_still_running(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current state is: {DataScanJob.State.RUNNING}, sleeping for {TEST_POLL_INTERVAL} seconds."
+        assert f"Current state is: RUNNING, sleeping for {TEST_POLL_INTERVAL} seconds." in caplog.text
@@ -200,14 +200,14 @@ async def test_run_loop_return_waiting_event_should_execute_successfully(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.WAITING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_POD_PATH}._wait_for_pod_start")
@@ -219,14 +219,14 @@ async def test_run_loop_return_running_event_should_execute_successfully(
         mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
         mock_method.return_value = ContainerState.RUNNING
 
-        caplog.set_level(logging.INFO)
+        caplog.set_level(logging.DEBUG)
 
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Container is not completed and still working."
-        assert f"Sleeping for {POLL_INTERVAL} seconds."
+        assert "Container is not completed and still working." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_POD_PATH}._wait_for_pod_start")
@@ -265,7 +265,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(
 
         generator = trigger.run()
         await generator.asend(None)
-        assert "Container logs:"
+        assert "Waiting until 120s to get the POD scheduled..." in caplog.text
 
     @pytest.mark.parametrize(
         "container_state, expected_state",
@@ -447,8 +447,8 @@ async def test_run_loop_return_waiting_event_pending_status(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Operation is still running."
-        assert f"Sleeping for {POLL_INTERVAL}s..."
+        assert "Operation is still running." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL}s..." in caplog.text
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._get_hook")
@@ -470,8 +470,8 @@ async def test_run_loop_return_waiting_event_running_status(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert "Operation is still running."
-        assert f"Sleeping for {POLL_INTERVAL}s..."
+        assert "Operation is still running." in caplog.text
+        assert f"Sleeping for {POLL_INTERVAL}s..." in caplog.text
 
 
 class TestGKEStartJobTrigger:
@@ -955,7 +955,6 @@ def f():
 
     def test_string_args(self):
        def f():
-            global virtualenv_string_args
            print(virtualenv_string_args)
            if virtualenv_string_args[0] != virtualenv_string_args[2]:
                raise RuntimeError
7 changes: 6 additions & 1 deletion pyproject.toml
@@ -589,6 +589,12 @@ extend-select = [
     # Warning (PLW) re-implemented in ruff from Pylint
     "PLW0120", # else clause on loop without a break statement; remove the else and dedent its contents
     "PLW0127", # Self-assignment of variable
+    "PLW0128", # Redeclared variable {name} in assignment
+    "PLW0129", # Asserting on an empty string literal will never pass
+    "PLW0133", # Missing raise statement on exception
+    "PLW0245", # super call is missing parentheses
+    "PLW0406", # Module {name} imports itself
+    "PLW0602", # Using global for {name} but no assignment is done
     # Per rule enables
     "RUF006", # Checks for asyncio dangling task
     "RUF015", # Checks for unnecessary iterable allocation for first element
@@ -621,7 +627,6 @@ extend-select = [
     "RET506", # Unnecessary {branch} after raise statement
     "RET507", # Unnecessary {branch} after continue statement
     "RET508", # Unnecessary {branch} after break statement
-    "PLW0133", # Missing raise statement on exception
 ]
 ignore = [
     "D100", # Unwanted; Docstring at the top of every file.
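Beyond PLW0602 and PLW0129, which drive most of this diff, the newly selected codes also cover a few easy-to-miss mistakes. A hypothetical sketch of two of them, PLW0245 and PLW0133:

class Connection:
    def close(self):
        print("closed")

class PooledConnection(Connection):
    def close(self):
        # PLW0245: writing `super.close()` (no parentheses) accesses an attribute
        # on the builtin `super` type and fails at runtime; call super() instead.
        super().close()

def validate(value: int) -> None:
    if value < 0:
        # PLW0133: `ValueError("negative value")` alone would build the exception
        # and silently discard it; `raise` is what makes it take effect.
        raise ValueError("negative value")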
@@ -105,7 +105,6 @@ def _determine_init_py_action(need_path_extension: bool, root_path: Path):
 
 
 def check_dir_init_test_folders(folders: list[Path]) -> None:
-    global should_fail
     folders = list(folders)
     for root_distribution_path in folders:
         # We need init folders for all folders and for the common ones we need path extension
@@ -121,7 +120,6 @@ def check_dir_init_test_folders(folders: list[Path]) -> None:
 
 
 def check_dir_init_src_folders(folders: list[Path]) -> None:
-    global should_fail
     folders = list(folders)
     for root_distribution_path in folders:
         # We need init folders for all folders and for the common ones we need path extension