diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py index 61e6386cc8f9e..8f6644e09f693 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py @@ -31,6 +31,7 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook if TYPE_CHECKING: + from airflow.providers.openlineage.extractors import OperatorLineage from airflow.utils.context import Context @@ -206,3 +207,14 @@ def execute(self, context: Context) -> list[str]: self.log.info("In sync, no files needed to be uploaded to S3") return gcs_files + + def get_openlineage_facets_on_start(self) -> OperatorLineage: + from airflow.providers.common.compat.openlineage.facet import Dataset + from airflow.providers.openlineage.extractors import OperatorLineage + + bucket_name, s3_key = S3Hook.parse_s3_url(self.dest_s3_key) + + return OperatorLineage( + inputs=[Dataset(namespace=f"gs://{self.gcs_bucket}", name=self.prefix or "/")], + outputs=[Dataset(namespace=f"s3://{bucket_name}", name=s3_key or "/")], + ) diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py index 50204a0251a8d..c64df03bc4fae 100644 --- a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py @@ -329,3 +329,33 @@ def test_execute_without_keep_director_structure(self, mock_hook): uploaded_files = operator.execute(None) assert sorted(MOCK_FILES) == sorted(uploaded_files) assert hook.check_for_prefix(bucket_name="bucket", prefix=PREFIX + "/", delimiter="/") is True + + @pytest.mark.parametrize( + ("gcs_prefix", "dest_s3_key", "expected_input", "expected_output"), + [ + ("dir/pre", "s3://bucket/dest_dir/", "dir/pre", "dest_dir/"), + ("dir/pre", "s3://bucket/dest_dir", "dir/pre", "dest_dir"), + ("dir/pre/", "s3://bucket/dest_dir/", "dir/pre/", "dest_dir/"), + ("dir/pre", "s3://bucket/", "dir/pre", "/"), + ("dir/pre", "s3://bucket", "dir/pre", "/"), + ("", "s3://bucket/", "/", "/"), + ("", "s3://bucket", "/", "/"), + ], + ) + def test_get_openlineage_facets_on_start(self, gcs_prefix, dest_s3_key, expected_input, expected_output): + operator = GCSToS3Operator( + task_id=TASK_ID, + gcs_bucket=GCS_BUCKET, + prefix=gcs_prefix, + dest_s3_key=dest_s3_key, + ) + + result = operator.get_openlineage_facets_on_start() + assert not result.job_facets + assert not result.run_facets + assert len(result.outputs) == 1 + assert len(result.inputs) == 1 + assert result.outputs[0].namespace == S3_BUCKET.rstrip("/") + assert result.outputs[0].name == expected_output + assert result.inputs[0].namespace == f"gs://{GCS_BUCKET}" + assert result.inputs[0].name == expected_input