Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/bigquery): Add way to reference existing DataHub Tag from a bigquery label #11544

Merged
merged 6 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import re
from base64 import b32decode
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Type, Union, cast

Expand Down Expand Up @@ -89,12 +90,13 @@
HiveColumnToAvroConverter,
get_schema_fields_for_hive_column,
)
from datahub.utilities.mapping import Constants
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.ratelimiter import RateLimiter
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor

ENCODED_TAG_PREFIX = "urn_li_encoded_tag_"

logger: logging.Logger = logging.getLogger(__name__)
# Handle table snapshots
# See https://cloud.google.com/bigquery/docs/table-snapshots-intro.
Expand Down Expand Up @@ -194,6 +196,18 @@ def store_table_refs(self):
or self.config.use_queries_v2
)

def modified_base32decode(self, text_to_decode: str) -> str:
# When we sync from DataHub to BigQuery, we encode the tags as modified base32 strings.
# BiqQuery labels only support lowercase letters, international characters, numbers, or underscores.
# So we need to modify the base32 encoding to replace the padding character `=` with `_` and convert to lowercase.
if not text_to_decode.startswith("%s" % ENCODED_TAG_PREFIX):
return text_to_decode
text_to_decode = (
text_to_decode.replace(ENCODED_TAG_PREFIX, "").upper().replace("_", "=")
)
text = b32decode(text_to_decode.encode("utf-8")).decode("utf-8")
return text

def get_project_workunits(
self, project: BigqueryProject
) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -253,7 +267,7 @@ def gen_dataset_containers(
tags_joined: Optional[List[str]] = None
if tags and self.config.capture_dataset_label_as_tag:
tags_joined = [
f"{k}:{v}"
self.make_tag_from_label(k, v)
for k, v in tags.items()
if is_tag_allowed(self.config.capture_dataset_label_as_tag, k)
]
Expand Down Expand Up @@ -662,6 +676,11 @@ def _process_snapshot(
dataset_name=dataset_name,
)

def make_tag_from_label(self, key: str, value: str) -> str:
if not value.startswith(ENCODED_TAG_PREFIX):
return make_tag_urn(f"""{key}:{value}""")
return self.modified_base32decode(value)

def gen_table_dataset_workunits(
self,
table: BigqueryTable,
Expand Down Expand Up @@ -707,7 +726,7 @@ def gen_table_dataset_workunits(
tags_to_add = []
tags_to_add.extend(
[
make_tag_urn(f"""{k}:{v}""")
self.make_tag_from_label(k, v)
for k, v in table.labels.items()
if is_tag_allowed(self.config.capture_table_label_as_tag, k)
]
Expand All @@ -733,7 +752,7 @@ def gen_view_dataset_workunits(
tags_to_add = None
if table.labels and self.config.capture_view_label_as_tag:
tags_to_add = [
make_tag_urn(f"{k}:{v}")
self.make_tag_from_label(k, v)
for k, v in table.labels.items()
if is_tag_allowed(self.config.capture_view_label_as_tag, k)
]
Expand Down Expand Up @@ -922,11 +941,6 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
break
else:
tags = []
if col.is_partition_column:
tags.append(
TagAssociationClass(make_tag_urn(Constants.TAG_PARTITION_KEY))
)

if col.cluster_column_position is not None:
tags.append(
TagAssociationClass(
Expand All @@ -944,6 +958,7 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
type=SchemaFieldDataType(
self.BIGQUERY_FIELD_TYPE_MAPPINGS.get(col.data_type, NullType)()
),
isPartitioningKey=col.is_partition_column,
nativeDataType=col.data_type,
description=col.comment,
nullable=col.is_nullable,
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ def __init__(self, ctx: PipelineContext, config: MongoDBConfig):

# See https://pymongo.readthedocs.io/en/stable/examples/datetimes.html#handling-out-of-range-datetimes
self.mongo_client = MongoClient(
self.config.connect_uri, datetime_conversion="DATETIME_AUTO", **options
) # type: ignore
self.config.connect_uri, datetime_conversion="DATETIME_AUTO", **options # type: ignore
)

# This cheaply tests the connection. For details, see
# https://pymongo.readthedocs.io/en/stable/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@
"actor": "urn:li:corpuser:datahub"
}
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
},
{
"fieldPath": "email",
Expand All @@ -296,7 +297,8 @@
"actor": "urn:li:corpuser:datahub"
}
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
}
]
}
Expand Down Expand Up @@ -328,6 +330,29 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": [
{
"tag": "urn:li:tag:priority:high"
},
{
"tag": "urn:li:tag:purchase"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
Expand Down Expand Up @@ -463,7 +488,8 @@
}
]
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
},
{
"fieldPath": "email",
Expand All @@ -479,7 +505,8 @@
"globalTags": {
"tags": []
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
}
]
}
Expand Down Expand Up @@ -620,7 +647,8 @@
}
]
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
},
{
"fieldPath": "email",
Expand All @@ -636,7 +664,8 @@
"globalTags": {
"tags": []
},
"isPartOfKey": false
"isPartOfKey": false,
"isPartitioningKey": false
}
]
}
Expand Down Expand Up @@ -1021,5 +1050,37 @@
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:priority:high",
"changeType": "UPSERT",
"aspectName": "tagKey",
"aspect": {
"json": {
"name": "priority:high"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:purchase",
"changeType": "UPSERT",
"aspectName": "tagKey",
"aspect": {
"json": {
"name": "purchase"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
}
]
Loading
Loading