diff --git a/providers/google/src/airflow/providers/google/cloud/openlineage/CloudStorageTransferJobFacet.json b/providers/google/src/airflow/providers/google/cloud/openlineage/CloudStorageTransferJobFacet.json
new file mode 100644
index 0000000000000..2f0f763af2f5c
--- /dev/null
+++ b/providers/google/src/airflow/providers/google/cloud/openlineage/CloudStorageTransferJobFacet.json
@@ -0,0 +1,68 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$defs": {
+    "CloudStorageTransferJobFacet": {
+      "allOf": [
+        {
+          "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/JobFacet"
+        },
+        {
+          "type": "object",
+          "properties": {
+            "jobName": {
+              "type": "string",
+              "description": "Transfer job name assigned by GCP Storage Transfer Service."
+            },
+            "projectId": {
+              "type": "string",
+              "description": "GCP project ID."
+            },
+            "description": {
+              "type": "string",
+              "description": "Optional description of the transfer job."
+            },
+            "status": {
+              "type": "string",
+              "description": "Status of the transfer job (ENABLED, DISABLED)."
+            },
+            "sourceBucket": {
+              "type": "string",
+              "description": "Source AWS S3 bucket."
+            },
+            "sourcePath": {
+              "type": "string",
+              "description": "Prefix path inside the source bucket."
+            },
+            "targetBucket": {
+              "type": "string",
+              "description": "Target GCS bucket."
+            },
+            "targetPath": {
+              "type": "string",
+              "description": "Prefix path inside the target bucket."
+            },
+            "objectConditions": {
+              "type": "object",
+              "description": "Filtering conditions for objects transferred."
+            },
+            "transferOptions": {
+              "type": "object",
+              "description": "Transfer options such as overwrite or delete."
+            },
+            "schedule": {
+              "type": "object",
+              "description": "Transfer schedule details."
+            }
+          }
+        }
+      ],
+      "type": "object"
+    }
+  },
+  "type": "object",
+  "properties": {
+    "cloudStorageTransferJob": {
+      "$ref": "#/$defs/CloudStorageTransferJobFacet"
+    }
+  }
+}
diff --git a/providers/google/src/airflow/providers/google/cloud/openlineage/CloudStorageTransferRunFacet.json b/providers/google/src/airflow/providers/google/cloud/openlineage/CloudStorageTransferRunFacet.json
new file mode 100644
index 0000000000000..8eea204b53aaf
--- /dev/null
+++ b/providers/google/src/airflow/providers/google/cloud/openlineage/CloudStorageTransferRunFacet.json
@@ -0,0 +1,60 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$defs": {
+    "CloudStorageTransferRunFacet": {
+      "allOf": [
+        {
+          "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
+        },
+        {
+          "type": "object",
+          "properties": {
+            "jobName": {
+              "type": "string",
+              "description": "Transfer job name associated with this run."
+            },
+            "operationName": {
+              "type": "string",
+              "description": "Transfer operation name if available."
+            },
+            "status": {
+              "type": "string",
+              "description": "Run status if available."
+            },
+            "startTime": {
+              "type": "string",
+              "description": "Start time of the transfer operation."
+            },
+            "endTime": {
+              "type": "string",
+              "description": "End time of the transfer operation."
+            },
+            "wait": {
+              "type": "boolean",
+              "description": "Whether the operator waited for completion."
+            },
+            "timeout": {
+              "type": ["number", "null"],
+              "description": "Timeout in seconds."
+            },
+            "deferrable": {
+              "type": "boolean",
+              "description": "Whether the operator used deferrable mode."
+            },
+            "deleteJobAfterCompletion": {
+              "type": "boolean",
+              "description": "Whether the transfer job was deleted after completion."
+            }
+          }
+        }
+      ],
+      "type": "object"
+    }
+  },
+  "type": "object",
+  "properties": {
+    "cloudStorageTransferRun": {
+      "$ref": "#/$defs/CloudStorageTransferRunFacet"
+    }
+  }
+}
diff --git a/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py b/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py
index 46177bb4de98b..645e8c51b07bc 100644
--- a/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py
+++ b/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py
@@ -24,13 +24,17 @@ from airflow.providers.google import __version__ as provider_version
 
 if TYPE_CHECKING:
-    from openlineage.client.generated.base import RunFacet
+    from openlineage.client.generated.base import JobFacet, RunFacet
 
 try:
     try:
         from openlineage.client.generated.base import RunFacet
     except ImportError:  # Old OpenLineage client is used
         from openlineage.client.facet import BaseFacet as RunFacet  # type: ignore[assignment]
 
+    try:
+        from openlineage.client.generated.base import JobFacet
+    except ImportError:  # Old OpenLineage client is used
+        from openlineage.client.facet import BaseFacet as JobFacet  # type: ignore[assignment]
 
     @define
     class BigQueryJobRunFacet(RunFacet):
@@ -53,6 +57,80 @@ def _get_schema() -> str:
                 f"providers-google/{provider_version}/airflow/providers/google/"
                 "openlineage/BigQueryJobRunFacet.json"
             )
+
+    @define
+    class CloudStorageTransferJobFacet(JobFacet):
+        """
+        Facet representing a Cloud Storage Transfer Service job configuration.
+
+        :param jobName: Unique name of the transfer job.
+        :param projectId: GCP project where the transfer job is defined.
+        :param description: User-provided description of the transfer job.
+        :param status: Current status of the transfer job (e.g. "ENABLED", "DISABLED").
+        :param sourceBucket: Name of the source bucket (e.g. AWS S3).
+        :param sourcePath: Prefix/path inside the source bucket.
+        :param targetBucket: Name of the destination bucket (e.g. GCS).
+        :param targetPath: Prefix/path inside the destination bucket.
+        :param objectConditions: Object selection rules (e.g. include/exclude prefixes).
+        :param transferOptions: Transfer options, such as overwrite behavior or whether to delete objects
+            from the source after transfer.
+        :param schedule: Schedule for the transfer job (if recurring).
+        """
+
+        jobName: str | None = field(default=None)
+        projectId: str | None = field(default=None)
+        description: str | None = field(default=None)
+        status: str | None = field(default=None)
+        sourceBucket: str | None = field(default=None)
+        sourcePath: str | None = field(default=None)
+        targetBucket: str | None = field(default=None)
+        targetPath: str | None = field(default=None)
+        objectConditions: dict | None = field(default=None)
+        transferOptions: dict | None = field(default=None)
+        schedule: dict | None = field(default=None)
+
+        @staticmethod
+        def _get_schema() -> str:
+            return (
+                "https://raw.githubusercontent.com/apache/airflow/"
+                f"providers-google/{provider_version}/airflow/providers/google/"
+                "openlineage/CloudStorageTransferJobFacet.json"
+            )
+
+    @define
+    class CloudStorageTransferRunFacet(RunFacet):
+        """
+        Facet representing a Cloud Storage Transfer Service job execution run.
+
+        :param jobName: Name of the transfer job being executed.
+        :param operationName: Name of the specific transfer operation instance.
+        :param status: Current status of the operation (e.g. "IN_PROGRESS", "SUCCESS", "FAILED").
+        :param startTime: Time when the transfer job execution started (ISO 8601 format).
+        :param endTime: Time when the transfer job execution finished (ISO 8601 format).
+        :param wait: Whether the operator waits for the job to complete before finishing.
+        :param timeout: Timeout (in seconds) for the transfer run to complete.
+        :param deferrable: Whether the operator defers execution until job completion.
+        :param deleteJobAfterCompletion: Whether the operator deletes the transfer job after the run completes.
+        """
+
+        jobName: str | None = field(default=None)
+        operationName: str | None = field(default=None)
+        status: str | None = field(default=None)
+        startTime: str | None = field(default=None)
+        endTime: str | None = field(default=None)
+        wait: bool = field(default=True)
+        timeout: float | None = field(default=None)
+        deferrable: bool = field(default=False)
+        deleteJobAfterCompletion: bool = field(default=False)
+
+        @staticmethod
+        def _get_schema() -> str:
+            return (
+                "https://raw.githubusercontent.com/apache/airflow/"
+                f"providers-google/{provider_version}/airflow/providers/google/"
+                "openlineage/CloudStorageTransferRunFacet.json"
+            )
+
 except ImportError:  # OpenLineage is not available
 
     def create_no_op(*_, **__) -> None:
@@ -65,3 +143,5 @@ def create_no_op(*_, **__) -> None:
         return None
 
     BigQueryJobRunFacet = create_no_op  # type: ignore[misc, assignment]
+    CloudStorageTransferJobFacet = create_no_op  # type: ignore[misc, assignment]
+    CloudStorageTransferRunFacet = create_no_op  # type: ignore[misc, assignment]
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
index 788a5822a71be..88307119aba9b 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
@@ -71,6 +71,7 @@
 from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
 
 if TYPE_CHECKING:
+    from airflow.providers.openlineage.extractors import OperatorLineage
     from airflow.utils.context import Context
 
 
@@ -964,6 +965,7 @@ def __init__(
         self.aws_role_arn = aws_role_arn
         self.deferrable = deferrable
         self._validate_inputs()
+        self._transfer_job: dict[str, Any] | None = None
 
     def _validate_inputs(self) -> None:
         if self.delete_job_after_completion and not self.wait:
@@ -978,19 +980,18 @@ def execute(self, context: Context) -> None:
 
         TransferJobPreprocessor(body=body, aws_conn_id=self.aws_conn_id, default_schedule=True).process_body()
 
-        job = hook.create_transfer_job(body=body)
-
+        self._transfer_job = hook.create_transfer_job(body=body)
         if self.wait:
             if not self.deferrable:
-                hook.wait_for_transfer_job(job, timeout=self.timeout)
+                hook.wait_for_transfer_job(self._transfer_job, timeout=self.timeout)
                 if self.delete_job_after_completion:
-                    hook.delete_transfer_job(job_name=job[NAME], project_id=self.project_id)
+                    hook.delete_transfer_job(job_name=self._transfer_job[NAME], project_id=self.project_id)
             else:
                 self.defer(
                     timeout=timedelta(seconds=self.timeout or 60),
                     trigger=CloudStorageTransferServiceCheckJobStatusTrigger(
-                        job_name=job[NAME],
-                        project_id=job[PROJECT_ID],
+                        job_name=self._transfer_job[NAME],
+                        project_id=self._transfer_job[PROJECT_ID],
                         gcp_conn_id=self.gcp_conn_id,
                         impersonation_chain=self.google_impersonation_chain,
                     ),
@@ -1040,6 +1041,57 @@ def _create_body(self) -> dict:
 
         return body
 
+    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage | None:
+        """Provide OpenLineage OperatorLineage for the S3->GCS transfer."""
+        from airflow.providers.common.compat.openlineage.facet import Dataset
+        from airflow.providers.google.cloud.openlineage.facets import (
+            CloudStorageTransferJobFacet,
+            CloudStorageTransferRunFacet,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        input_ds = Dataset(
+            namespace=f"s3://{self.s3_bucket}",
+            name=normalize_directory_path(self.s3_path) or "",
+        )
+
+        output_ds = Dataset(
+            namespace=f"gs://{self.gcs_bucket}",
+            name=normalize_directory_path(self.gcs_path) or "",
+        )
+
+        job = self._transfer_job or {}
+        job_facet = CloudStorageTransferJobFacet(
+            jobName=job.get(NAME),
+            projectId=job.get(PROJECT_ID, self.project_id),
+            description=job.get(DESCRIPTION, self.description),
+            status=job.get(STATUS),
+            sourceBucket=job.get(TRANSFER_SPEC, {})
+            .get(AWS_S3_DATA_SOURCE, {})
+            .get(BUCKET_NAME, self.s3_bucket),
+            sourcePath=job.get(TRANSFER_SPEC, {}).get(AWS_S3_DATA_SOURCE, {}).get(PATH, self.s3_path),
+            targetBucket=job.get(TRANSFER_SPEC, {}).get(GCS_DATA_SINK, {}).get(BUCKET_NAME, self.gcs_bucket),
+            targetPath=job.get(TRANSFER_SPEC, {}).get(GCS_DATA_SINK, {}).get(PATH, self.gcs_path),
+            objectConditions=job.get(TRANSFER_SPEC, {}).get("objectConditions", self.object_conditions),
+            transferOptions=job.get(TRANSFER_SPEC, {}).get("transferOptions", self.transfer_options),
+            schedule=job.get(SCHEDULE, self.schedule),
+        )
+
+        run_facet = CloudStorageTransferRunFacet(
+            jobName=job.get(NAME),
+            wait=self.wait,
+            timeout=self.timeout,
+            deferrable=self.deferrable,
+            deleteJobAfterCompletion=self.delete_job_after_completion,
+        )
+
+        return OperatorLineage(
+            inputs=[input_ds],
+            outputs=[output_ds],
+            job_facets={"cloudStorageTransferJob": job_facet},
+            run_facets={"cloudStorageTransferRun": run_facet},
+        )
+
 
 class CloudDataTransferServiceGCSToGCSOperator(GoogleCloudBaseOperator):
     """
diff --git a/providers/google/tests/unit/google/cloud/openlineage/test_facets.py b/providers/google/tests/unit/google/cloud/openlineage/test_facets.py
index 4b45a9ee4b739..0aa1e4feebcb1 100644
--- a/providers/google/tests/unit/google/cloud/openlineage/test_facets.py
+++ b/providers/google/tests/unit/google/cloud/openlineage/test_facets.py
@@ -16,7 +16,11 @@
 # under the License.
 from __future__ import annotations
 
-from airflow.providers.google.cloud.openlineage.facets import BigQueryJobRunFacet
+from airflow.providers.google.cloud.openlineage.facets import (
+    BigQueryJobRunFacet,
+    CloudStorageTransferJobFacet,
+    CloudStorageTransferRunFacet,
+)
 
 
 def test_bigquery_job_run_facet():
@@ -24,3 +28,55 @@ def test_bigquery_job_run_facet():
     assert facet.cached is True
     assert facet.billedBytes == 123
     assert facet.properties == "some_properties"
+
+
+def test_cloud_storage_transfer_job_facet():
+    facet = CloudStorageTransferJobFacet(
+        jobName="transferJobs/123",
+        projectId="test-project",
+        description="S3 to GCS transfer",
+        status="ENABLED",
+        sourceBucket="my-s3-bucket",
+        sourcePath="data/",
+        targetBucket="my-gcs-bucket",
+        targetPath="backup/",
+        objectConditions={"maxTimeElapsedSinceLastModification": "86400s"},
+        transferOptions={"overwriteObjectsAlreadyExistingInSink": True},
+        schedule={"scheduleStartDate": {"year": 2025, "month": 9, "day": 17}},
+    )
+
+    assert facet.jobName == "transferJobs/123"
+    assert facet.projectId == "test-project"
+    assert facet.description == "S3 to GCS transfer"
+    assert facet.status == "ENABLED"
+    assert facet.sourceBucket == "my-s3-bucket"
+    assert facet.sourcePath == "data/"
+    assert facet.targetBucket == "my-gcs-bucket"
+    assert facet.targetPath == "backup/"
+    assert facet.objectConditions == {"maxTimeElapsedSinceLastModification": "86400s"}
+    assert facet.transferOptions == {"overwriteObjectsAlreadyExistingInSink": True}
+    assert facet.schedule == {"scheduleStartDate": {"year": 2025, "month": 9, "day": 17}}
+
+
+def test_cloud_storage_transfer_run_facet():
+    facet = CloudStorageTransferRunFacet(
+        jobName="transferJobs/123",
+        operationName="transferOperations/abc",
+        status="SUCCESS",
+        startTime="2025-09-17T10:00:00Z",
+        endTime="2025-09-17T10:05:00Z",
+        wait=True,
+        timeout=3600,
+        deferrable=False,
+        deleteJobAfterCompletion=True,
+    )
+
+    assert facet.jobName == "transferJobs/123"
+    assert facet.operationName == "transferOperations/abc"
+    assert facet.status == "SUCCESS"
+    assert facet.startTime == "2025-09-17T10:00:00Z"
+    assert facet.endTime == "2025-09-17T10:05:00Z"
+    assert facet.wait is True
+    assert facet.timeout == 3600
+    assert facet.deferrable is False
+    assert facet.deleteJobAfterCompletion is True
diff --git a/providers/google/tests/unit/google/cloud/operators/test_cloud_storage_transfer_service.py b/providers/google/tests/unit/google/cloud/operators/test_cloud_storage_transfer_service.py
index e3f2201c8787a..60257593333ed 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_storage_transfer_service.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_storage_transfer_service.py
@@ -39,6 +39,7 @@
     LIST_URL,
     NAME,
     PATH,
+    PROJECT_ID,
     SCHEDULE,
     SCHEDULE_END_DATE,
     SCHEDULE_START_DATE,
@@ -47,6 +48,10 @@
     STATUS,
     TRANSFER_SPEC,
 )
+from airflow.providers.google.cloud.openlineage.facets import (
+    CloudStorageTransferJobFacet,
+    CloudStorageTransferRunFacet,
+)
 from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
     CloudDataTransferServiceCancelOperationOperator,
     CloudDataTransferServiceCreateJobOperator,
@@ -1018,6 +1023,160 @@ def test_async_execute_error(self, mock_aws_hook):
             context={}, event={"status": "error", "message": "test failure message"}
         )
 
+    @pytest.mark.parametrize(
+        "wait, job_name",
+        [
+            (True, "transferJobs/123"),
+            (False, "transferJobs/456"),
+        ],
+    )
+    def test_get_openlineage_facets_on_complete_facets_run_and_job(self, wait, job_name):
+        op = CloudDataTransferServiceS3ToGCSOperator(
+            task_id=TASK_ID,
+            s3_bucket=AWS_BUCKET_NAME,
+            s3_path="raw/",
+            gcs_bucket=GCS_BUCKET_NAME,
+            gcs_path="processed/",
+            project_id=GCP_PROJECT_ID,
+            wait=wait,
+            description=DESCRIPTION,
+        )
+        op._transfer_job = {
+            NAME: job_name,
+            PROJECT_ID: GCP_PROJECT_ID,
+            DESCRIPTION: DESCRIPTION,
+            STATUS: "ENABLED",
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {BUCKET_NAME: AWS_BUCKET_NAME, PATH: "raw/"},
+                GCS_DATA_SINK: {BUCKET_NAME: GCS_BUCKET_NAME, PATH: "processed/"},
+            },
+        }
+
+        result = op.get_openlineage_facets_on_complete(task_instance=mock.Mock())
+
+        assert result.inputs[0].namespace == f"s3://{AWS_BUCKET_NAME}"
+        assert result.inputs[0].name == "raw/"
+        assert result.outputs[0].namespace == f"gs://{GCS_BUCKET_NAME}"
+        assert result.outputs[0].name == "processed/"
+
+        job_facet = result.job_facets["cloudStorageTransferJob"]
+        assert isinstance(job_facet, CloudStorageTransferJobFacet)
+        assert job_facet.jobName == job_name
+        assert job_facet.projectId == GCP_PROJECT_ID
+        assert job_facet.description == DESCRIPTION
+        assert job_facet.status == "ENABLED"
+        assert job_facet.sourceBucket == AWS_BUCKET_NAME
+        assert job_facet.sourcePath == "raw/"
+        assert job_facet.targetBucket == GCS_BUCKET_NAME
+        assert job_facet.targetPath == "processed/"
+        assert job_facet.objectConditions is None
+        assert job_facet.transferOptions is None
+        assert job_facet.schedule is None
+
+        run_facet = result.run_facets["cloudStorageTransferRun"]
+        assert isinstance(run_facet, CloudStorageTransferRunFacet)
+        assert run_facet.jobName == job_name
+        assert run_facet.wait == wait
+
+    @pytest.mark.parametrize(
+        "object_conditions, delete_source",
+        [
+            ({"includePrefixes": ["2025/"]}, True),
+            (None, False),
+        ],
+    )
+    def test_get_openlineage_facets_on_complete_job_facet_includes_object_conditions_and_options(
+        self, object_conditions, delete_source
+    ):
+        op = CloudDataTransferServiceS3ToGCSOperator(
+            task_id=TASK_ID,
+            s3_bucket=AWS_BUCKET_NAME,
+            gcs_bucket=GCS_BUCKET_NAME,
+            project_id=GCP_PROJECT_ID,
+            object_conditions=object_conditions,
+            transfer_options={"deleteObjectsFromSourceAfterTransfer": delete_source},
+            wait=True,
+        )
+        op._transfer_job = {
+            NAME: "transferJobs/789",
+            PROJECT_ID: GCP_PROJECT_ID,
+            TRANSFER_SPEC: {
+                AWS_S3_DATA_SOURCE: {BUCKET_NAME: AWS_BUCKET_NAME},
+                GCS_DATA_SINK: {BUCKET_NAME: GCS_BUCKET_NAME},
+                "objectConditions": object_conditions,
+                "transferOptions": {"deleteObjectsFromSourceAfterTransfer": delete_source},
+            },
+        }
+
+        result = op.get_openlineage_facets_on_complete(task_instance=mock.Mock())
+        job_facet = result.job_facets["cloudStorageTransferJob"]
+        assert job_facet.projectId == GCP_PROJECT_ID
+        assert job_facet.objectConditions == object_conditions
+        assert job_facet.transferOptions == {"deleteObjectsFromSourceAfterTransfer": delete_source}
+        assert isinstance(job_facet.objectConditions, (dict, type(None)))
+        assert isinstance(job_facet.transferOptions, (dict, type(None)))
+
+    def test_get_openlineage_facets_on_complete_job_facet_without_object_conditions_or_transfer_options(self):
+        op = CloudDataTransferServiceS3ToGCSOperator(
+            task_id=TASK_ID,
+            s3_bucket=AWS_BUCKET_NAME,
+            gcs_bucket=GCS_BUCKET_NAME,
+            wait=True,
+        )
+        op._transfer_job = {
+            NAME: "transferJobs/222",
+            PROJECT_ID: GCP_PROJECT_ID,
+            "transferSpec": {
+                AWS_S3_DATA_SOURCE: {BUCKET_NAME: AWS_BUCKET_NAME},
+                GCS_DATA_SINK: {BUCKET_NAME: GCS_BUCKET_NAME},
+            },
+        }
+
+        result = op.get_openlineage_facets_on_complete(task_instance=mock.Mock())
+        job_facet = result.job_facets["cloudStorageTransferJob"]
+        assert job_facet.objectConditions is None
+        assert job_facet.transferOptions is None
+        assert job_facet.schedule is None
+
+    def test_get_openlineage_facets_on_complete_delete_job_after_completion_still_produces_facets(self):
+        op = CloudDataTransferServiceS3ToGCSOperator(
+            task_id=TASK_ID,
+            s3_bucket=AWS_BUCKET_NAME,
+            gcs_bucket=GCS_BUCKET_NAME,
+            project_id=GCP_PROJECT_ID,
+            delete_job_after_completion=True,
+            wait=True,
+        )
+        op._transfer_job = {NAME: "transferJobs/333", PROJECT_ID: GCP_PROJECT_ID}
+
+        result = op.get_openlineage_facets_on_complete(task_instance=mock.Mock())
+
+        assert "cloudStorageTransferJob" in result.job_facets
+        assert "cloudStorageTransferRun" in result.run_facets
+        run_facet = result.run_facets["cloudStorageTransferRun"]
+        assert run_facet.deleteJobAfterCompletion is True
+
+    def test_get_openlineage_facets_on_complete_inputs_outputs_when_paths_missing(self):
+        op = CloudDataTransferServiceS3ToGCSOperator(
+            task_id=TASK_ID,
+            s3_bucket=AWS_BUCKET_NAME,
+            gcs_bucket=GCS_BUCKET_NAME,
+        )
+        op._transfer_job = {
+            NAME: "transferJobs/444",
+            PROJECT_ID: GCP_PROJECT_ID,
+            "transferSpec": {
+                AWS_S3_DATA_SOURCE: {BUCKET_NAME: AWS_BUCKET_NAME},
+                GCS_DATA_SINK: {BUCKET_NAME: GCS_BUCKET_NAME},
+            },
+        }
+
+        result = op.get_openlineage_facets_on_complete(task_instance=mock.Mock())
+        assert result.inputs[0].namespace == f"s3://{AWS_BUCKET_NAME}"
+        assert result.inputs[0].name == ""
+        assert result.outputs[0].namespace == f"gs://{GCS_BUCKET_NAME}"
+        assert result.outputs[0].name == ""
+
 
 class TestGoogleCloudStorageToGoogleCloudStorageTransferOperator:
     def test_constructor(self):