Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ class GCSToSFTPOperator(BaseOperator):
:param destination_path: The sftp remote path. This is the specified directory path for
uploading to the SFTP server.
:param keep_directory_structure: (Optional) When set to False the path of the file
on the bucket is recreated within path passed in destination_path.
on the bucket is recreated within path passed in destination_path.
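:param create_intermediate_dirs: (Optional) When set to True (the default), the parent directory
of the destination file path is created on the SFTP server before the file is uploaded.
When set to False, this directory creation step is skipped.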
:param move_object: When move object is True, the object is moved instead
of copied to the new location. This is the equivalent of a mv command
as opposed to a cp command.
Expand Down Expand Up @@ -112,6 +114,7 @@ def __init__(
source_object: str,
destination_path: str,
keep_directory_structure: bool = True,
create_intermediate_dirs: bool = True,
move_object: bool = False,
gcp_conn_id: str = "google_cloud_default",
sftp_conn_id: str = "ssh_default",
Expand All @@ -124,6 +127,7 @@ def __init__(
self.source_object = source_object
self.destination_path = destination_path
self.keep_directory_structure = keep_directory_structure
self.create_intermediate_dirs = create_intermediate_dirs
self.move_object = move_object
self.gcp_conn_id = gcp_conn_id
self.sftp_conn_id = sftp_conn_id
Expand Down Expand Up @@ -190,7 +194,9 @@ def _copy_single_object(
)

dir_path = os.path.dirname(destination_path)
sftp_hook.create_directory(dir_path)

if self.create_intermediate_dirs:
sftp_hook.create_directory(dir_path)

with NamedTemporaryFile("w") as tmp:
gcs_hook.download(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,49 @@ def test_get_openlineage_facets(
assert result.inputs[0].name == expected_source
assert result.outputs[0].namespace == "file://11.222.33.44:22"
assert result.outputs[0].name == expected_destination

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.GCSHook")
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.SFTPHook")
def test_create_intermediate_dirs_true(self, sftp_hook_mock, gcp_hook_mock):
    """Check that enabling ``create_intermediate_dirs`` creates the destination directory once."""
    # The source object is nested one level deep, so the directory expected on the
    # SFTP side is "<DESTINATION_SFTP>/folder".
    expected_dir = os.path.join(DESTINATION_SFTP, "folder")

    operator = GCSToSFTPOperator(
        task_id=TASK_ID,
        source_bucket=TEST_BUCKET,
        # Fixed, nested object name so the expected directory above is deterministic.
        source_object="folder/test_object.txt",
        destination_path=DESTINATION_SFTP,
        # Fixed to True for this scenario.
        keep_directory_structure=True,
        create_intermediate_dirs=True,
        move_object=False,
        gcp_conn_id=GCP_CONN_ID,
        sftp_conn_id=SFTP_CONN_ID,
        impersonation_chain=IMPERSONATION_CHAIN,
    )

    # The constructor argument must be stored on the operator.
    assert operator.create_intermediate_dirs

    operator.execute(None)

    # Both hooks are patched, so only the recorded call on the SFTP hook mock is inspected.
    create_directory = sftp_hook_mock.return_value.create_directory
    create_directory.assert_called_once_with(expected_dir)

@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.GCSHook")
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.SFTPHook")
def test_create_intermediate_dirs_false(self, sftp_hook_mock, gcp_hook_mock):
    """Check that disabling ``create_intermediate_dirs`` skips directory creation entirely."""
    operator = GCSToSFTPOperator(
        task_id=TASK_ID,
        source_bucket=TEST_BUCKET,
        # Fixed, nested object name: the destination has a parent directory that
        # would be created if the flag were enabled.
        source_object="folder/test_object.txt",
        destination_path=DESTINATION_SFTP,
        # Fixed to True for this scenario.
        keep_directory_structure=True,
        create_intermediate_dirs=False,
        move_object=False,
        gcp_conn_id=GCP_CONN_ID,
        sftp_conn_id=SFTP_CONN_ID,
        impersonation_chain=IMPERSONATION_CHAIN,
    )

    # The constructor argument must be stored on the operator.
    assert not operator.create_intermediate_dirs

    operator.execute(None)

    # With the flag off, the SFTP hook mock must record no create_directory call.
    create_directory = sftp_hook_mock.return_value.create_directory
    create_directory.assert_not_called()