CloudStorageTransferJobFacet.json (new file)
@@ -0,0 +1,68 @@
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$defs": {
    "CloudStorageTransferJobFacet": {
      "allOf": [
        {
          "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/JobFacet"
        },
        {
          "type": "object",
          "properties": {
            "jobName": {
              "type": "string",
              "description": "Transfer job name assigned by GCP Storage Transfer Service."
            },
            "projectId": {
              "type": "string",
              "description": "GCP project ID."
            },
            "description": {
              "type": "string",
              "description": "Optional description of the transfer job."
            },
            "status": {
              "type": "string",
              "description": "Status of the transfer job (ENABLED, DISABLED)."
            },
            "sourceBucket": {
              "type": "string",
              "description": "Source AWS S3 bucket."
            },
            "sourcePath": {
              "type": "string",
              "description": "Prefix path inside the source bucket."
            },
            "targetBucket": {
              "type": "string",
              "description": "Target GCS bucket."
            },
            "targetPath": {
              "type": "string",
              "description": "Prefix path inside the target bucket."
            },
            "objectConditions": {
              "type": "object",
              "description": "Filtering conditions for objects transferred."
            },
            "transferOptions": {
              "type": "object",
              "description": "Transfer options such as overwrite or delete."
            },
            "schedule": {
              "type": "object",
              "description": "Transfer schedule details."
            }
          }
        }
      ],
      "type": "object"
    }
  },
  "type": "object",
  "properties": {
    "cloudStorageTransferJob": {
      "$ref": "#/$defs/CloudStorageTransferJobFacet"
    }
  }
}
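For illustration, a payload described by this schema would sit under the "cloudStorageTransferJob" key of a job's facets. A minimal sketch as a Python dict, with every value hypothetical (the base JobFacet referenced in the allOf additionally carries the standard _producer and _schemaURL fields):

# Hypothetical "cloudStorageTransferJob" facet payload; every value is illustrative.
cloud_storage_transfer_job = {
    "jobName": "transferJobs/example-123",
    "projectId": "example-project",
    "status": "ENABLED",
    "sourceBucket": "example-s3-bucket",
    "sourcePath": "input/",
    "targetBucket": "example-gcs-bucket",
    "targetPath": "input/",
    "transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
}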
CloudStorageTransferRunFacet.json (new file)
@@ -0,0 +1,60 @@
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$defs": {
    "CloudStorageTransferRunFacet": {
      "allOf": [
        {
          "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
        },
        {
          "type": "object",
          "properties": {
            "jobName": {
              "type": "string",
              "description": "Transfer job name associated with this run."
            },
            "operationName": {
              "type": "string",
              "description": "Transfer operation name if available."
            },
            "status": {
              "type": "string",
              "description": "Run status if available."
            },
            "startTime": {
              "type": "string",
              "description": "Start time of the transfer operation."
            },
            "endTime": {
              "type": "string",
              "description": "End time of the transfer operation."
            },
            "wait": {
              "type": "boolean",
              "description": "Whether the operator waited for completion."
            },
            "timeout": {
              "type": ["number", "null"],
              "description": "Timeout in seconds."
            },
            "deferrable": {
              "type": "boolean",
              "description": "Whether the operator used deferrable mode."
            },
            "deleteJobAfterCompletion": {
              "type": "boolean",
              "description": "Whether the transfer job was deleted after completion."
            }
          }
        }
      ],
      "type": "object"
    }
  },
  "type": "object",
  "properties": {
    "cloudStorageTransferRun": {
      "$ref": "#/$defs/CloudStorageTransferRunFacet"
    }
  }
}
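Likewise, a hedged sketch of a run facet payload; startTime and endTime are plain strings in the schema, shown here as ISO 8601 timestamps per the field descriptions:

# Hypothetical "cloudStorageTransferRun" facet payload; every value is illustrative.
cloud_storage_transfer_run = {
    "jobName": "transferJobs/example-123",
    "operationName": "transferOperations/example-op-456",
    "status": "SUCCESS",
    "startTime": "2024-01-01T00:00:00Z",
    "endTime": "2024-01-01T00:05:00Z",
    "wait": True,
    "timeout": 60,
    "deferrable": False,
    "deleteJobAfterCompletion": False,
}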
airflow/providers/google/cloud/openlineage/facets.py
@@ -24,13 +24,17 @@
from airflow.providers.google import __version__ as provider_version

if TYPE_CHECKING:
-    from openlineage.client.generated.base import RunFacet
+    from openlineage.client.generated.base import JobFacet, RunFacet

try:
    try:
        from openlineage.client.generated.base import RunFacet
    except ImportError:  # Old OpenLineage client is used
        from openlineage.client.facet import BaseFacet as RunFacet  # type: ignore[assignment]
+    try:
+        from openlineage.client.generated.base import JobFacet
+    except ImportError:  # Old OpenLineage client is used
+        from openlineage.client.facet import BaseFacet as JobFacet  # type: ignore[assignment]

    @define
    class BigQueryJobRunFacet(RunFacet):
@@ -53,6 +57,80 @@ def _get_schema() -> str:
                f"providers-google/{provider_version}/airflow/providers/google/"
                "openlineage/BigQueryJobRunFacet.json"
            )

    @define
    class CloudStorageTransferJobFacet(JobFacet):
        """
        Facet representing a Cloud Storage Transfer Service job configuration.

        :param jobName: Unique name of the transfer job.
        :param projectId: GCP project where the transfer job is defined.
        :param description: User-provided description of the transfer job.
        :param status: Current status of the transfer job (e.g. "ENABLED", "DISABLED").
        :param sourceBucket: Name of the source bucket (e.g. AWS S3).
        :param sourcePath: Prefix/path inside the source bucket.
        :param targetBucket: Name of the destination bucket (e.g. GCS).
        :param targetPath: Prefix/path inside the destination bucket.
        :param objectConditions: Object selection rules (e.g. include/exclude prefixes).
        :param transferOptions: Transfer options, such as overwrite behavior or whether to delete objects
            from the source after transfer.
        :param schedule: Schedule for the transfer job (if recurring).
        """

        jobName: str | None = field(default=None)
        projectId: str | None = field(default=None)
        description: str | None = field(default=None)
        status: str | None = field(default=None)
        sourceBucket: str | None = field(default=None)
        sourcePath: str | None = field(default=None)
        targetBucket: str | None = field(default=None)
        targetPath: str | None = field(default=None)
        objectConditions: dict | None = field(default=None)
        transferOptions: dict | None = field(default=None)
        schedule: dict | None = field(default=None)

        @staticmethod
        def _get_schema() -> str:
            return (
                "https://raw.githubusercontent.com/apache/airflow/"
                f"providers-google/{provider_version}/airflow/providers/google/"
                "openlineage/CloudStorageTransferJobFacet.json"
            )

    @define
    class CloudStorageTransferRunFacet(RunFacet):
        """
        Facet representing a Cloud Storage Transfer Service job execution run.

        :param jobName: Name of the transfer job being executed.
        :param operationName: Name of the specific transfer operation instance.
        :param status: Current status of the operation (e.g. "IN_PROGRESS", "SUCCESS", "FAILED").
        :param startTime: Time when the transfer job execution started (ISO 8601 format).
        :param endTime: Time when the transfer job execution finished (ISO 8601 format).
        :param wait: Whether the operator waits for the job to complete before finishing.
        :param timeout: Timeout (in seconds) for the transfer run to complete.
        :param deferrable: Whether the operator defers execution until job completion.
        :param deleteJobAfterCompletion: Whether the operator deletes the transfer job after the run completes.
        """

        jobName: str | None = field(default=None)
        operationName: str | None = field(default=None)
        status: str | None = field(default=None)
        startTime: str | None = field(default=None)
        endTime: str | None = field(default=None)
        wait: bool = field(default=True)
        timeout: float | None = field(default=None)
        deferrable: bool = field(default=False)
        deleteJobAfterCompletion: bool = field(default=False)

        @staticmethod
        def _get_schema() -> str:
            return (
                "https://raw.githubusercontent.com/apache/airflow/"
                f"providers-google/{provider_version}/airflow/providers/google/"
                "openlineage/CloudStorageTransferRunFacet.json"
            )

except ImportError:  # OpenLineage is not available

    def create_no_op(*_, **__) -> None:
@@ -65,3 +143,5 @@ def create_no_op(*_, **__) -> None:
        return None

    BigQueryJobRunFacet = create_no_op  # type: ignore[misc, assignment]
+    CloudStorageTransferJobFacet = create_no_op  # type: ignore[misc, assignment]
+    CloudStorageTransferRunFacet = create_no_op  # type: ignore[misc, assignment]
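Because both classes are attrs-based (@define) with keyword defaults, they can be constructed field by field. A minimal sketch, assuming the new-style OpenLineage client is installed so the real facet classes (not the create_no_op fallback) are in scope; all values are illustrative:

from airflow.providers.google.cloud.openlineage.facets import (
    CloudStorageTransferJobFacet,
    CloudStorageTransferRunFacet,
)

# Values are illustrative; the operator fills these from the transfer job it created.
job_facet = CloudStorageTransferJobFacet(
    jobName="transferJobs/example-123",
    projectId="example-project",
    status="ENABLED",
    sourceBucket="example-s3-bucket",
    sourcePath="input/",
    targetBucket="example-gcs-bucket",
    targetPath="input/",
)
run_facet = CloudStorageTransferRunFacet(
    jobName="transferJobs/example-123",
    wait=True,
    timeout=60.0,
    deferrable=False,
    deleteJobAfterCompletion=False,
)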
airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
@@ -71,6 +71,7 @@
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID

if TYPE_CHECKING:
+    from airflow.providers.openlineage.extractors import OperatorLineage
    from airflow.utils.context import Context


@@ -964,6 +965,7 @@ def __init__(
        self.aws_role_arn = aws_role_arn
        self.deferrable = deferrable
        self._validate_inputs()
+        self._transfer_job: dict[str, Any] | None = None

    def _validate_inputs(self) -> None:
        if self.delete_job_after_completion and not self.wait:
@@ -978,19 +980,18 @@ def execute(self, context: Context) -> None:

        TransferJobPreprocessor(body=body, aws_conn_id=self.aws_conn_id, default_schedule=True).process_body()

-        job = hook.create_transfer_job(body=body)
-
+        self._transfer_job = hook.create_transfer_job(body=body)
        if self.wait:
            if not self.deferrable:
-                hook.wait_for_transfer_job(job, timeout=self.timeout)
+                hook.wait_for_transfer_job(self._transfer_job, timeout=self.timeout)
                if self.delete_job_after_completion:
-                    hook.delete_transfer_job(job_name=job[NAME], project_id=self.project_id)
+                    hook.delete_transfer_job(job_name=self._transfer_job[NAME], project_id=self.project_id)
            else:
                self.defer(
                    timeout=timedelta(seconds=self.timeout or 60),
                    trigger=CloudStorageTransferServiceCheckJobStatusTrigger(
-                        job_name=job[NAME],
-                        project_id=job[PROJECT_ID],
+                        job_name=self._transfer_job[NAME],
+                        project_id=self._transfer_job[PROJECT_ID],
                        gcp_conn_id=self.gcp_conn_id,
                        impersonation_chain=self.google_impersonation_chain,
                    ),
@@ -1040,6 +1041,57 @@ def _create_body(self) -> dict:

        return body

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage | None:
        """Provide OpenLineage OperatorLineage for the S3->GCS transfer."""
        from airflow.providers.common.compat.openlineage.facet import Dataset
        from airflow.providers.google.cloud.openlineage.facets import (
            CloudStorageTransferJobFacet,
            CloudStorageTransferRunFacet,
        )
        from airflow.providers.openlineage.extractors import OperatorLineage

        input_ds = Dataset(
            namespace=f"s3://{self.s3_bucket}",
            name=normalize_directory_path(self.s3_path) or "",
        )

        output_ds = Dataset(
            namespace=f"gs://{self.gcs_bucket}",
            name=normalize_directory_path(self.gcs_path) or "",
        )

        job = self._transfer_job or {}
        job_facet = CloudStorageTransferJobFacet(
            jobName=job.get(NAME),
            projectId=job.get(PROJECT_ID, self.project_id),
            description=job.get(DESCRIPTION, self.description),
            status=job.get(STATUS),
            sourceBucket=job.get(TRANSFER_SPEC, {})
            .get(AWS_S3_DATA_SOURCE, {})
            .get(BUCKET_NAME, self.s3_bucket),
            sourcePath=job.get(TRANSFER_SPEC, {}).get(AWS_S3_DATA_SOURCE, {}).get(PATH, self.s3_path),
            targetBucket=job.get(TRANSFER_SPEC, {}).get(GCS_DATA_SINK, {}).get(BUCKET_NAME, self.gcs_bucket),
            targetPath=job.get(TRANSFER_SPEC, {}).get(GCS_DATA_SINK, {}).get(PATH, self.gcs_path),
            objectConditions=job.get(TRANSFER_SPEC, {}).get("objectConditions", self.object_conditions),
            transferOptions=job.get(TRANSFER_SPEC, {}).get("transferOptions", self.transfer_options),
            schedule=job.get(SCHEDULE, self.schedule),
        )

        run_facet = CloudStorageTransferRunFacet(
            jobName=job.get(NAME),
            wait=self.wait,
            timeout=self.timeout,
            deferrable=self.deferrable,
            deleteJobAfterCompletion=self.delete_job_after_completion,
        )

        return OperatorLineage(
            inputs=[input_ds],
            outputs=[output_ds],
            job_facets={"cloudStorageTransferJob": job_facet},
            run_facets={"cloudStorageTransferRun": run_facet},
        )


class CloudDataTransferServiceGCSToGCSOperator(GoogleCloudBaseOperator):
"""
Expand Down
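For context, a hedged sketch of how the S3-to-GCS operator above might appear in a DAG; once the task completes, the OpenLineage provider invokes get_openlineage_facets_on_complete and emits the datasets and facets built there (s3://{s3_bucket}/{s3_path} as input, gs://{gcs_bucket}/{gcs_path} as output). Parameter values are illustrative:

from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceS3ToGCSOperator,
)

# Illustrative task definition; lineage is collected automatically on completion
# when the OpenLineage provider is configured.
transfer = CloudDataTransferServiceS3ToGCSOperator(
    task_id="s3_to_gcs_transfer",
    s3_bucket="example-s3-bucket",
    s3_path="input/",
    gcs_bucket="example-gcs-bucket",
    gcs_path="input/",
    project_id="example-project",
    wait=True,
)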