Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def __init__(

def dry_run(self) -> None:
if not AIRFLOW_V_3_0_PLUS:
raise NotImplementedError("Not implemented for Airflow 3.")
raise NotImplementedError("dry_run() is only supported in Airflow 3.0+.")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this message is more clear than before. How about ur opinion?

super().dry_run()
sftp_files: list[SftpFile] = self.get_sftp_files_map()
for file in sftp_files:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from airflow.exceptions import AirflowException
from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SftpFile, SFTPToWasbOperator

from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

TASK_ID = "test-gcs-to-sftp-operator"
WASB_CONN_ID = "wasb_default"
SFTP_CONN_ID = "ssh_default"
Expand Down Expand Up @@ -56,6 +58,64 @@ def test_init(self):
assert operator.blob_prefix == BLOB_PREFIX
assert operator.create_container is False

@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="dry_run only exists in Airflow 3.0+")
@pytest.mark.parametrize(
"move_object, expect_delete_log",
[
(True, True),
(False, False),
],
)
def test_dry_run_logs_and_skips_real_action(self, move_object, expect_delete_log):
operator = SFTPToWasbOperator(
task_id=TASK_ID,
sftp_source_path=SOURCE_PATH_NO_WILDCARD,
sftp_conn_id=SFTP_CONN_ID,
container_name=CONTAINER_NAME,
wasb_conn_id=WASB_CONN_ID,
blob_prefix="sponge-bob/",
move_object=move_object,
)
with (
mock.patch.object(operator.log, "info") as mock_info,
mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook"),
mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook") as mock_sftp_hook,
):
mock_sftp_hook.return_value.get_tree_map.return_value = [
["main_dir/test_object3.json"],
[],
[],
]

operator.dry_run()

logged_messages = [call.args[0] for call in mock_info.call_args_list]
assert "Dry run" in logged_messages
assert "Process will upload file from (SFTP) %s to wasb://%s as %s" in logged_messages

delete_log = "Executing delete of %s"
if expect_delete_log:
assert delete_log in logged_messages
else:
assert delete_log not in logged_messages

@mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.AIRFLOW_V_3_0_PLUS", False)
def test_dry_run_raises_not_implemented(mock_version_tuple):
operator = SFTPToWasbOperator(
task_id="test-task",
sftp_source_path="main_dir/test_*.json",
sftp_conn_id="sftp_default",
container_name="test-container",
wasb_conn_id="wasb_default",
blob_prefix="sponge-bob/",
move_object=False,
)

with pytest.raises(NotImplementedError) as exc_info:
operator.dry_run()

assert "dry_run() is only supported in Airflow 3.0+." in str(exc_info.value)

@mock.patch("airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook", autospec=True)
def test_execute_more_than_one_wildcard_exception(self, mock_hook):
operator = SFTPToWasbOperator(
Expand Down