Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class SFTPToGCSOperator(BaseOperator):
then uploads (may require significant disk space).
When ``True``, the file streams directly without using local disk.
Defaults to ``False``.
:param fail_on_file_not_exist: If True, operator fails when file does not exist,
if False, operator will not fail and skips transfer. Default is True.
"""

template_fields: Sequence[str] = (
Expand All @@ -101,6 +103,7 @@ def __init__(
impersonation_chain: str | Sequence[str] | None = None,
sftp_prefetch: bool = True,
use_stream: bool = False,
fail_on_file_not_exist: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -116,6 +119,7 @@ def __init__(
self.impersonation_chain = impersonation_chain
self.sftp_prefetch = sftp_prefetch
self.use_stream = use_stream
self.fail_on_file_not_exist = fail_on_file_not_exist

@cached_property
def sftp_hook(self):
Expand Down Expand Up @@ -156,7 +160,13 @@ def execute(self, context: Context):
destination_object = (
self.destination_path if self.destination_path else self.source_path.rsplit("/", 1)[1]
)
self._copy_single_object(gcs_hook, self.sftp_hook, self.source_path, destination_object)
try:
self._copy_single_object(gcs_hook, self.sftp_hook, self.source_path, destination_object)
except FileNotFoundError as e:
if self.fail_on_file_not_exist:
raise e
self.log.info("File %s not found on SFTP server. Skipping transfer.", self.source_path)
return

def _copy_single_object(
self,
Expand All @@ -172,7 +182,6 @@ def _copy_single_object(
self.destination_bucket,
destination_object,
)

if self.use_stream:
dest_bucket = gcs_hook.get_bucket(self.destination_bucket)
dest_blob = dest_bucket.blob(destination_object)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import os
from unittest import mock
from unittest.mock import patch

import pytest

Expand Down Expand Up @@ -377,3 +378,26 @@ def test_get_openlineage_facets(
assert result.inputs[0].name == expected_source
assert result.outputs[0].namespace == f"gs://{TEST_BUCKET}"
assert result.outputs[0].name == expected_destination

@pytest.mark.parametrize("fail_on_file_not_exist", [False, True])
@mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.GCSHook")
@mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.SFTPHook")
def test_sftp_to_gcs_fail_on_file_not_exist(self, sftp_hook, gcs_hook, fail_on_file_not_exist):
invalid_file_name = "main_dir/invalid-object.json"
task = SFTPToGCSOperator(
task_id=TASK_ID,
source_path=invalid_file_name,
destination_bucket=TEST_BUCKET,
destination_path=DESTINATION_PATH_FILE,
move_object=False,
gcp_conn_id=GCP_CONN_ID,
sftp_conn_id=SFTP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
fail_on_file_not_exist=fail_on_file_not_exist,
)
with patch.object(sftp_hook.return_value, "retrieve_file", side_effect=FileNotFoundError):
if fail_on_file_not_exist:
with pytest.raises(FileNotFoundError):
task.execute(None)
else:
task.execute(None)