diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/local_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/local_to_gcs.py index 70134563375f8..17b0d3443eded 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/local_to_gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/local_to_gcs.py @@ -90,8 +90,12 @@ def __init__( self.chunk_size = chunk_size self.impersonation_chain = impersonation_chain - def execute(self, context: Context): - """Upload a file or list of files to Google Cloud Storage.""" + def execute(self, context: Context) -> list[str]: + """ + Upload a file or list of files to Google Cloud Storage. + + :return: Deduplicated list of destination URIs (gs://bucket/object), in upload order. + """ hook = GCSHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, @@ -111,6 +115,7 @@ def execute(self, context: Context): else: # directory is provided object_paths = [os.path.join(self.dst, os.path.basename(filepath)) for filepath in filepaths] + destination_uris_result: list[str] = [] for filepath, object_path in zip(filepaths, object_paths): hook.upload( bucket_name=self.bucket, @@ -120,6 +125,14 @@ def execute(self, context: Context): gzip=self.gzip, chunk_size=self.chunk_size, ) + destination_uris_result.append(f"gs://{self.bucket}/{object_path}") + + # Deduplicate while preserving order. The same destination URI can appear when multiple + # source paths map to one file, e.g. src=["a.png", "a.png"], or src=["data/foo.png", "data/*"] + # where data/ contains only foo.png; both cases yield the same object path twice. 
+ destination_uris_result = list(dict.fromkeys(destination_uris_result)) + + return destination_uris_result def get_openlineage_facets_on_start(self): from airflow.providers.common.compat.openlineage.facet import ( diff --git a/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py index 4870eff26ca3f..6e32ce62971da 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_local_to_gcs.py @@ -235,3 +235,44 @@ def test_get_openlineage_facets_on_start_with_list_src(self, src, dst, expected_ assert result.outputs[0].namespace == "gs://dummy" assert all(inp.name in expected_inputs for inp in result.inputs) assert all(inp.namespace == "file" for inp in result.inputs) + + # Return value tests + @mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True) + def test_execute_returns_list_of_destination_uris_single_file(self, mock_hook): + operator = LocalFilesystemToGCSOperator( + task_id="file_to_gcs_operator", + dag=self.dag, + src=self.testfile1, + dst="test/test1.csv", + **self._config, + ) + result = operator.execute(None) + assert result == [f"gs://{self._config['bucket']}/test/test1.csv"] + + @mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True) + def test_execute_returns_list_of_destination_uris_multiple_files(self, mock_hook): + operator = LocalFilesystemToGCSOperator( + task_id="file_to_gcs_operator", + dag=self.dag, + src=self.testfiles, + dst="test/", + **self._config, + ) + result = operator.execute(None) + expected = [ + f"gs://{self._config['bucket']}/test/{os.path.basename(self.testfile1)}", + f"gs://{self._config['bucket']}/test/{os.path.basename(self.testfile2)}", + ] + assert result == expected + + @mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True) + def 
test_execute_returns_deduplicated_uris(self, mock_hook): + operator = LocalFilesystemToGCSOperator( + task_id="file_to_gcs_operator", + dag=self.dag, + src=[self.testfile1, self.testfile1], + dst="test/", + **self._config, + ) + result = operator.execute(None) + assert result == [f"gs://{self._config['bucket']}/test/{os.path.basename(self.testfile1)}"]