diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 9b82e64876ea3..624fb433cdcd4 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -100,7 +100,7 @@ def __init__( def dry_run(self) -> None: if not AIRFLOW_V_3_0_PLUS: - raise NotImplementedError("Not implemented for Airflow 3.") + raise NotImplementedError("dry_run() is only supported in Airflow 3.0+.") super().dry_run() sftp_files: list[SftpFile] = self.get_sftp_files_map() for file in sftp_files: diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py b/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py index a389abf84673b..cb3d85e3501c6 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/transfers/test_sftp_to_wasb.py @@ -24,6 +24,8 @@ from airflow.exceptions import AirflowException from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SftpFile, SFTPToWasbOperator +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS + TASK_ID = "test-gcs-to-sftp-operator" WASB_CONN_ID = "wasb_default" SFTP_CONN_ID = "ssh_default" @@ -56,6 +58,64 @@ def test_init(self): assert operator.blob_prefix == BLOB_PREFIX assert operator.create_container is False + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="dry_run only exists in Airflow 3.0+") + @pytest.mark.parametrize( + "move_object, expect_delete_log", + [ + (True, True), + (False, False), + ], + ) + def test_dry_run_logs_and_skips_real_action(self, move_object, expect_delete_log): + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=SOURCE_PATH_NO_WILDCARD, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + blob_prefix="sponge-bob/", + move_object=move_object, + ) + with ( + mock.patch.object(operator.log, "info") as mock_info, + mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook"), + mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook") as mock_sftp_hook, + ): + mock_sftp_hook.return_value.get_tree_map.return_value = [ + ["main_dir/test_object3.json"], + [], + [], + ] + + operator.dry_run() + + logged_messages = [call.args[0] for call in mock_info.call_args_list] + assert "Dry run" in logged_messages + assert "Process will upload file from (SFTP) %s to wasb://%s as %s" in logged_messages + + delete_log = "Executing delete of %s" + if expect_delete_log: + assert delete_log in logged_messages + else: + assert delete_log not in logged_messages + + @mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.AIRFLOW_V_3_0_PLUS", False) + def test_dry_run_raises_not_implemented(mock_version_tuple): + operator = SFTPToWasbOperator( + task_id="test-task", + sftp_source_path="main_dir/test_*.json", + sftp_conn_id="sftp_default", + container_name="test-container", + wasb_conn_id="wasb_default", + blob_prefix="sponge-bob/", + move_object=False, + ) + + with pytest.raises(NotImplementedError) as exc_info: + operator.dry_run() + + assert "dry_run() is only supported in Airflow 3.0+." in str(exc_info.value) + @mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook", autospec=True) def test_execute_more_than_one_wildcard_exception(self, mock_hook): operator = SFTPToWasbOperator(