diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/__init__.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py
new file mode 100644
index 0000000000000..3752705c7aac2
--- /dev/null
+++ b/providers/amazon/src/airflow/providers/amazon/aws/bundles/s3.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from pathlib import Path
+
+import structlog
+
+from airflow.dag_processing.bundles.base import BaseDagBundle
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3DagBundle(BaseDagBundle):
+    """
+    S3 DAG bundle - exposes a directory in S3 as a DAG bundle.
+
+    This allows Airflow to load DAGs directly from an S3 bucket.
+
+    :param aws_conn_id: Airflow connection ID for AWS. Defaults to AwsBaseHook.default_conn_name.
+    :param bucket_name: The name of the S3 bucket containing the DAG files.
+    :param prefix: Optional subdirectory within the S3 bucket where the DAGs are stored.
+        If not provided, DAGs are assumed to be at the root of the bucket.
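+
+    The bundle is normally instantiated by the DAG processor from the configured bundle backends
+    (for example via the ``dag_bundle_config_list`` option, using the classpath
+    ``airflow.providers.amazon.aws.bundles.s3.S3DagBundle``). A minimal direct-use sketch, with
+    placeholder bucket, prefix and connection id:
+
+    .. code-block:: python
+
+        from airflow.providers.amazon.aws.bundles.s3 import S3DagBundle
+
+        bundle = S3DagBundle(
+            name="example_s3_dags",
+            bucket_name="my-airflow-dags-bucket",
+            prefix="project1/dags",
+            aws_conn_id="aws_default",
+        )
+        bundle.initialize()  # validates the bucket/prefix and downloads the DAGs to bundle.path
+        bundle.refresh()  # re-syncs the local copy with S3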
+    """
+
+    supports_versioning = False
+
+    def __init__(
+        self,
+        *,
+        aws_conn_id: str = AwsBaseHook.default_conn_name,
+        bucket_name: str,
+        prefix: str = "",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.aws_conn_id = aws_conn_id
+        self.bucket_name = bucket_name
+        self.prefix = prefix
+        # Local path where S3 DAGs are downloaded
+        self.s3_dags_dir: Path = self.base_dir
+
+        log = structlog.get_logger(__name__)
+        self._log = log.bind(
+            bundle_name=self.name,
+            version=self.version,
+            bucket_name=self.bucket_name,
+            prefix=self.prefix,
+            aws_conn_id=self.aws_conn_id,
+        )
+        self._s3_hook: S3Hook | None = None
+
+    def _initialize(self):
+        with self.lock():
+            if not self.s3_dags_dir.exists():
+                self._log.info("Creating local DAGs directory: %s", self.s3_dags_dir)
+                os.makedirs(self.s3_dags_dir)
+
+            if not self.s3_dags_dir.is_dir():
+                raise AirflowException(f"Local DAGs path: {self.s3_dags_dir} is not a directory.")
+
+            if not self.s3_hook.check_for_bucket(bucket_name=self.bucket_name):
+                raise AirflowException(f"S3 bucket '{self.bucket_name}' does not exist.")
+
+            if self.prefix:
+                # don't check when prefix is ""
+                if not self.s3_hook.check_for_prefix(
+                    bucket_name=self.bucket_name, prefix=self.prefix, delimiter="/"
+                ):
+                    raise AirflowException(
+                        f"S3 prefix 's3://{self.bucket_name}/{self.prefix}' does not exist."
+                    )
+            self.refresh()
+
+    def initialize(self) -> None:
+        self._initialize()
+        super().initialize()
+
+    @property
+    def s3_hook(self):
+        if self._s3_hook is None:
+            try:
+                self._s3_hook: S3Hook = S3Hook(aws_conn_id=self.aws_conn_id)  # Initialize S3 hook.
+            except AirflowException as e:
+                self._log.warning("Could not create S3Hook for connection %s: %s", self.aws_conn_id, e)
+        return self._s3_hook
+
+    def __repr__(self):
+        return (
+            f"<S3DagBundle(name={self.name}, bucket_name={self.bucket_name}, "
+            f"prefix={self.prefix}, version={self.version})>"
+        )
+
+    def get_current_version(self) -> str | None:
+        """Return the current version of the DAG bundle. Currently not supported."""
+        return None
+
+    @property
+    def path(self) -> Path:
+        """Return the local path to the DAG files."""
+        return self.s3_dags_dir  # Path where DAGs are downloaded.
+
+    def refresh(self) -> None:
+        """Refresh the DAG bundle by re-downloading the DAGs from S3."""
+        if self.version:
+            raise AirflowException("Refreshing a specific version is not supported")
+
+        with self.lock():
+            self._log.debug(
+                "Downloading DAGs from s3://%s/%s to %s", self.bucket_name, self.prefix, self.s3_dags_dir
+            )
+            self.s3_hook.sync_to_local_dir(
+                bucket_name=self.bucket_name,
+                s3_prefix=self.prefix,
+                local_dir=self.s3_dags_dir,
+                delete_stale=True,
+            )
+
+    def view_url(self, version: str | None = None) -> str | None:
+        """Return a URL for viewing the DAGs in S3. Currently, versioning is not supported."""
+        if self.version:
+            raise AirflowException("S3 url with version is not supported")
+
+        # Virtual-hosted-style URL: https://<bucket-name>.s3.<region-name>.amazonaws.com/<prefix>
+        url = f"https://{self.bucket_name}.s3"
+        if self.s3_hook.region_name:
+            url += f".{self.s3_hook.region_name}"
+        url += ".amazonaws.com"
+        if self.prefix:
+            url += f"/{self.prefix}"
+
+        return url
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py
index 578ba2a2c4646..dd28e410b1730 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py
@@ -1683,3 +1683,80 @@ def delete_bucket_tagging(self, bucket_name: str | None = None) -> None:
         """
         s3_client = self.get_conn()
         s3_client.delete_bucket_tagging(Bucket=bucket_name)
+
+    def _sync_to_local_dir_delete_stale_local_files(self, current_s3_objects: list[Path], local_dir: Path):
+        current_s3_keys = set(current_s3_objects)
+
+        for item in local_dir.iterdir():
+            item: Path  # type: ignore[no-redef]
+            absolute_item_path = item.resolve()
+
+            if absolute_item_path not in current_s3_keys:
+                try:
+                    if item.is_file():
+                        item.unlink(missing_ok=True)
+                        self.log.debug("Deleted stale local file: %s", item)
+                    elif item.is_dir():
+                        # delete only when the folder is empty
+                        if not os.listdir(item):
+                            item.rmdir()
+                            self.log.debug("Deleted stale empty directory: %s", item)
+                    else:
+                        self.log.debug("Skipping stale item of unknown type: %s", item)
+                except OSError as e:
+                    self.log.error("Error deleting stale item %s: %s", item, e)
+                    raise e
+
+    def _sync_to_local_dir_if_changed(self, s3_bucket, s3_object, local_target_path: Path):
+        should_download = False
+        download_msg = ""
+        if not local_target_path.exists():
+            should_download = True
+            download_msg = f"Local file {local_target_path} does not exist."
+        else:
+            local_stats = local_target_path.stat()
+
+            if s3_object.size != local_stats.st_size:
+                should_download = True
+                download_msg = (
+                    f"S3 object size ({s3_object.size}) and local file size ({local_stats.st_size}) differ."
+                )
+
+            s3_last_modified = s3_object.last_modified
+            # Compare mtimes as epoch seconds; a newer object in S3 triggers a re-download.
+            if local_stats.st_mtime < s3_last_modified.timestamp():
+                should_download = True
+                download_msg = f"S3 object last modified ({s3_last_modified}) and local file last modified ({local_stats.st_mtime}) differ."
+
+        if should_download:
+            s3_bucket.download_file(s3_object.key, local_target_path)
+            self.log.debug(
+                "%s Downloaded %s to %s", download_msg, s3_object.key, local_target_path.as_posix()
+            )
+        else:
+            self.log.debug(
+                "Local file %s is up-to-date with S3 object %s.
Skipping download.", + local_target_path.as_posix(), + s3_object.key, + ) + + def sync_to_local_dir(self, bucket_name: str, local_dir: Path, s3_prefix="", delete_stale: bool = True): + """Download S3 files from the S3 bucket to the local directory.""" + self.log.debug("Downloading data from s3://%s/%s to %s", bucket_name, s3_prefix, local_dir) + + local_s3_objects = [] + s3_bucket = self.get_bucket(bucket_name) + for obj in s3_bucket.objects.filter(Prefix=s3_prefix): + obj_path = Path(obj.key) + local_target_path = local_dir.joinpath(obj_path.relative_to(s3_prefix)) + if not local_target_path.parent.exists(): + local_target_path.parent.mkdir(parents=True, exist_ok=True) + self.log.debug("Created local directory: %s", local_target_path.parent) + self._sync_to_local_dir_if_changed( + s3_bucket=s3_bucket, s3_object=obj, local_target_path=local_target_path + ) + local_s3_objects.append(local_target_path) + + if delete_stale: + self._sync_to_local_dir_delete_stale_local_files( + current_s3_objects=local_s3_objects, local_dir=local_dir + ) diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/__init__.py b/providers/amazon/tests/unit/amazon/aws/bundles/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/amazon/tests/unit/amazon/aws/bundles/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py new file mode 100644 index 0000000000000..e801a6a9259b1 --- /dev/null +++ b/providers/amazon/tests/unit/amazon/aws/bundles/test_s3.py @@ -0,0 +1,227 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
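+"""Unit tests for the S3 DAG bundle.
+
+These tests run against a moto-mocked S3 bucket. The bundle itself delegates the actual download
+to ``S3Hook.sync_to_local_dir`` (added in this change); a rough usage sketch, where the bucket,
+prefix, local path and connection id are placeholders only:
+
+    S3Hook(aws_conn_id="aws_default").sync_to_local_dir(
+        bucket_name="my-airflow-dags-bucket",
+        s3_prefix="project1/dags",
+        local_dir=Path("/tmp/dag-bundle"),
+        delete_stale=True,
+    )
+"""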
+from __future__ import annotations + +import os +from unittest.mock import MagicMock, call + +import boto3 +import pytest +from moto import mock_aws + +import airflow.version +from airflow.exceptions import AirflowException +from airflow.models import Connection +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.utils import db + +from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.db import clear_db_connections + +AWS_CONN_ID_WITH_REGION = "s3_dags_connection" +AWS_CONN_ID_REGION = "eu-central-1" +AWS_CONN_ID_DEFAULT = "aws_default" +S3_BUCKET_NAME = "my-airflow-dags-bucket" +S3_BUCKET_PREFIX = "project1/dags" + +if airflow.version.version.strip().startswith("3"): + from airflow.providers.amazon.aws.bundles.s3 import S3DagBundle + + +@pytest.fixture +def mocked_s3_resource(): + with mock_aws(): + yield boto3.resource("s3") + + +@pytest.fixture +def s3_client(): + with mock_aws(): + yield boto3.client("s3") + + +@pytest.fixture +def s3_bucket(mocked_s3_resource, s3_client): + bucket = mocked_s3_resource.create_bucket(Bucket=S3_BUCKET_NAME) + + s3_client.put_object(Bucket=bucket.name, Key=S3_BUCKET_PREFIX + "/dag_01.py", Body=b"test data") + s3_client.put_object(Bucket=bucket.name, Key=S3_BUCKET_PREFIX + "/dag_02.py", Body=b"test data") + s3_client.put_object( + Bucket=bucket.name, Key=S3_BUCKET_PREFIX + "/subproject1/dag_a.py", Body=b"test data" + ) + s3_client.put_object( + Bucket=bucket.name, Key=S3_BUCKET_PREFIX + "/subproject1/dag_b.py", Body=b"test data" + ) + + return bucket + + +@pytest.fixture(autouse=True) +def bundle_temp_dir(tmp_path): + with conf_vars({("dag_processor", "dag_bundle_storage_path"): str(tmp_path)}): + yield tmp_path + + +@pytest.mark.skipif(not airflow.version.version.strip().startswith("3"), reason="Airflow >=3.0.0 test") +class TestS3DagBundle: + @classmethod + def teardown_class(cls) -> None: + clear_db_connections() + + @classmethod + def setup_class(cls) -> None: + db.merge_conn( + Connection( + conn_id=AWS_CONN_ID_DEFAULT, + conn_type="aws", + extra={ + "config_kwargs": {"s3": {"bucket_name": S3_BUCKET_NAME}}, + }, + ) + ) + db.merge_conn( + conn=Connection( + conn_id=AWS_CONN_ID_WITH_REGION, + conn_type="aws", + extra={ + "config_kwargs": {"s3": {"bucket_name": S3_BUCKET_NAME}}, + "region_name": AWS_CONN_ID_REGION, + }, + ) + ) + + @pytest.mark.db_test + def test_view_url_generates_presigned_url(self): + bundle = S3DagBundle( + name="test", aws_conn_id=AWS_CONN_ID_DEFAULT, prefix="project1/dags", bucket_name=S3_BUCKET_NAME + ) + url: str = bundle.view_url("test_version") + assert url.startswith("https://my-airflow-dags-bucket.s3.amazonaws.com/project1/dags") + + @pytest.mark.db_test + def test_supports_versioning(self): + bundle = S3DagBundle( + name="test", aws_conn_id=AWS_CONN_ID_DEFAULT, prefix="project1/dags", bucket_name=S3_BUCKET_NAME + ) + assert S3DagBundle.supports_versioning is False + + # set version, it's not supported + bundle.version = "test_version" + + with pytest.raises(AirflowException, match="Refreshing a specific version is not supported"): + bundle.refresh() + with pytest.raises(AirflowException, match="S3 url with version is not supported"): + bundle.view_url("test_version") + + @pytest.mark.db_test + def test_correct_bundle_path_used(self): + bundle = S3DagBundle( + name="test", aws_conn_id=AWS_CONN_ID_DEFAULT, prefix="project1_dags", bucket_name="aiflow_dags" + ) + assert str(bundle.base_dir) == str(bundle.s3_dags_dir) + + @pytest.mark.db_test + def 
test_s3_bucket_and_prefix_validated(self, s3_bucket): + hook = S3Hook(aws_conn_id=AWS_CONN_ID_DEFAULT) + assert hook.check_for_bucket(s3_bucket.name) is True + + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + prefix="project1_dags", + bucket_name="non-existing-bucket", + ) + with pytest.raises(AirflowException, match="S3 bucket.*non-existing-bucket.*does not exist.*"): + bundle.initialize() + + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + prefix="non-existing-prefix", + bucket_name=S3_BUCKET_NAME, + ) + with pytest.raises(AirflowException, match="S3 prefix.*non-existing-prefix.*does not exist.*"): + bundle.initialize() + + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + prefix=S3_BUCKET_PREFIX, + bucket_name=S3_BUCKET_NAME, + ) + # initialize succeeds, with correct prefix and bucket + bundle.initialize() + assert bundle.s3_hook.region_name == AWS_CONN_ID_REGION + + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + prefix="", + bucket_name=S3_BUCKET_NAME, + ) + # initialize succeeds, with empty prefix + bundle.initialize() + assert bundle.s3_hook.region_name == AWS_CONN_ID_REGION + + def _upload_fixtures(self, bucket: str, fixtures_dir: str) -> None: + client = boto3.client("s3") + fixtures_paths = [ + os.path.join(path, filename) for path, _, files in os.walk(fixtures_dir) for filename in files + ] + for path in fixtures_paths: + key = os.path.relpath(path, fixtures_dir) + client.upload_file(Filename=path, Bucket=bucket, Key=key) + + @pytest.mark.db_test + def test_refresh(self, s3_bucket, s3_client): + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + prefix=S3_BUCKET_PREFIX, + bucket_name=S3_BUCKET_NAME, + ) + bundle._log.debug = MagicMock() + # Create a pytest Call object to compare against the call_args_list of the _log.debug mock + download_log_call = call( + "Downloading DAGs from s3://%s/%s to %s", S3_BUCKET_NAME, S3_BUCKET_PREFIX, bundle.s3_dags_dir + ) + bundle.initialize() + assert bundle._log.debug.call_count == 1 + assert bundle._log.debug.call_args_list == [download_log_call] + bundle.refresh() + assert bundle._log.debug.call_count == 2 + assert bundle._log.debug.call_args_list == [download_log_call, download_log_call] + bundle.refresh() + assert bundle._log.debug.call_count == 3 + assert bundle._log.debug.call_args_list == [download_log_call, download_log_call, download_log_call] + + @pytest.mark.db_test + def test_refresh_without_prefix(self, s3_bucket, s3_client): + bundle = S3DagBundle( + name="test", + aws_conn_id=AWS_CONN_ID_WITH_REGION, + bucket_name=S3_BUCKET_NAME, + ) + bundle._log.debug = MagicMock() + download_log_call = call( + "Downloading DAGs from s3://%s/%s to %s", S3_BUCKET_NAME, "", bundle.s3_dags_dir + ) + assert bundle.prefix == "" + bundle.initialize() + bundle.refresh() + assert bundle._log.debug.call_count == 2 + assert bundle._log.debug.call_args_list == [download_log_call, download_log_call] diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py index 31b3a625a3971..2d7552df75f49 100644 --- a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py @@ -22,6 +22,7 @@ import os import re from datetime import datetime as std_datetime, timezone +from pathlib import Path from unittest import mock, mock as async_mock from unittest.mock import AsyncMock, MagicMock, Mock, patch from 
urllib.parse import parse_qs @@ -43,6 +44,16 @@ ) from airflow.utils.timezone import datetime +try: + import importlib.util + + if not importlib.util.find_spec("airflow.sdk.bases.hook"): + raise ImportError + + BASEHOOK_PATCH_PATH = "airflow.sdk.bases.hook.BaseHook" +except ImportError: + BASEHOOK_PATCH_PATH = "airflow.hooks.base.BaseHook" + @pytest.fixture def mocked_s3_res(): @@ -50,6 +61,12 @@ def mocked_s3_res(): yield boto3.resource("s3") +@pytest.fixture +def s3_client(): + with mock_aws(): + yield boto3.client("s3") + + @pytest.fixture def s3_bucket(mocked_s3_res): bucket = "airflow-test-s3-bucket" @@ -1726,6 +1743,64 @@ def test_delete_bucket_tagging_with_no_tags(self): with pytest.raises(ClientError, match=r".*NoSuchTagSet.*"): hook.get_bucket_tagging(bucket_name="new_bucket") + def test_sync_to_local_dir_behaviour(self, s3_bucket, s3_client, tmp_path): + def get_logs_string(call_args_list): + return "".join([args[0][0] % args[0][1:] for args in call_args_list]) + + s3_client.put_object(Bucket=s3_bucket, Key="dag_01.py", Body=b"test data") + s3_client.put_object(Bucket=s3_bucket, Key="dag_02.py", Body=b"test data") + s3_client.put_object(Bucket=s3_bucket, Key="subproject1/dag_a.py", Body=b"test data") + s3_client.put_object(Bucket=s3_bucket, Key="subproject1/dag_b.py", Body=b"test data") + + sync_local_dir = tmp_path / "s3_sync_dir" + hook = S3Hook() + hook.log.debug = MagicMock() + hook.sync_to_local_dir( + bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True + ) + logs_string = get_logs_string(hook.log.debug.call_args_list) + assert f"Downloading data from s3://{s3_bucket}" in logs_string + assert f"does not exist. Downloaded dag_01.py to {sync_local_dir}/dag_01.py" in logs_string + assert "does not exist. Downloaded dag_01.py to" in logs_string + assert f"does not exist. Downloaded subproject1/dag_a.py to {sync_local_dir}" in logs_string + + # add new file to bucket and sync + hook.log.debug = MagicMock() + s3_client.put_object(Bucket=s3_bucket, Key="dag_03.py", Body=b"test data") + hook.sync_to_local_dir( + bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True + ) + logs_string = get_logs_string(hook.log.debug.call_args_list) + assert ( + "subproject1/dag_b.py is up-to-date with S3 object subproject1/dag_b.py. Skipping download" + in logs_string + ) + assert f"does not exist. 
Downloaded dag_03.py to {sync_local_dir}/dag_03.py" in logs_string + # read that file is donloaded and has same content + assert Path(sync_local_dir).joinpath("dag_03.py").read_text() == "test data" + + local_file_that_should_be_deleted = Path(sync_local_dir).joinpath("file_that_should_be_deleted.py") + local_file_that_should_be_deleted.write_text("test dag") + local_folder_should_be_deleted = Path(sync_local_dir).joinpath("local_folder_should_be_deleted") + local_folder_should_be_deleted.mkdir(exist_ok=True) + hook.log.debug = MagicMock() + hook.sync_to_local_dir( + bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True + ) + logs_string = get_logs_string(hook.log.debug.call_args_list) + assert f"Deleted stale local file: {local_file_that_should_be_deleted.as_posix()}" in logs_string + + assert f"Deleted stale empty directory: {local_folder_should_be_deleted.as_posix()}" in logs_string + + s3_client.put_object(Bucket=s3_bucket, Key="dag_03.py", Body=b"test data-changed") + hook.log.debug = MagicMock() + hook.sync_to_local_dir( + bucket_name=s3_bucket, local_dir=sync_local_dir, s3_prefix="", delete_stale=True + ) + logs_string = get_logs_string(hook.log.debug.call_args_list) + assert "S3 object size" in logs_string + assert "differ. Downloaded dag_03.py to" in logs_string + @pytest.mark.parametrize( "key_kind, has_conn, has_bucket, precedence, expected", @@ -1748,7 +1823,7 @@ def test_delete_bucket_tagging_with_no_tags(self): ("rel_key", "with_conn", "with_bucket", "provide", ["kwargs_bucket", "key.txt"]), ], ) -@patch("airflow.hooks.base.BaseHook.get_connection") +@patch(f"{BASEHOOK_PATCH_PATH}.get_connection") def test_unify_and_provide_bucket_name_combination( mock_base, key_kind, has_conn, has_bucket, precedence, expected, caplog ): @@ -1811,7 +1886,7 @@ def do_something(self, bucket_name=None, key=None): ("rel_key", "with_conn", "with_bucket", ["kwargs_bucket", "key.txt"]), ], ) -@patch("airflow.hooks.base.BaseHook.get_connection") +@patch(f"{BASEHOOK_PATCH_PATH}.get_connection") def test_s3_head_object_decorated_behavior(mock_conn, has_conn, has_bucket, key_kind, expected): if has_conn == "with_conn": c = Connection(extra={"service_config": {"s3": {"bucket_name": "conn_bucket"}}})