Skip to content

Commit

Permalink
feat(ingest/bigquery): Add way to reference existing DataHub Tag from…
Browse files Browse the repository at this point in the history
… a bigquery label (datahub-project#11544)
  • Loading branch information
treff7es authored and sleeperdeep committed Dec 17, 2024
1 parent ae3963e commit ee7a6cf
Show file tree
Hide file tree
Showing 6 changed files with 1,135 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import re
from base64 import b32decode
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Type, Union, cast

Expand Down Expand Up @@ -89,12 +90,13 @@
HiveColumnToAvroConverter,
get_schema_fields_for_hive_column,
)
from datahub.utilities.mapping import Constants
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.ratelimiter import RateLimiter
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor

ENCODED_TAG_PREFIX = "urn_li_encoded_tag_"

logger: logging.Logger = logging.getLogger(__name__)
# Handle table snapshots
# See https://cloud.google.com/bigquery/docs/table-snapshots-intro.
Expand Down Expand Up @@ -194,6 +196,18 @@ def store_table_refs(self):
or self.config.use_queries_v2
)

def modified_base32decode(self, text_to_decode: str) -> str:
# When we sync from DataHub to BigQuery, we encode the tags as modified base32 strings.
# BiqQuery labels only support lowercase letters, international characters, numbers, or underscores.
# So we need to modify the base32 encoding to replace the padding character `=` with `_` and convert to lowercase.
if not text_to_decode.startswith("%s" % ENCODED_TAG_PREFIX):
return text_to_decode
text_to_decode = (
text_to_decode.replace(ENCODED_TAG_PREFIX, "").upper().replace("_", "=")
)
text = b32decode(text_to_decode.encode("utf-8")).decode("utf-8")
return text

def get_project_workunits(
self, project: BigqueryProject
) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -253,7 +267,7 @@ def gen_dataset_containers(
tags_joined: Optional[List[str]] = None
if tags and self.config.capture_dataset_label_as_tag:
tags_joined = [
f"{k}:{v}"
self.make_tag_from_label(k, v)
for k, v in tags.items()
if is_tag_allowed(self.config.capture_dataset_label_as_tag, k)
]
Expand Down Expand Up @@ -662,6 +676,11 @@ def _process_snapshot(
dataset_name=dataset_name,
)

def make_tag_from_label(self, key: str, value: str) -> str:
if not value.startswith(ENCODED_TAG_PREFIX):
return make_tag_urn(f"""{key}:{value}""")
return self.modified_base32decode(value)

def gen_table_dataset_workunits(
self,
table: BigqueryTable,
Expand Down Expand Up @@ -707,7 +726,7 @@ def gen_table_dataset_workunits(
tags_to_add = []
tags_to_add.extend(
[
make_tag_urn(f"""{k}:{v}""")
self.make_tag_from_label(k, v)
for k, v in table.labels.items()
if is_tag_allowed(self.config.capture_table_label_as_tag, k)
]
Expand All @@ -733,7 +752,7 @@ def gen_view_dataset_workunits(
tags_to_add = None
if table.labels and self.config.capture_view_label_as_tag:
tags_to_add = [
make_tag_urn(f"{k}:{v}")
self.make_tag_from_label(k, v)
for k, v in table.labels.items()
if is_tag_allowed(self.config.capture_view_label_as_tag, k)
]
Expand Down Expand Up @@ -922,11 +941,6 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
break
else:
tags = []
if col.is_partition_column:
tags.append(
TagAssociationClass(make_tag_urn(Constants.TAG_PARTITION_KEY))
)

if col.cluster_column_position is not None:
tags.append(
TagAssociationClass(
Expand All @@ -944,6 +958,7 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
type=SchemaFieldDataType(
self.BIGQUERY_FIELD_TYPE_MAPPINGS.get(col.data_type, NullType)()
),
isPartitioningKey=col.is_partition_column,
nativeDataType=col.data_type,
description=col.comment,
nullable=col.is_nullable,
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ def __init__(self, ctx: PipelineContext, config: MongoDBConfig):

# See https://pymongo.readthedocs.io/en/stable/examples/datetimes.html#handling-out-of-range-datetimes
self.mongo_client = MongoClient(
self.config.connect_uri, datetime_conversion="DATETIME_AUTO", **options
) # type: ignore
self.config.connect_uri, datetime_conversion="DATETIME_AUTO", **options # type: ignore
)

# This cheaply tests the connection. For details, see
# https://pymongo.readthedocs.io/en/stable/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@
"actor": "urn:li:corpuser:datahub"
}
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
},
{
"fieldPath": "email",
Expand All @@ -296,7 +297,8 @@
"actor": "urn:li:corpuser:datahub"
}
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
}
]
}
Expand Down Expand Up @@ -328,6 +330,29 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": [
{
"tag": "urn:li:tag:priority:high"
},
{
"tag": "urn:li:tag:purchase"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
Expand Down Expand Up @@ -463,7 +488,8 @@
}
]
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
},
{
"fieldPath": "email",
Expand All @@ -479,7 +505,8 @@
"globalTags": {
"tags": []
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
}
]
}
Expand Down Expand Up @@ -620,7 +647,8 @@
}
]
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
},
{
"fieldPath": "email",
Expand All @@ -636,7 +664,8 @@
"globalTags": {
"tags": []
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
}
]
}
Expand Down Expand Up @@ -1021,5 +1050,37 @@
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:priority:high",
"changeType": "UPSERT",
"aspectName": "tagKey",
"aspect": {
"json": {
"name": "priority:high"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:purchase",
"changeType": "UPSERT",
"aspectName": "tagKey",
"aspect": {
"json": {
"name": "purchase"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
}
]
Loading

0 comments on commit ee7a6cf

Please sign in to comment.