Skip to content

Commit

Permalink
fix(ingestion/bigquery-gcs-lineage): Add lineage extraction for BigQu…
Browse files Browse the repository at this point in the history
…ery with GCS source
  • Loading branch information
sagar-salvi-apptware committed Sep 20, 2024
1 parent 6af5c61 commit b773409
Show file tree
Hide file tree
Showing 8 changed files with 460 additions and 2 deletions.
17 changes: 17 additions & 0 deletions metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ source:
# deny:
# - "*.*.*"
#storage_project_id: project-id-1234567
## Lineage with GCS Source
# include_column_lineage_with_gcs: true/false
# gcs_lineage_config:
# path_specs:
# - include: "gs://my-bucket/foo/tests/bar.avro"
# - include: "gs://my-bucket/foo/tests/*.*"
# - include: "gs://my-bucket/foo/tests/{table}/*.avro"
# - include: "gs://my-bucket/foo/tests/{table}/*/*.avro"
# - include: "gs://my-bucket/foo/tests/{table}/*.*"
# - include: "gs://my-bucket/{dept}/tests/{table}/*.avro"
# - include: "gs://my-bucket/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro"
# - include: "gs://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro"
# - include: "gs://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*"
# - include: "gs://my-bucket/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*"
# - include: "gs://my-bucket/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*"
# strip_urls: false


## see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,16 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.bq_schema_extractor.table_refs,
)

# Lineage BQ to GCS
if (
self.config.include_table_lineage
and self.bq_schema_extractor.external_tables
):
for dataset_urn, table in self.bq_schema_extractor.external_tables.items():
yield from self.lineage_extractor.gen_lineage_workunits_for_external_table(
dataset_urn, table.ddl, graph=self.ctx.graph
)

def get_report(self) -> BigQueryV2Report:
    """Return the report object accumulated for this source run."""
    return self.report

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from datahub.ingestion.glossary.classification_mixin import (
ClassificationSourceConfigMixin,
)
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulLineageConfigMixin,
Expand Down Expand Up @@ -206,6 +207,39 @@ def get_sql_alchemy_url(self) -> str:
return "bigquery://"


class GcsLineageProviderConfig(ConfigModel):
    """
    Settings controlling how GCS lineage is derived for a source.

    Grouped under `gcs_lineage_config` via GcsDatasetLineageProviderConfigBase.
    """

    path_specs: List[PathSpec] = Field(
        # default_factory avoids sharing one list instance across configs.
        default_factory=list,
        description="List of PathSpec. See below the details about PathSpec",
    )

    strip_urls: bool = Field(
        default=True,
        description="Strip filename from gcs url. It only applies if path_specs are not specified.",
    )

    ignore_non_path_spec_path: bool = Field(
        default=False,
        description="Ignore paths that do not match any path_specs. It only applies if path_specs are specified.",
    )


class GcsDatasetLineageProviderConfigBase(ConfigModel):
    """
    Any source that produces gcs lineage from/to Datasets should inherit this class.
    This is needed to group all lineage related configs under the `gcs_lineage_config`
    config property.
    """

    gcs_lineage_config: GcsLineageProviderConfig = Field(
        # default_factory gives each config object its own nested instance
        # instead of sharing one default GcsLineageProviderConfig().
        default_factory=GcsLineageProviderConfig,
        description="Common config for gcs lineage generation",
    )


class BigQueryFilterConfig(SQLFilterConfig):
project_ids: List[str] = Field(
default_factory=list,
Expand Down Expand Up @@ -328,6 +362,7 @@ class BigQueryIdentifierConfig(


class BigQueryV2Config(
GcsDatasetLineageProviderConfigBase,
BigQueryConnectionConfig,
BigQueryBaseConfig,
BigQueryFilterConfig,
Expand Down Expand Up @@ -468,6 +503,11 @@ def have_table_data_read_permission(self) -> bool:
description="Option to enable/disable lineage generation. Is enabled by default.",
)

include_column_lineage_with_gcs: bool = Field(
default=True,
description="When enabled, column-level lineage will be extracted from the gcs.",
)

max_query_duration: timedelta = Field(
default=timedelta(minutes=15),
description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class BigQueryV2Report(
num_filtered_query_events: int = 0
num_usage_query_hash_collisions: int = 0
num_operational_stats_workunits_emitted: int = 0
num_lineage_dropped_gcs_path: int = 0

snapshots_scanned: int = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class BigqueryTable(BaseTable):
long_term_billable_bytes: Optional[int] = None
partition_info: Optional[PartitionInfo] = None
columns_ignore_from_profiling: List[str] = field(default_factory=list)
table_type: Optional[str] = None


@dataclass
Expand Down Expand Up @@ -352,6 +353,7 @@ def _make_bigquery_table(
return BigqueryTable(
name=table.table_name,
created=table.created,
table_type=table.table_type,
last_altered=(
datetime.fromtimestamp(
table.get("last_altered") / 1000, tz=timezone.utc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ def __init__(
self.view_definitions: FileBackedDict[str] = FileBackedDict()
# Maps snapshot ref -> Snapshot
self.snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot] = FileBackedDict()
# Add External BQ table
self.external_tables: Dict[str, BigqueryTable] = defaultdict()

@property
def store_table_refs(self):
Expand Down Expand Up @@ -785,6 +787,17 @@ def gen_dataset_workunits(
project_id, dataset_name, table.name
)

# Added for bigquery to gcs lineage extraction
if (
isinstance(table, BigqueryTable)
and table.table_type is not None
and table.table_type == "EXTERNAL"
and table.ddl is not None
and f"CREATE EXTERNAL TABLE `{project_id}.{dataset_name}.{table.name}`"
in table.ddl
):
self.external_tables[dataset_urn] = table

status = Status(removed=False)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=status
Expand Down
184 changes: 184 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import collections
import itertools
import json
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import (
Expand All @@ -15,17 +17,21 @@
Tuple,
Union,
)
from urllib.parse import urlparse

import humanfriendly
import sqlglot
from google.cloud.datacatalog import lineage_v1
from google.cloud.logging_v2.client import Client as GCPLoggingClient

from datahub.api.entities.dataset.dataset import Dataset
from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
AuditLogEntry,
BigQueryAuditMetadata,
Expand All @@ -51,10 +57,13 @@
BQ_FILTER_RULE_TEMPLATE_V2_LINEAGE,
bigquery_audit_metadata_query_template_lineage,
)
from datahub.ingestion.source.gcs import gcs_utils
from datahub.ingestion.source.state.redundant_run_skip_handler import (
RedundantLineageRunSkipHandler,
)
from datahub.ingestion.source_report.ingestion_stage import LINEAGE_EXTRACTION
from datahub.metadata._schema_classes import SchemaMetadataClass
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata
from datahub.metadata.schema_classes import (
AuditStampClass,
DatasetLineageTypeClass,
Expand Down Expand Up @@ -918,3 +927,178 @@ def test_capability(self, project_id: str) -> None:
def report_status(self, step: str, status: bool) -> None:
if self.redundant_run_skip_handler:
self.redundant_run_skip_handler.report_current_run_status(step, status)

def gen_lineage_workunits_for_external_table(
    self,
    dataset_urn: str,
    ddl: Optional[str],
    graph: Optional[DataHubGraph] = None,
) -> Iterable[MetadataWorkUnit]:
    """Emit upstream-lineage workunits for a BigQuery external table.

    Parses the table's source URIs out of its DDL (`uris=[...]`) and, when
    GCS lineage can be resolved, yields a single UpstreamLineage aspect.

    :param dataset_urn: URN of the BigQuery external table (downstream).
    :param ddl: The table's CREATE EXTERNAL TABLE DDL, if available.
    :param graph: Optional DataHub graph, needed for column-level lineage.
    """
    if not ddl:
        return

    # External-table DDL lists its sources as `uris=["gs://...", ...]`;
    # capture the bracketed list.
    uris_match = re.search(r"uris=\[([^\]]+)\]", ddl)
    if not uris_match:
        return

    # The captured list is expected to be JSON-compatible, but DDL is
    # free-form text — don't let one malformed table abort ingestion.
    try:
        source_uris = json.loads(f"[{uris_match.group(1)}]")
    except json.JSONDecodeError as e:
        logger.warning(
            f"Failed to parse source uris from ddl of {dataset_urn}: {e}"
        )
        return

    lineage_info = self.get_lineage_for_external_table(
        dataset_urn=dataset_urn,
        source_uris=source_uris,
        graph=graph,
    )

    if lineage_info:
        yield MetadataChangeProposalWrapper(
            entityUrn=dataset_urn, aspect=lineage_info
        ).as_workunit()

def get_lineage_for_external_table(
    self,
    dataset_urn: str,
    source_uris: List[str],
    graph: Optional[DataHubGraph] = None,
) -> Optional[UpstreamLineageClass]:
    """Build an UpstreamLineage aspect mapping GCS source files to the table.

    Only `gs://` URIs are considered. Column-level (fine-grained) lineage is
    added when `include_column_lineage_with_gcs` is enabled and schemas for
    both sides can be fetched from the graph.

    :returns: The lineage aspect, or None when no GCS upstream was resolved.
    """
    gcs_urns: Set[str] = set()

    for source_uri in source_uris:
        # Only GCS-backed external tables are supported for lineage right now.
        if not gcs_utils.is_gcs_uri(source_uri):
            continue

        gcs_path = self._get_gcs_path(source_uri)
        if gcs_path is None:
            continue

        gcs_urns.add(
            make_dataset_urn_with_platform_instance(
                platform="gcs",
                name=gcs_utils.strip_gcs_prefix(gcs_path),
                env=self.config.env,
                # The field already defaults to None; no conditional needed.
                platform_instance=self.config.platform_instance,
            )
        )

    if not gcs_urns:
        return None

    # The set guarantees each upstream dataset appears exactly once.
    upstreams_list = [
        UpstreamClass(dataset=urn, type=DatasetLineageTypeClass.COPY)
        for urn in gcs_urns
    ]

    fine_grained_lineages: List[FineGrainedLineageClass] = []
    if self.config.include_column_lineage_with_gcs:
        # Column-level lineage needs schema metadata from the graph.
        assert graph
        downstream_schema: Optional[SchemaMetadataClass] = (
            graph.get_schema_metadata(dataset_urn)
        )
        if downstream_schema:
            for gcs_dataset_urn in gcs_urns:
                gcs_schema = graph.get_schema_metadata(gcs_dataset_urn)
                if not gcs_schema:
                    continue
                column_lineage = self.get_fine_grained_lineages_with_gcs(
                    dataset_urn,
                    gcs_dataset_urn,
                    downstream_schema,
                    gcs_schema,
                )
                if column_lineage:
                    fine_grained_lineages.extend(column_lineage)

    return UpstreamLineageClass(
        upstreams=upstreams_list,
        fineGrainedLineages=fine_grained_lineages or None,
    )

def _get_gcs_path(self, path: str) -> Optional[str]:
if self.config.gcs_lineage_config:
for path_spec in self.config.gcs_lineage_config.path_specs:
if not path_spec.allowed(path):
logger.debug(
f"Skipping gcs path {path} as it does not match any path spec."
)
self.report.num_lineage_dropped_gcs_path += 1
continue

_, table_path = path_spec.extract_table_name_and_path(path)
return table_path

if (
self.config.gcs_lineage_config.ignore_non_path_spec_path
and len(self.config.gcs_lineage_config.path_specs) > 0
):
self.report.num_lineage_dropped_gcs_path += 1
logger.debug(
f"Skipping gcs path {path} as it does not match any path spec."
)
return None

if self.config.gcs_lineage_config.strip_urls:
if "/" in urlparse(path).path:
return str(path.rsplit("/", 1)[0])

return path

def get_fine_grained_lineages_with_gcs(
    self,
    dataset_urn: str,
    gcs_dataset_urn: str,
    schema_metadata: SchemaMetadata,
    schema_metadata_for_gcs: SchemaMetadata,
) -> Optional[List[FineGrainedLineageClass]]:
    """Match BigQuery columns to GCS columns by simplified field path.

    Emits one FIELD_SET->FIELD fine-grained lineage per downstream column
    that has a same-named column in the GCS schema.

    :returns: The (possibly empty) lineage list, or None if either schema
        is missing.
    """
    if not (schema_metadata and schema_metadata_for_gcs):
        return None

    def simplify_field_path(field_path: str) -> str:
        return Dataset._simplify_field_path(field_path)

    # Index GCS fields once so matching is O(n + m) rather than a nested
    # scan per downstream field. setdefault keeps the FIRST field for a
    # duplicated simplified path, matching the previous next(...) semantics.
    gcs_field_by_path: Dict[str, Any] = {}
    for gcs_field in schema_metadata_for_gcs.fields:
        gcs_field_by_path.setdefault(
            simplify_field_path(gcs_field.fieldPath), gcs_field
        )

    fine_grained_lineages: List[FineGrainedLineageClass] = []
    for field in schema_metadata.fields:
        field_path_v1 = simplify_field_path(field.fieldPath)
        matching_gcs_field = gcs_field_by_path.get(field_path_v1)
        if matching_gcs_field is None:
            continue
        fine_grained_lineages.append(
            FineGrainedLineageClass(
                downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
                downstreams=[
                    mce_builder.make_schema_field_urn(dataset_urn, field_path_v1)
                ],
                upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
                upstreams=[
                    mce_builder.make_schema_field_urn(
                        gcs_dataset_urn,
                        simplify_field_path(matching_gcs_field.fieldPath),
                    )
                ],
            )
        )
    return fine_grained_lineages
Loading

0 comments on commit b773409

Please sign in to comment.