From 9cfbb58a4141a0e2d887780f33537e4a239cd3f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Grochowicz?= Date: Fri, 8 Aug 2025 14:42:48 +0200 Subject: [PATCH 1/3] feat: Add OpenLineage support for transfer operators between GCS and S3 --- .../amazon/aws/transfers/gcs_to_s3.py | 12 ++++++++ .../amazon/aws/transfers/test_gcs_to_s3.py | 30 +++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py index 61e6386cc8f9e..14df002ce2e37 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py @@ -206,3 +206,15 @@ def execute(self, context: Context) -> list[str]: self.log.info("In sync, no files needed to be uploaded to S3") return gcs_files + + def get_openlineage_facets_on_start(self): + from airflow.providers.amazon.aws.hooks.s3 import S3Hook + from airflow.providers.common.compat.openlineage.facet import Dataset + from airflow.providers.openlineage.extractors import OperatorLineage + + aws_bucket_name, aws_prefix = S3Hook.parse_s3_url(self.dest_s3_key) + + return OperatorLineage( + inputs=[Dataset(namespace=f"gs://{self.gcs_bucket}", name=self.prefix or "/")], + outputs=[Dataset(namespace=f"s3://{aws_bucket_name}", name=aws_prefix or "/")], + ) diff --git a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py index 50204a0251a8d..c64df03bc4fae 100644 --- a/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/transfers/test_gcs_to_s3.py @@ -329,3 +329,33 @@ def test_execute_without_keep_director_structure(self, mock_hook): uploaded_files = operator.execute(None) assert sorted(MOCK_FILES) == sorted(uploaded_files) assert hook.check_for_prefix(bucket_name="bucket", prefix=PREFIX + "/", delimiter="/") is True + + @pytest.mark.parametrize( + ("gcs_prefix", "dest_s3_key", "expected_input", "expected_output"), + [ + ("dir/pre", "s3://bucket/dest_dir/", "dir/pre", "dest_dir/"), + ("dir/pre", "s3://bucket/dest_dir", "dir/pre", "dest_dir"), + ("dir/pre/", "s3://bucket/dest_dir/", "dir/pre/", "dest_dir/"), + ("dir/pre", "s3://bucket/", "dir/pre", "/"), + ("dir/pre", "s3://bucket", "dir/pre", "/"), + ("", "s3://bucket/", "/", "/"), + ("", "s3://bucket", "/", "/"), + ], + ) + def test_get_openlineage_facets_on_start(self, gcs_prefix, dest_s3_key, expected_input, expected_output): + operator = GCSToS3Operator( + task_id=TASK_ID, + gcs_bucket=GCS_BUCKET, + prefix=gcs_prefix, + dest_s3_key=dest_s3_key, + ) + + result = operator.get_openlineage_facets_on_start() + assert not result.job_facets + assert not result.run_facets + assert len(result.outputs) == 1 + assert len(result.inputs) == 1 + assert result.outputs[0].namespace == S3_BUCKET.rstrip("/") + assert result.outputs[0].name == expected_output + assert result.inputs[0].namespace == f"gs://{GCS_BUCKET}" + assert result.inputs[0].name == expected_input From e956be5f94397fe1a132f0ee5f635b6ca7ac0426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Grochowicz?= Date: Mon, 18 Aug 2025 10:04:41 +0200 Subject: [PATCH 2/3] feat: Add OpenLineage support for transfer operators between GCS and S3 --- .../airflow/providers/amazon/aws/transfers/gcs_to_s3.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py index 14df002ce2e37..c5247fe5b9880 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py @@ -32,6 +32,7 @@ if TYPE_CHECKING: from airflow.utils.context import Context + from airflow.providers.openlineage.extractors import OperatorLineage class GCSToS3Operator(BaseOperator): @@ -207,14 +208,13 @@ def execute(self, context: Context) -> list[str]: return gcs_files - def get_openlineage_facets_on_start(self): - from airflow.providers.amazon.aws.hooks.s3 import S3Hook + def get_openlineage_facets_on_start(self) -> OperatorLineage: from airflow.providers.common.compat.openlineage.facet import Dataset from airflow.providers.openlineage.extractors import OperatorLineage - aws_bucket_name, aws_prefix = S3Hook.parse_s3_url(self.dest_s3_key) + bucket_name, s3_key = S3Hook.parse_s3_url(self.dest_s3_key) return OperatorLineage( inputs=[Dataset(namespace=f"gs://{self.gcs_bucket}", name=self.prefix or "/")], - outputs=[Dataset(namespace=f"s3://{aws_bucket_name}", name=aws_prefix or "/")], + outputs=[Dataset(namespace=f"s3://{bucket_name}", name=s3_key or "/")], ) From 865a752932ee58b81b05fd3ddf88f376d53b6883 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Grochowicz?= Date: Mon, 18 Aug 2025 11:18:57 +0200 Subject: [PATCH 3/3] feat: Add OpenLineage support for transfer operators between GCS and S3 --- .../src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py index c5247fe5b9880..8f6644e09f693 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py @@ -31,8 +31,8 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook if TYPE_CHECKING: - from airflow.utils.context import Context from airflow.providers.openlineage.extractors import OperatorLineage + from airflow.utils.context import Context class GCSToS3Operator(BaseOperator):