fix(ingest/bigquery): Map BigQuery policy tags to datahub column-leve…
sagar-salvi-apptware authored Jun 14, 2024
1 parent bb44c4c commit d699660
Showing 8 changed files with 153 additions and 19 deletions.
28 changes: 15 additions & 13 deletions metadata-ingestion/docs/sources/bigquery/bigquery_pre.md
@@ -28,19 +28,21 @@ There are two important concepts to understand and identify:
If you have multiple projects in your BigQuery setup, the role should be granted these permissions in each of the projects.

:::
| Permission | Description | Capability | Default GCP Role Which Contains This Permission |
|----------------------------------|-----------------------------------------------------------------------------------------------------------------|-------------------------------------|---------------------------------------------------------------------------|
| `bigquery.datasets.get` | Retrieve metadata about a dataset. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.list` | List BigQuery tables. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.get` | Retrieve metadata for a table. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.routines.get` | Get routines. Needed to retrieve metadata for a table from system tables. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.routines.list` | List routines. Needed to retrieve metadata for a table from system tables. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `resourcemanager.projects.get` | Retrieve project names and metadata. | Table Metadata Extraction | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.jobs.listAll` | List all jobs (queries) submitted by any user. Needed for lineage extraction. | Lineage Extraction/Usage Extraction | [roles/bigquery.resourceViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.resourceViewer) |
| `logging.logEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage Extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
| `logging.privateLogEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage Extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
| `bigquery.tables.getData` | Access table data to extract storage size, last-updated time, data profiles, etc. | Profiling | |
| `datacatalog.policyTags.get` | *Optional.* Get policy tags for columns that have associated policy tags. Required only if `extract_policy_tags_from_catalog` is enabled; a short verification sketch follows the table. | Policy Tag Extraction | [roles/datacatalog.viewer](https://cloud.google.com/data-catalog/docs/access-control#permissions-and-roles) |


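To sanity-check the optional `datacatalog.policyTags.get` permission, you can resolve a policy tag's display name directly with the same Data Catalog client the connector uses. This is a minimal sketch, assuming Application Default Credentials are configured and that `POLICY_TAG_NAME` is a placeholder for a real policy tag resource name from your own taxonomy:

```python
from google.cloud import datacatalog_v1

# Placeholder resource name; substitute one from your own taxonomy.
POLICY_TAG_NAME = "projects/my-project/locations/us/taxonomies/123/policyTags/456"

client = datacatalog_v1.PolicyTagManagerClient()  # uses Application Default Credentials
policy_tag = client.get_policy_tag(name=POLICY_TAG_NAME)
print(policy_tag.display_name)  # the value DataHub maps to a column-level tag
```

If this call succeeds, the service account has the read access that `roles/datacatalog.viewer` grants.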
#### Create a service account in the Extractor Project

@@ -16,6 +16,7 @@ source:
    #include_tables: true
    #include_views: true
    #include_table_lineage: true
    #extract_policy_tags_from_catalog: true
    #start_time: 2021-12-15T20:08:23.091Z
    #end_time: 2023-12-15T20:08:23.091Z
    #profiling:
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -168,6 +168,7 @@
    # Google cloud logging library
    "google-cloud-logging<=3.5.0",
    "google-cloud-bigquery",
    "google-cloud-datacatalog>=1.5.0",
    "more-itertools>=8.12.0",
    "sqlalchemy-bigquery>=1.4.1",
}
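The new `google-cloud-datacatalog` pin is what provides `datacatalog_v1` in the source changes below. A quick way to confirm the dependency is present in your environment, assuming the BigQuery plugin was installed (e.g. via `pip install 'acryl-datahub[bigquery]'`):

```python
# Import check for the new dependency; constructing the client later requires GCP credentials.
from google.cloud import datacatalog_v1

print(datacatalog_v1.PolicyTagManagerClient)  # available once google-cloud-datacatalog>=1.5.0 is installed
```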
@@ -130,6 +130,7 @@
)
from datahub.utilities.mapping import Constants
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.ratelimiter import RateLimiter
from datahub.utilities.registries.domain_registry import DomainRegistry

logger: logging.Logger = logging.getLogger(__name__)
@@ -236,8 +237,14 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
        BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

        self.bigquery_data_dictionary = BigQuerySchemaApi(
            self.report.schema_api_perf,
            self.config.get_bigquery_client(),
        )
        if self.config.extract_policy_tags_from_catalog:
            self.bigquery_data_dictionary.datacatalog_client = (
                self.config.get_policy_tag_manager_client()
            )

        self.sql_parser_schema_resolver = self._init_schema_resolver()

        self.data_reader: Optional[BigQueryDataReader] = None
@@ -742,6 +749,12 @@ def _process_schema(

        columns = None

        rate_limiter: Optional[RateLimiter] = None
        if self.config.rate_limit:
            rate_limiter = RateLimiter(
                max_calls=self.config.requests_per_min, period=60
            )

        if (
            self.config.include_tables
            or self.config.include_views
@@ -752,6 +765,9 @@
                dataset_name=dataset_name,
                column_limit=self.config.column_limit,
                run_optimized_column_query=self.config.run_optimized_column_query,
                extract_policy_tags_from_catalog=self.config.extract_policy_tags_from_catalog,
                report=self.report,
                rate_limiter=rate_limiter,
            )

        if self.config.include_tables:
@@ -1275,6 +1291,9 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
                    )
                )

            if col.policy_tags:
                for policy_tag in col.policy_tags:
                    tags.append(TagAssociationClass(make_tag_urn(policy_tag)))
            field = SchemaField(
                fieldPath=col.name,
                type=SchemaFieldDataType(
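The loop above is where a resolved policy tag display name turns into a DataHub column-level tag. A standalone sketch of that mapping, using the same helpers (the tag name is illustrative):

```python
from datahub.emitter.mce_builder import make_tag_urn
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

# Display names as returned by the Data Catalog lookup; "Test Policy Tag" is illustrative.
policy_tags = ["Test Policy Tag"]

# Each display name becomes a tag association attached to the column's schema field.
tag_associations = [TagAssociationClass(make_tag_urn(tag)) for tag in policy_tags]
global_tags = GlobalTagsClass(tags=tag_associations)

print(global_tags.tags[0].tag)  # urn:li:tag:Test Policy Tag
```

This is the same `urn:li:tag:Test Policy Tag` association that appears in the golden file further down.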
@@ -3,7 +3,7 @@
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union

from google.cloud import bigquery, datacatalog_v1
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator

@@ -70,6 +70,9 @@ def get_bigquery_client(self) -> bigquery.Client:
        client_options = self.extra_client_options
        return bigquery.Client(self.project_on_behalf, **client_options)

    def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
        return datacatalog_v1.PolicyTagManagerClient()

    def make_gcp_logging_client(
        self, project_id: Optional[str] = None
    ) -> GCPLoggingClient:
@@ -226,6 +229,16 @@ class BigQueryV2Config(
description="Use the legacy sharded table urn suffix added.",
)

extract_policy_tags_from_catalog: bool = Field(
default=False,
description=(
"This flag enables the extraction of policy tags from the Google Data Catalog API. "
"When enabled, the extractor will fetch policy tags associated with BigQuery table columns. "
"For more information about policy tags and column-level security, refer to the documentation: "
"https://cloud.google.com/bigquery/docs/column-level-security-intro"
),
)

scheme: str = "bigquery"

log_page_size: PositiveInt = Field(
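A minimal sketch of how the new flag and client factory might be wired together from a recipe-style config dict. The project id is a placeholder, and the module path is assumed to be the bigquery_v2 config module:

```python
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

config = BigQueryV2Config.parse_obj(
    {
        "project_ids": ["my-project"],  # placeholder project
        "extract_policy_tags_from_catalog": True,
        "rate_limit": True,  # throttle Data Catalog lookups
        "requests_per_min": 60,
    }
)

if config.extract_policy_tags_from_catalog:
    # Creating the client requires GCP Application Default Credentials.
    datacatalog_client = config.get_policy_tag_manager_client()
```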
@@ -2,9 +2,9 @@
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Iterator, List, Optional

from google.cloud import bigquery, datacatalog_v1
from google.cloud.bigquery.table import (
    RowIterator,
    TableListItem,
@@ -22,6 +22,7 @@
    BigqueryTableType,
)
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
from datahub.utilities.ratelimiter import RateLimiter

logger: logging.Logger = logging.getLogger(__name__)

@@ -31,6 +32,7 @@ class BigqueryColumn(BaseColumn):
    field_path: str
    is_partition_column: bool
    cluster_column_position: Optional[int]
    policy_tags: Optional[List[str]] = None


RANGE_PARTITION_NAME: str = "RANGE"
@@ -137,10 +139,14 @@ class BigqueryProject:

class BigQuerySchemaApi:
    def __init__(
        self,
        report: BigQuerySchemaApiPerfReport,
        client: bigquery.Client,
        datacatalog_client: Optional[datacatalog_v1.PolicyTagManagerClient] = None,
    ) -> None:
        self.bq_client = client
        self.report = report
        self.datacatalog_client = datacatalog_client

    def get_query_result(self, query: str) -> RowIterator:
        logger.debug(f"Query : {query}")
@@ -347,12 +353,69 @@ def _make_bigquery_view(view: bigquery.Row) -> BigqueryView:
            rows_count=view.get("row_count"),
        )

    def get_policy_tags_for_column(
        self,
        project_id: str,
        dataset_name: str,
        table_name: str,
        column_name: str,
        report: BigQueryV2Report,
        rate_limiter: Optional[RateLimiter] = None,
    ) -> Iterable[str]:
        assert self.datacatalog_client

        try:
            # Get the table schema
            table_ref = f"{project_id}.{dataset_name}.{table_name}"
            table = self.bq_client.get_table(table_ref)
            schema = table.schema

            # Find the specific field in the schema
            field = next((f for f in schema if f.name == column_name), None)
            if not field or not field.policy_tags:
                return

            # Retrieve policy tag display names
            for policy_tag_name in field.policy_tags.names:
                try:
                    if rate_limiter:
                        with rate_limiter:
                            policy_tag = self.datacatalog_client.get_policy_tag(
                                name=policy_tag_name
                            )
                    else:
                        policy_tag = self.datacatalog_client.get_policy_tag(
                            name=policy_tag_name
                        )
                    yield policy_tag.display_name
                except Exception as e:
                    logger.warning(
                        f"Unexpected error when retrieving policy tag {policy_tag_name} for column {column_name} in table {table_name}: {e}",
                        exc_info=True,
                    )
                    report.report_warning(
                        "metadata-extraction",
                        f"Failed to retrieve policy tag {policy_tag_name} for column {column_name} in table {table_name} due to unexpected error: {e}",
                    )
        except Exception as e:
            logger.error(
                f"Unexpected error retrieving schema for table {table_name} in dataset {dataset_name}, project {project_id}: {e}",
                exc_info=True,
            )
            report.report_warning(
                "metadata-extraction",
                f"Failed to retrieve schema for table {table_name} in dataset {dataset_name}, project {project_id} due to unexpected error: {e}",
            )

    def get_columns_for_dataset(
        self,
        project_id: str,
        dataset_name: str,
        column_limit: int,
        report: BigQueryV2Report,
        run_optimized_column_query: bool = False,
        extract_policy_tags_from_catalog: bool = False,
        rate_limiter: Optional[RateLimiter] = None,
    ) -> Optional[Dict[str, List[BigqueryColumn]]]:
        columns: Dict[str, List[BigqueryColumn]] = defaultdict(list)
        with self.report.get_columns_for_dataset:
@@ -397,6 +460,18 @@ def get_columns_for_dataset(
                            comment=column.comment,
                            is_partition_column=column.is_partitioning_column == "YES",
                            cluster_column_position=column.clustering_ordinal_position,
                            policy_tags=list(
                                self.get_policy_tags_for_column(
                                    project_id,
                                    dataset_name,
                                    column.table_name,
                                    column.column_name,
                                    report,
                                    rate_limiter,
                                )
                            )
                            if extract_policy_tags_from_catalog
                            else [],
                        )
                    )

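For context, the throttling used by `get_policy_tags_for_column` is just the shared `RateLimiter` context manager. A small sketch of the same pattern, assuming an already-constructed `PolicyTagManagerClient`; the 100-calls-per-minute budget is illustrative:

```python
from typing import Optional

from google.cloud import datacatalog_v1

from datahub.utilities.ratelimiter import RateLimiter


def fetch_policy_tag_display_name(
    client: datacatalog_v1.PolicyTagManagerClient,
    policy_tag_name: str,
    rate_limiter: Optional[RateLimiter] = None,
) -> str:
    # Same pattern as above: wrap the remote call only when a limiter is configured.
    if rate_limiter:
        with rate_limiter:
            return client.get_policy_tag(name=policy_tag_name).display_name
    return client.get_policy_tag(name=policy_tag_name).display_name


# Illustrative budget: at most 100 Data Catalog lookups per 60-second window.
limiter = RateLimiter(max_calls=100, period=60)
```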
@@ -249,7 +249,11 @@
"nativeDataType": "INT",
"recursive": false,
"globalTags": {
"tags": []
"tags": [
{
"tag": "urn:li:tag:Test Policy Tag"
}
]
},
"glossaryTerms": {
"terms": [
@@ -428,5 +432,21 @@
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:Test Policy Tag",
"changeType": "UPSERT",
"aspectName": "tagKey",
"aspect": {
"json": {
"name": "Test Policy Tag"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
}
]