diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
index 92289a89638b4..4a0f078f95206 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -39,6 +39,11 @@ class GCSToS3Operator(BaseOperator):
     """
     Synchronizes a Google Cloud Storage bucket with an S3 bucket.
 
+    .. note::
+        When flatten_structure=True, it takes precedence over keep_directory_structure.
+        For example, with flatten_structure=True, "folder/subfolder/file.txt" becomes "file.txt"
+        regardless of the keep_directory_structure setting.
+
     .. seealso::
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:GCSToS3Operator`
@@ -79,6 +84,9 @@ class GCSToS3Operator(BaseOperator):
         object to be uploaded in S3
     :param keep_directory_structure: (Optional) When set to False the path of the file
         on the bucket is recreated within path passed in dest_s3_key.
+    :param flatten_structure: (Optional) When set to True, places all files directly
+        in the dest_s3_key directory without preserving subdirectory structure.
+        Takes precedence over keep_directory_structure when enabled.
     :param match_glob: (Optional) filters objects based on the glob pattern given by the string
         (e.g, ``'**/*/.json'``)
     :param gcp_user_project: (Optional) The identifier of the Google Cloud project to bill for this request.
@@ -108,6 +116,7 @@ def __init__(
         dest_s3_extra_args: dict | None = None,
         s3_acl_policy: str | None = None,
         keep_directory_structure: bool = True,
+        flatten_structure: bool = False,
         match_glob: str | None = None,
         gcp_user_project: str | None = None,
         **kwargs,
@@ -124,6 +133,10 @@ def __init__(
         self.dest_s3_extra_args = dest_s3_extra_args or {}
         self.s3_acl_policy = s3_acl_policy
         self.keep_directory_structure = keep_directory_structure
+        self.flatten_structure = flatten_structure
+
+        if self.flatten_structure and self.keep_directory_structure:
+            self.log.warning("flatten_structure=True takes precedence over keep_directory_structure=True")
 
         try:
             from airflow.providers.google import __version__ as _GOOGLE_PROVIDER_VERSION
@@ -140,6 +153,17 @@ def __init__(
         self.match_glob = match_glob
         self.gcp_user_project = gcp_user_project
 
+    def _transform_file_path(self, file_path: str) -> str:
+        """
+        Transform the GCS file path according to the specified options.
+
+        :param file_path: The original GCS file path
+        :return: The transformed file path for S3 destination
+        """
+        if self.flatten_structure:
+            return os.path.basename(file_path)
+        return file_path
+
     def execute(self, context: Context) -> list[str]:
         # list all files in an Google Cloud Storage bucket
         gcs_hook = GCSHook(
@@ -167,7 +191,7 @@ def execute(self, context: Context) -> list[str]:
             aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify, extra_args=self.dest_s3_extra_args
         )
 
-        if not self.keep_directory_structure and self.prefix:
+        if not self.keep_directory_structure and self.prefix and not self.flatten_structure:
             self.dest_s3_key = os.path.join(self.dest_s3_key, self.prefix)
 
         if not self.replace:
@@ -187,15 +211,34 @@ def execute(self, context: Context) -> list[str]:
             existing_files = existing_files or []
             # remove the prefix for the existing files to allow the match
             existing_files = [file.replace(prefix, "", 1) for file in existing_files]
-            gcs_files = list(set(gcs_files) - set(existing_files))
+
+            # Transform GCS files for comparison and filter out existing ones
+            existing_files_set = set(existing_files)
+            filtered_files = []
+            seen_transformed = set()
+
+            for file in gcs_files:
+                transformed = self._transform_file_path(file)
+                if transformed not in existing_files_set and transformed not in seen_transformed:
+                    filtered_files.append(file)
+                    seen_transformed.add(transformed)
+                elif transformed in seen_transformed:
+                    self.log.warning(
+                        "Skipping duplicate file %s (transforms to %s)",
+                        file,
+                        transformed,
+                    )
+
+            gcs_files = filtered_files
 
         if gcs_files:
             for file in gcs_files:
                 with gcs_hook.provide_file(
                     object_name=file, bucket_name=str(self.gcs_bucket), user_project=self.gcp_user_project
                 ) as local_tmp_file:
-                    dest_key = os.path.join(self.dest_s3_key, file)
-                    self.log.info("Saving file to %s", dest_key)
+                    transformed_path = self._transform_file_path(file)
+                    dest_key = os.path.join(self.dest_s3_key, transformed_path)
+                    self.log.info("Saving file from %s to %s", file, dest_key)
                     s3_hook.load_file(
                         filename=local_tmp_file.name,
                         key=dest_key,
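Before the test changes below, a minimal usage sketch of the new parameter as documented in the operator diff above; the task id, bucket names, prefix, and connection ID are illustrative placeholders, not values taken from this patch:

from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator

# With flatten_structure=True an object such as "exports/2024/report.csv" is
# written to the destination as "report.csv", overriding keep_directory_structure.
flatten_transfer = GCSToS3Operator(
    task_id="gcs_to_s3_flattened",
    gcs_bucket="example-gcs-bucket",
    prefix="exports/",
    dest_s3_key="s3://example-s3-bucket/landing/",
    dest_aws_conn_id="aws_default",
    replace=False,
    flatten_structure=True,
)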
diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py
index c64df03bc4fae..014f26f494348 100644
--- a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py
@@ -330,6 +330,121 @@ def test_execute_without_keep_director_structure(self, mock_hook):
         assert sorted(MOCK_FILES) == sorted(uploaded_files)
         assert hook.check_for_prefix(bucket_name="bucket", prefix=PREFIX + "/", delimiter="/") is True
 
+    @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+    def test_execute_with_flatten_structure(self, mock_hook):
+        """Test that flatten_structure parameter flattens directory structure."""
+        mock_files_with_paths = ["dir1/subdir1/file1.csv", "dir2/subdir2/file2.csv", "dir3/file3.csv"]
+        mock_hook.return_value.list.return_value = mock_files_with_paths
+
+        with NamedTemporaryFile() as f:
+            gcs_provide_file = mock_hook.return_value.provide_file
+            gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+            operator = GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                replace=False,
+                flatten_structure=True,
+            )
+            hook, _ = _create_test_bucket()
+
+            uploaded_files = operator.execute(None)
+
+            # Verify all files were uploaded
+            assert sorted(mock_files_with_paths) == sorted(uploaded_files)
+
+            # Verify files are stored with flattened structure (only filenames)
+            expected_s3_keys = ["file1.csv", "file2.csv", "file3.csv"]
+            actual_keys = hook.list_keys("bucket", delimiter="/")
+            assert sorted(expected_s3_keys) == sorted(actual_keys)
+
+    @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+    def test_execute_with_flatten_structure_duplicate_filenames(self, mock_hook):
+        """Test that flatten_structure handles duplicate filenames correctly."""
+        mock_files_with_duplicates = [
+            "dir1/file.csv",
+            "dir2/file.csv",  # Same filename as above
+            "dir3/other.csv",
+        ]
+        mock_hook.return_value.list.return_value = mock_files_with_duplicates
+
+        with NamedTemporaryFile() as f:
+            gcs_provide_file = mock_hook.return_value.provide_file
+            gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+            operator = GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                replace=False,
+                flatten_structure=True,
+            )
+            _, _ = _create_test_bucket()
+
+            # Mock the logging to verify warning is logged
+            mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log"
+            with mock.patch(mock_path) as mock_log:
+                uploaded_files = operator.execute(None)
+
+                # Only one of the duplicate files should be uploaded
+                assert len(uploaded_files) == 2
+                assert "dir3/other.csv" in uploaded_files
+                first_or_second = "dir1/file.csv" in uploaded_files or "dir2/file.csv" in uploaded_files
+                assert first_or_second
+
+                # Verify warning was logged for duplicate
+                mock_log.warning.assert_called()
+
+    def test_execute_with_flatten_structure_and_keep_directory_structure_warning(self):
+        """Test warning when both flatten_structure and keep_directory_structure are True."""
+        mock_path = "airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Operator.log"
+        with mock.patch(mock_path) as mock_log:
+            GCSToS3Operator(
+                task_id=TASK_ID,
+                gcs_bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                flatten_structure=True,
+                keep_directory_structure=True,  # This should trigger warning
+            )
+
+            # Verify warning was logged during initialization
+            expected_warning = "flatten_structure=True takes precedence over keep_directory_structure=True"
+            mock_log.warning.assert_called_once_with(expected_warning)
+
+    @pytest.mark.parametrize(
+        ("flatten_structure", "input_path", "expected_output"),
+        [
+            # Tests with flatten_structure=True
+            (True, "dir1/subdir1/file.csv", "file.csv"),
+            (True, "path/to/deep/nested/file.txt", "file.txt"),
+            (True, "simple.txt", "simple.txt"),
+            (True, "", ""),
+            # Tests with flatten_structure=False (preserves original paths)
+            (False, "dir1/subdir1/file.csv", "dir1/subdir1/file.csv"),
+            (False, "path/to/deep/nested/file.txt", "path/to/deep/nested/file.txt"),
+            (False, "simple.txt", "simple.txt"),
+            (False, "", ""),
+        ],
+    )
+    def test_transform_file_path(self, flatten_structure, input_path, expected_output):
+        """Test _transform_file_path method with various flatten_structure settings."""
+        operator = GCSToS3Operator(
+            task_id=TASK_ID,
+            gcs_bucket=GCS_BUCKET,
+            dest_s3_key=S3_BUCKET,
+            flatten_structure=flatten_structure,
+        )
+
+        result = operator._transform_file_path(input_path)
+        assert result == expected_output
+
     @pytest.mark.parametrize(
         ("gcs_prefix", "dest_s3_key", "expected_input", "expected_output"),
         [
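To make the resulting key layout concrete, a small standalone sketch of how object names map to destination keys and how later duplicate basenames are skipped when flatten_structure=True. The helper name plan_s3_keys is hypothetical; the real logic lives in _transform_file_path and execute above, and this sketch ignores the existing-S3-keys check that execute() also performs:

import os


def plan_s3_keys(gcs_files: list[str], dest_s3_key: str, flatten_structure: bool) -> dict[str, str]:
    """Map GCS object names to the S3 keys that would be written, dropping later basename collisions."""
    planned: dict[str, str] = {}
    seen: set[str] = set()
    for name in gcs_files:
        transformed = os.path.basename(name) if flatten_structure else name
        if transformed in seen:
            # Mirrors the operator's "Skipping duplicate file ..." warning: later collisions are skipped.
            continue
        seen.add(transformed)
        planned[name] = os.path.join(dest_s3_key, transformed)
    return planned


print(plan_s3_keys(["dir1/file.csv", "dir2/file.csv", "dir3/other.csv"], "s3://bucket/landing/", True))
# {'dir1/file.csv': 's3://bucket/landing/file.csv', 'dir3/other.csv': 's3://bucket/landing/other.csv'}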