@@ -39,6 +39,11 @@ class GCSToS3Operator(BaseOperator):
"""
Synchronizes a Google Cloud Storage bucket with an S3 bucket.

.. note::
When flatten_structure=True, it takes precedence over keep_directory_structure.
For example, with flatten_structure=True, "folder/subfolder/file.txt" becomes "file.txt"
regardless of the keep_directory_structure setting.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GCSToS3Operator`
@@ -79,6 +84,9 @@ class GCSToS3Operator(BaseOperator):
object to be uploaded in S3
:param keep_directory_structure: (Optional) When set to False the path of the file
on the bucket is recreated within path passed in dest_s3_key.
:param flatten_structure: (Optional) When set to True, places all files directly
in the dest_s3_key directory without preserving subdirectory structure.
Takes precedence over keep_directory_structure when enabled.
:param match_glob: (Optional) filters objects based on the glob pattern given by the string
(e.g., ``'**/*/.json'``)
:param gcp_user_project: (Optional) The identifier of the Google Cloud project to bill for this request.
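
A minimal usage sketch of the new parameter (the bucket names, prefix, and destination key below are illustrative assumptions, not values taken from this change):

from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator

# Copies every object under "exports/" into the S3 destination, keeping only the
# bare filenames: "exports/2024/report.csv" is uploaded as "report.csv".
flatten_transfer = GCSToS3Operator(
    task_id="gcs_to_s3_flat",
    gcs_bucket="my-gcs-bucket",
    prefix="exports/",
    dest_s3_key="s3://my-s3-bucket/landing/",
    dest_aws_conn_id="aws_default",
    replace=False,
    flatten_structure=True,
)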
@@ -108,6 +116,7 @@ def __init__(
dest_s3_extra_args: dict | None = None,
s3_acl_policy: str | None = None,
keep_directory_structure: bool = True,
flatten_structure: bool = False,
match_glob: str | None = None,
gcp_user_project: str | None = None,
**kwargs,
@@ -124,6 +133,10 @@ def __init__(
self.dest_s3_extra_args = dest_s3_extra_args or {}
self.s3_acl_policy = s3_acl_policy
self.keep_directory_structure = keep_directory_structure
self.flatten_structure = flatten_structure

if self.flatten_structure and self.keep_directory_structure:
self.log.warning("flatten_structure=True takes precedence over keep_directory_structure=True")
try:
from airflow.providers.google import __version__ as _GOOGLE_PROVIDER_VERSION

@@ -140,6 +153,17 @@ def __init__(
self.match_glob = match_glob
self.gcp_user_project = gcp_user_project

def _transform_file_path(self, file_path: str) -> str:
"""
Transform the GCS file path according to the specified options.

:param file_path: The original GCS file path
:return: The transformed file path for S3 destination
"""
if self.flatten_structure:
return os.path.basename(file_path)
return file_path

def execute(self, context: Context) -> list[str]:
# list all files in an Google Cloud Storage bucket
gcs_hook = GCSHook(
@@ -167,7 +191,7 @@ def execute(self, context: Context) -> list[str]:
aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify, extra_args=self.dest_s3_extra_args
)

if not self.keep_directory_structure and self.prefix:
if not self.keep_directory_structure and self.prefix and not self.flatten_structure:
self.dest_s3_key = os.path.join(self.dest_s3_key, self.prefix)

if not self.replace:
@@ -187,15 +211,34 @@ def execute(self, context: Context) -> list[str]:
existing_files = existing_files or []
# remove the prefix for the existing files to allow the match
existing_files = [file.replace(prefix, "", 1) for file in existing_files]
gcs_files = list(set(gcs_files) - set(existing_files))

# Transform GCS files for comparison and filter out existing ones
existing_files_set = set(existing_files)
filtered_files = []
seen_transformed = set()

for file in gcs_files:
transformed = self._transform_file_path(file)
if transformed not in existing_files_set and transformed not in seen_transformed:
filtered_files.append(file)
seen_transformed.add(transformed)
elif transformed in seen_transformed:
self.log.warning(
"Skipping duplicate file %s (transforms to %s)",
file,
transformed,
)

gcs_files = filtered_files

if gcs_files:
for file in gcs_files:
with gcs_hook.provide_file(
object_name=file, bucket_name=str(self.gcs_bucket), user_project=self.gcp_user_project
) as local_tmp_file:
dest_key = os.path.join(self.dest_s3_key, file)
self.log.info("Saving file to %s", dest_key)
transformed_path = self._transform_file_path(file)
dest_key = os.path.join(self.dest_s3_key, transformed_path)
self.log.info("Saving file from %s to %s", file, dest_key)
s3_hook.load_file(
filename=local_tmp_file.name,
key=dest_key,
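The filtering above can be exercised in isolation; a small standalone sketch (plain Python, using the same os.path.basename transformation as _transform_file_path, with a sample file list mirroring the duplicate-filename test below) shows which objects survive when two paths flatten to the same filename:

import os

gcs_files = ["dir1/file.csv", "dir2/file.csv", "dir3/other.csv"]
existing_files = set()              # nothing already present in S3
seen_transformed, filtered = set(), []

for path in gcs_files:
    name = os.path.basename(path)   # flatten_structure=True behaviour
    if name not in existing_files and name not in seen_transformed:
        filtered.append(path)
        seen_transformed.add(name)

print(filtered)                     # ['dir1/file.csv', 'dir3/other.csv']
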
115 changes: 115 additions & 0 deletions providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py
@@ -330,6 +330,121 @@ def test_execute_without_keep_director_structure(self, mock_hook):
assert sorted(MOCK_FILES) == sorted(uploaded_files)
assert hook.check_for_prefix(bucket_name="bucket", prefix=PREFIX + "/", delimiter="/") is True

@mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
def test_execute_with_flatten_structure(self, mock_hook):
"""Test that flatten_structure parameter flattens directory structure."""
mock_files_with_paths = ["dir1/subdir1/file1.csv", "dir2/subdir2/file2.csv", "dir3/file3.csv"]
mock_hook.return_value.list.return_value = mock_files_with_paths

with NamedTemporaryFile() as f:
gcs_provide_file = mock_hook.return_value.provide_file
gcs_provide_file.return_value.__enter__.return_value.name = f.name

operator = GCSToS3Operator(
task_id=TASK_ID,
gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
dest_aws_conn_id="aws_default",
dest_s3_key=S3_BUCKET,
replace=False,
flatten_structure=True,
)
hook, _ = _create_test_bucket()

uploaded_files = operator.execute(None)

# Verify all files were uploaded
assert sorted(mock_files_with_paths) == sorted(uploaded_files)

# Verify files are stored with flattened structure (only filenames)
expected_s3_keys = ["file1.csv", "file2.csv", "file3.csv"]
actual_keys = hook.list_keys("bucket", delimiter="/")
assert sorted(expected_s3_keys) == sorted(actual_keys)

@mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
def test_execute_with_flatten_structure_duplicate_filenames(self, mock_hook):
"""Test that flatten_structure handles duplicate filenames correctly."""
mock_files_with_duplicates = [
"dir1/file.csv",
"dir2/file.csv", # Same filename as above
"dir3/other.csv",
]
mock_hook.return_value.list.return_value = mock_files_with_duplicates

with NamedTemporaryFile() as f:
gcs_provide_file = mock_hook.return_value.provide_file
gcs_provide_file.return_value.__enter__.return_value.name = f.name

operator = GCSToS3Operator(
task_id=TASK_ID,
gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
dest_aws_conn_id="aws_default",
dest_s3_key=S3_BUCKET,
replace=False,
flatten_structure=True,
)
_, _ = _create_test_bucket()

# Mock the logging to verify warning is logged
mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log"
with mock.patch(mock_path) as mock_log:
uploaded_files = operator.execute(None)

# Only one of the duplicate files should be uploaded
assert len(uploaded_files) == 2
assert "dir3/other.csv" in uploaded_files
first_or_second = "dir1/file.csv" in uploaded_files or "dir2/file.csv" in uploaded_files
assert first_or_second

# Verify warning was logged for duplicate
mock_log.warning.assert_called()

def test_execute_with_flatten_structure_and_keep_directory_structure_warning(self):
"""Test warning when both flatten_structure and keep_directory_structure are True."""
mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log"
with mock.patch(mock_path) as mock_log:
GCSToS3Operator(
task_id=TASK_ID,
gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
dest_aws_conn_id="aws_default",
dest_s3_key=S3_BUCKET,
flatten_structure=True,
keep_directory_structure=True, # This should trigger warning
)

# Verify warning was logged during initialization
expected_warning = "flatten_structure=True takes precedence over keep_directory_structure=True"
mock_log.warning.assert_called_once_with(expected_warning)

@pytest.mark.parametrize(
("flatten_structure", "input_path", "expected_output"),
[
# Tests with flatten_structure=True
(True, "dir1/subdir1/file.csv", "file.csv"),
(True, "path/to/deep/nested/file.txt", "file.txt"),
(True, "simple.txt", "simple.txt"),
(True, "", ""),
# Tests with flatten_structure=False (preserves original paths)
(False, "dir1/subdir1/file.csv", "dir1/subdir1/file.csv"),
(False, "path/to/deep/nested/file.txt", "path/to/deep/nested/file.txt"),
(False, "simple.txt", "simple.txt"),
(False, "", ""),
],
)
def test_transform_file_path(self, flatten_structure, input_path, expected_output):
"""Test _transform_file_path method with various flatten_structure settings."""
operator = GCSToS3Operator(
task_id=TASK_ID,
gcs_bucket=GCS_BUCKET,
dest_s3_key=S3_BUCKET,
flatten_structure=flatten_structure,
)

result = operator._transform_file_path(input_path)
assert result == expected_output

@pytest.mark.parametrize(
("gcs_prefix", "dest_s3_key", "expected_input", "expected_output"),
[