diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py index 9e53d16f94395..8bd6af7f99105 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py @@ -78,6 +78,8 @@ class SFTPToGCSOperator(BaseOperator): then uploads (may require significant disk space). When ``True``, the file streams directly without using local disk. Defaults to ``False``. + :param fail_on_file_not_exist: If True, operator fails when file does not exist, + if False, operator will not fail and skips transfer. Default is True. """ template_fields: Sequence[str] = ( @@ -101,6 +103,7 @@ def __init__( impersonation_chain: str | Sequence[str] | None = None, sftp_prefetch: bool = True, use_stream: bool = False, + fail_on_file_not_exist: bool = True, **kwargs, ) -> None: super().__init__(**kwargs) @@ -116,6 +119,7 @@ def __init__( self.impersonation_chain = impersonation_chain self.sftp_prefetch = sftp_prefetch self.use_stream = use_stream + self.fail_on_file_not_exist = fail_on_file_not_exist @cached_property def sftp_hook(self): @@ -156,7 +160,13 @@ def execute(self, context: Context): destination_object = ( self.destination_path if self.destination_path else self.source_path.rsplit("/", 1)[1] ) - self._copy_single_object(gcs_hook, self.sftp_hook, self.source_path, destination_object) + try: + self._copy_single_object(gcs_hook, self.sftp_hook, self.source_path, destination_object) + except FileNotFoundError as e: + if self.fail_on_file_not_exist: + raise e + self.log.info("File %s not found on SFTP server. Skipping transfer.", self.source_path) + return def _copy_single_object( self, @@ -172,7 +182,6 @@ def _copy_single_object( self.destination_bucket, destination_object, ) - if self.use_stream: dest_bucket = gcs_hook.get_bucket(self.destination_bucket) dest_blob = dest_bucket.blob(destination_object) diff --git a/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py index f15d1639744b8..d84f889b1e895 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py @@ -19,6 +19,7 @@ import os from unittest import mock +from unittest.mock import patch import pytest @@ -377,3 +378,26 @@ def test_get_openlineage_facets( assert result.inputs[0].name == expected_source assert result.outputs[0].namespace == f"gs://{TEST_BUCKET}" assert result.outputs[0].name == expected_destination + + @pytest.mark.parametrize("fail_on_file_not_exist", [False, True]) + @mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.GCSHook") + @mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.SFTPHook") + def test_sftp_to_gcs_fail_on_file_not_exist(self, sftp_hook, gcs_hook, fail_on_file_not_exist): + invalid_file_name = "main_dir/invalid-object.json" + task = SFTPToGCSOperator( + task_id=TASK_ID, + source_path=invalid_file_name, + destination_bucket=TEST_BUCKET, + destination_path=DESTINATION_PATH_FILE, + move_object=False, + gcp_conn_id=GCP_CONN_ID, + sftp_conn_id=SFTP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + fail_on_file_not_exist=fail_on_file_not_exist, + ) + with patch.object(sftp_hook.return_value, "retrieve_file", side_effect=FileNotFoundError): + if fail_on_file_not_exist: + with pytest.raises(FileNotFoundError): + task.execute(None) + else: + task.execute(None)