Skip to content

Commit

Permalink
feat(ingestion/bigquery): Add ability to filter GCP project ingestion…
Browse files Browse the repository at this point in the history
… based on project labels (#11169)

Co-authored-by: Alice Naghshineh <alice.naghshineh@nytimes.com>
Co-authored-by: Alice Naghshineh <45885699+anaghshineh@users.noreply.github.com>
Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com>
  • Loading branch information
5 people committed Aug 20, 2024
1 parent bc79aec commit 627c5ab
Show file tree
Hide file tree
Showing 11 changed files with 786 additions and 63 deletions.
4 changes: 3 additions & 1 deletion docs/quick-ingestion-guides/bigquery/setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ Please refer to the BigQuery [Permissions](https://cloud.google.com/iam/docs/per
You can always add/remove roles to Service Accounts later on. Please refer to the BigQuery [Manage access to projects, folders, and organizations](https://cloud.google.com/iam/docs/granting-changing-revoking-access) guide for more details.
:::

3. Create and download a [Service Account Key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys). We will use this to set up authentication within DataHub.
3. To filter projects based on the `project_labels` configuration, first visit [cloudresourcemanager.googleapis.com](https://console.developers.google.com/apis/api/cloudresourcemanager.googleapis.com/overview) and enable the `Cloud Resource Manager API`

4. Create and download a [Service Account Key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys). We will use this to set up authentication within DataHub.

The key file looks like this:

Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
"google-cloud-logging<=3.5.0",
"google-cloud-bigquery",
"google-cloud-datacatalog>=1.5.0",
"google-cloud-resource-manager",
"more-itertools>=8.12.0",
"sqlalchemy-bigquery>=1.4.1",
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf,
self.config.get_bigquery_client(),
report=BigQueryV2Report().schema_api_perf,
projects_client=config.get_projects_client(),
client=config.get_bigquery_client(),
)
if self.config.extract_policy_tags_from_catalog:
self.bigquery_data_dictionary.datacatalog_client = (
Expand Down Expand Up @@ -257,14 +258,37 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

def _get_projects(self) -> List[BigqueryProject]:
logger.info("Getting projects")

if self.config.project_ids or self.config.project_id:
project_ids = self.config.project_ids or [self.config.project_id] # type: ignore
return [
BigqueryProject(id=project_id, name=project_id)
for project_id in project_ids
]
else:
return list(self._query_project_list())

if self.config.project_labels:
return list(self._query_project_list_from_labels())

return list(self._query_project_list())

def _query_project_list_from_labels(self) -> Iterable[BigqueryProject]:
projects = self.bigquery_data_dictionary.get_projects_with_labels(
self.config.project_labels
)

if not projects: # Report failure on exception and if empty list is returned
self.report.report_failure(
"metadata-extraction",
"Get projects didn't return any project with any of the specified label(s). "
"Maybe resourcemanager.projects.list permission is missing for the service account. "
"You can assign predefined roles/bigquery.metadataViewer role to your service account.",
)

for project in projects:
if self.config.project_id_pattern.allowed(project.id):
yield project
else:
self.report.report_dropped(project.id)

def _query_project_list(self) -> Iterable[BigqueryProject]:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union

from google.cloud import bigquery, datacatalog_v1
from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator

Expand Down Expand Up @@ -34,12 +34,16 @@ class BigQueryUsageConfig(BaseUsageConfig):

max_query_duration: timedelta = Field(
default=timedelta(minutes=15),
description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
description="Correction to pad start_time and end_time with. For handling the case where the read happens "
"within our time range but the query completion event is delayed and happens after the configured"
" end time.",
)

apply_view_usage_to_tables: bool = Field(
default=False,
description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies usage to views / tables mentioned in the query. If set to True, usage is applied to base tables only.",
description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies "
"usage to views / tables mentioned in the query. If set to True, usage is applied to base tables "
"only.",
)


Expand Down Expand Up @@ -74,6 +78,9 @@ def get_bigquery_client(self) -> bigquery.Client:
client_options = self.extra_client_options
return bigquery.Client(self.project_on_behalf, **client_options)

def get_projects_client(self) -> resourcemanager_v3.ProjectsClient:
return resourcemanager_v3.ProjectsClient()

def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
return datacatalog_v1.PolicyTagManagerClient()

Expand Down Expand Up @@ -143,12 +150,14 @@ class BigQueryV2Config(

dataset_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'",
description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. "
"e.g. to match all tables in schema analytics, use the regex 'analytics'",
)

match_fully_qualified_names: bool = Field(
default=True,
description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name `<project_id>.<dataset_name>`.",
description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name "
"`<project_id>.<dataset_name>`.",
)

include_external_url: bool = Field(
Expand All @@ -169,7 +178,9 @@ class BigQueryV2Config(

table_snapshot_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire "
"snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with "
"customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)

debug_include_full_payloads: bool = Field(
Expand All @@ -180,17 +191,22 @@ class BigQueryV2Config(
number_of_datasets_process_in_batch: int = Field(
hidden_from_docs=True,
default=10000,
description="Number of table queried in batch when getting metadata. This is a low level config property which should be touched with care.",
description="Number of table queried in batch when getting metadata. This is a low level config property "
"which should be touched with care.",
)

number_of_datasets_process_in_batch_if_profiling_enabled: int = Field(
default=1000,
description="Number of partitioned table queried in batch when getting metadata. This is a low level config property which should be touched with care. This restriction is needed because we query partitions system view which throws error if we try to touch too many tables.",
description="Number of partitioned table queried in batch when getting metadata. This is a low level config "
"property which should be touched with care. This restriction is needed because we query "
"partitions system view which throws error if we try to touch too many tables.",
)

use_tables_list_query_v2: bool = Field(
default=False,
description="List tables using an improved query that extracts partitions and last modified timestamps more accurately. Requires the ability to read table data. Automatically enabled when profiling is enabled.",
description="List tables using an improved query that extracts partitions and last modified timestamps more "
"accurately. Requires the ability to read table data. Automatically enabled when profiling is "
"enabled.",
)

@property
Expand All @@ -199,7 +215,9 @@ def have_table_data_read_permission(self) -> bool:

column_limit: int = Field(
default=300,
description="Maximum number of columns to process in a table. This is a low level config property which should be touched with care. This restriction is needed because excessively wide tables can result in failure to ingest the schema.",
description="Maximum number of columns to process in a table. This is a low level config property which "
"should be touched with care. This restriction is needed because excessively wide tables can "
"result in failure to ingest the schema.",
)
# The inheritance hierarchy is wonky here, but these options need modifications.
project_id: Optional[str] = Field(
Expand All @@ -214,6 +232,15 @@ def have_table_data_read_permission(self) -> bool:
"Overrides `project_id_pattern`."
),
)
project_labels: List[str] = Field(
default_factory=list,
description=(
"Ingests projects with the specified labels. Set value in the format of `key:value`. Use this property to "
"define which projects to ingest based"
"on project-level labels. If project_ids or project_id is set, this configuration has no effect. The "
"ingestion process filters projects by label first, and then applies the project_id_pattern."
),
)

storage_project_id: None = Field(default=None, hidden_from_docs=True)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class BigQuerySchemaApiPerfReport(Report):
num_get_snapshots_for_dataset_api_requests: int = 0

list_projects: PerfTimer = field(default_factory=PerfTimer)
list_projects_with_labels: PerfTimer = field(default_factory=PerfTimer)
list_datasets: PerfTimer = field(default_factory=PerfTimer)

get_columns_for_dataset_sec: float = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, Dict, Iterable, Iterator, List, Optional

from google.api_core import retry
from google.cloud import bigquery, datacatalog_v1
from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
from google.cloud.bigquery.table import (
RowIterator,
TableListItem,
Expand Down Expand Up @@ -144,9 +144,11 @@ def __init__(
self,
report: BigQuerySchemaApiPerfReport,
client: bigquery.Client,
projects_client: resourcemanager_v3.ProjectsClient,
datacatalog_client: Optional[datacatalog_v1.PolicyTagManagerClient] = None,
) -> None:
self.bq_client = client
self.projects_client = projects_client
self.report = report
self.datacatalog_client = datacatalog_client

Expand Down Expand Up @@ -175,7 +177,7 @@ def _should_retry(exc: BaseException) -> bool:
# 'Quota exceeded: Your user exceeded quota for concurrent project.lists requests.'
# Hence, added the api request retry of 15 min.
# We already tried adding rate_limit externally, proving max_result and page_size
# to restrict the request calls inside list_project but issue still occured.
# to restrict the request calls inside list_project but issue still occurred.
projects_iterator = self.bq_client.list_projects(
max_results=max_results_per_page,
page_token=page_token,
Expand All @@ -202,6 +204,26 @@ def _should_retry(exc: BaseException) -> bool:
return []
return projects

def get_projects_with_labels(self, labels: List[str]) -> List[BigqueryProject]:
with self.report.list_projects_with_labels:
try:
projects = []
labels_query = " OR ".join([f"labels.{label}" for label in labels])
for project in self.projects_client.search_projects(query=labels_query):
projects.append(
BigqueryProject(
id=project.project_id, name=project.display_name
)
)

return projects

except Exception as e:
logger.error(
f"Error getting projects with labels: {labels}. {e}", exc_info=True
)
return []

def get_datasets_for_project_id(
self, project_id: str, maxResults: Optional[int] = None
) -> List[BigqueryDataset]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ def metadata_read_capability_test(
client: bigquery.Client = config.get_bigquery_client()
assert client
bigquery_data_dictionary = BigQuerySchemaApi(
BigQueryV2Report().schema_api_perf, client
report=BigQueryV2Report().schema_api_perf,
projects_client=config.get_projects_client(),
client=client,
)
result = bigquery_data_dictionary.get_datasets_for_project_id(
project_id, 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,9 @@ def lineage_via_catalog_lineage_api(
lineage_client: lineage_v1.LineageClient = lineage_v1.LineageClient()

data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf, self.config.get_bigquery_client()
self.report.schema_api_perf,
self.config.get_bigquery_client(),
self.config.get_projects_client(),
)

# Filtering datasets
Expand Down
Loading

0 comments on commit 627c5ab

Please sign in to comment.