Skip to content

Commit

Permalink
feat: Add OpenLineage support for non-query jobs in BigQueryInsertJob…
Browse files Browse the repository at this point in the history
…Operator

Signed-off-by: Kacper Muda <mudakacper@gmail.com>
  • Loading branch information
kacpermuda committed Jan 2, 2025
1 parent 608de6b commit 5a0f236
Show file tree
Hide file tree
Showing 10 changed files with 762 additions and 143 deletions.
221 changes: 139 additions & 82 deletions providers/src/airflow/providers/google/cloud/openlineage/mixins.py

Large diffs are not rendered by default.

41 changes: 27 additions & 14 deletions providers/src/airflow/providers/google/cloud/openlineage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
import logging
import os
import pathlib
import re
from collections import defaultdict
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any

from attr import define, field
from google.cloud.dataproc_v1 import Batch, RuntimeConfig

from airflow.providers.common.compat.openlineage.facet import (
BaseFacet,
ColumnLineageDatasetFacet,
DatasetFacet,
DocumentationDatasetFacet,
Fields,
Identifier,
Expand Down Expand Up @@ -212,9 +214,9 @@ def extract_ds_name_from_gcs_path(path: str) -> str:
return path


def get_facets_from_bq_table(table: Table) -> dict[str, BaseFacet]:
def get_facets_from_bq_table(table: Table) -> dict[str, DatasetFacet]:
"""Get facets from BigQuery table object."""
facets: dict[str, BaseFacet] = {}
facets: dict[str, DatasetFacet] = {}
if table.schema:
facets["schema"] = SchemaDatasetFacet(
fields=[
Expand All @@ -228,26 +230,37 @@ def get_facets_from_bq_table(table: Table) -> dict[str, BaseFacet]:
facets["documentation"] = DocumentationDatasetFacet(description=table.description)

if table.external_data_configuration:
symlinks = set()
for uri in table.external_data_configuration.source_uris:
if uri.startswith("gs://"):
bucket, blob = _parse_gcs_url(uri)
blob = extract_ds_name_from_gcs_path(blob)
symlinks.add((f"gs://{bucket}", blob))

symlinks = get_namespace_name_from_source_uris(table.external_data_configuration.source_uris)
facets["symlink"] = SymlinksDatasetFacet(
identifiers=[
Identifier(namespace=namespace, name=name, type="file")
Identifier(
namespace=namespace, name=name, type="file" if namespace.startswith("gs://") else "table"
)
for namespace, name in sorted(symlinks)
]
)
return facets


def get_namespace_name_from_source_uris(source_uris: Iterable[str]) -> set[tuple[str, str]]:
    """
    Convert external-table source URIs into unique ``(namespace, name)`` pairs.

    Two URI shapes are recognised:

    * ``gs://...`` URIs are split with ``_parse_gcs_url`` and produce
      ``("gs://<bucket>", <name>)``, where the name is whatever
      ``extract_ds_name_from_gcs_path`` returns for the blob part.
    * Bigtable URIs shaped like
      ``https://googleapis.com/bigtable/projects/<project>/instances/<instance>[/appProfiles/<profile>]/tables/<table>``
      produce ``("bigtable://<project>/<instance>", <table>)``; the optional app profile
      segment is matched but not used in the result.

    Every other URI (including a Bigtable-prefixed URI that does not match the pattern
    above) is silently skipped. Duplicates collapse because the result is a set.

    :param source_uris: URIs to translate.
    :return: Set of ``(namespace, name)`` tuples; empty when nothing was recognised.
    """
    # Compiled once up front so the pattern is not rebuilt for every URI.
    bigtable_pattern = re.compile(
        r"https://googleapis.com/bigtable/projects/([^/]+)/instances/([^/]+)(?:/appProfiles/([^/]+))?/tables/([^/]+)"
    )
    pairs: set[tuple[str, str]] = set()
    for source_uri in source_uris:
        if source_uri.startswith("gs://"):
            bucket_name, blob_path = _parse_gcs_url(source_uri)
            pairs.add((f"gs://{bucket_name}", extract_ds_name_from_gcs_path(blob_path)))
            continue
        if not source_uri.startswith("https://googleapis.com/bigtable"):
            continue
        found = bigtable_pattern.match(source_uri)
        if found is None:
            continue
        # Third group is the optional app profile; it does not take part in the identifier.
        project, instance, _app_profile, table_name = found.groups()
        pairs.add((f"bigtable://{project}/{instance}", table_name))
    return pairs


def get_identity_column_lineage_facet(
dest_field_names: list[str],
input_datasets: list[Dataset],
) -> dict[str, ColumnLineageDatasetFacet]:
dest_field_names: Iterable[str],
input_datasets: Iterable[Dataset],
) -> dict[str, DatasetFacet]:
"""
Get column lineage facet for identity transformations.
Expand Down
Loading

0 comments on commit 5a0f236

Please sign in to comment.