diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/local_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/local_to_gcs.py index 70134563375f8..537591695b567 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/local_to_gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/local_to_gcs.py @@ -57,6 +57,7 @@ class LocalFilesystemToGCSOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :return: List of URIs for the objects created in Google Cloud Storage """ template_fields: Sequence[str] = ( @@ -121,6 +122,8 @@ def execute(self, context: Context): chunk_size=self.chunk_size, ) + return [f"gs://{self.bucket}/{object_path}" for object_path in object_paths] + def get_openlineage_facets_on_start(self): from airflow.providers.common.compat.openlineage.facet import ( Dataset, diff --git a/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py index 4870eff26ca3f..f5d8a917219f8 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py @@ -83,7 +83,7 @@ def test_execute(self, mock_hook): dst="test/test1.csv", **self._config, ) - operator.execute(None) + result = operator.execute(None) mock_instance.upload.assert_called_once_with( bucket_name=self._config["bucket"], filename=self.testfile1, @@ -92,6 +92,7 @@ def test_execute(self, mock_hook): object_name="test/test1.csv", chunk_size=self._config["chunk_size"], ) + assert result == ["gs://dummy/test/test1.csv"] @pytest.mark.db_test def test_execute_with_empty_src(self): @@ -111,7 +112,7 @@ def test_execute_multiple(self, mock_hook): operator = LocalFilesystemToGCSOperator( task_id="file_to_gcs_operator", dag=self.dag, src=self.testfiles, dst="test/", **self._config ) - operator.execute(None) + result = operator.execute(None) files_objects = zip( self.testfiles, ["test/" + os.path.basename(testfile) for testfile in self.testfiles] ) @@ -127,6 +128,7 @@ def test_execute_multiple(self, mock_hook): for filepath, object_name in files_objects ] mock_instance.upload.assert_has_calls(calls) + assert set(result) == {"gs://dummy/test/fake1.csv", "gs://dummy/test/fake2.csv"} @mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True) def test_execute_wildcard(self, mock_hook): @@ -138,7 +140,7 @@ def test_execute_wildcard(self, mock_hook): dst="test/", **self._config, ) - operator.execute(None) + result = operator.execute(None) object_names = ["test/" + os.path.basename(fp) for fp in glob(f"{self.tmpdir_posix}/fake*.csv")] files_objects = zip(glob(f"{self.tmpdir_posix}/fake*.csv"), object_names) calls = [ @@ -153,6 +155,7 @@ def test_execute_wildcard(self, mock_hook): for filepath, object_name in files_objects ] mock_instance.upload.assert_has_calls(calls) + assert set(result) == {"gs://dummy/test/fake1.csv", "gs://dummy/test/fake2.csv"} @pytest.mark.parametrize( ("src", "dst"),