Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@ def __init__(
self.chunk_size = chunk_size
self.impersonation_chain = impersonation_chain

def execute(self, context: Context):
"""Upload a file or list of files to Google Cloud Storage."""
def execute(self, context: Context) -> list[str]:
"""
Upload a file or list of files to Google Cloud Storage.

:return: List of destination URIs (gs://bucket/object) for uploaded files.
"""
hook = GCSHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
Expand All @@ -111,6 +115,7 @@ def execute(self, context: Context):
else: # directory is provided
object_paths = [os.path.join(self.dst, os.path.basename(filepath)) for filepath in filepaths]

destination_uris_result: list[str] = []
for filepath, object_path in zip(filepaths, object_paths):
hook.upload(
bucket_name=self.bucket,
Expand All @@ -120,6 +125,14 @@ def execute(self, context: Context):
gzip=self.gzip,
chunk_size=self.chunk_size,
)
destination_uris_result.append(f"gs://{self.bucket}/{object_path}")

# Deduplicate while preserving order. Same destination URI can appear when multiple
# source paths map to one file, e.g. src=["a.png", "a.png"] or src=["data/foo.png", "data/*"]
# with data/ containing only foo.png yields the same object path twice.
destination_uris_result = list(dict.fromkeys(destination_uris_result))

return destination_uris_result

def get_openlineage_facets_on_start(self):
from airflow.providers.common.compat.openlineage.facet import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,44 @@ def test_get_openlineage_facets_on_start_with_list_src(self, src, dst, expected_
assert result.outputs[0].namespace == "gs://dummy"
assert all(inp.name in expected_inputs for inp in result.inputs)
assert all(inp.namespace == "file" for inp in result.inputs)

# Return value tests
@mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True)
def test_execute_returns_list_of_destination_uris_single_file(self, mock_hook):
operator = LocalFilesystemToGCSOperator(
task_id="file_to_gcs_operator",
dag=self.dag,
src=self.testfile1,
dst="test/test1.csv",
**self._config,
)
result = operator.execute(None)
assert result == [f"gs://{self._config['bucket']}/test/test1.csv"]

@mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True)
def test_execute_returns_list_of_destination_uris_multiple_files(self, mock_hook):
    """A multi-file upload returns one gs:// URI per source file, in source order."""
    op = LocalFilesystemToGCSOperator(
        task_id="file_to_gcs_operator",
        dag=self.dag,
        src=self.testfiles,
        dst="test/",
        **self._config,
    )
    uris = op.execute(None)
    bucket = self._config["bucket"]
    assert uris == [
        f"gs://{bucket}/test/{os.path.basename(src)}"
        for src in (self.testfile1, self.testfile2)
    ]

@mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True)
def test_execute_returns_deduplicated_uris(self, mock_hook):
    """Duplicate source paths collapse to a single destination URI in the result."""
    op = LocalFilesystemToGCSOperator(
        task_id="file_to_gcs_operator",
        dag=self.dag,
        src=[self.testfile1, self.testfile1],
        dst="test/",
        **self._config,
    )
    uris = op.execute(None)
    only_uri = f"gs://{self._config['bucket']}/test/{os.path.basename(self.testfile1)}"
    assert uris == [only_uri]