Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/bigquery): Add ability to filter GCP project ingestion based on project labels #11169

Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0c050be
feat: First pass at enabling getting projects based on folder ids
anaghshineh Apr 7, 2024
69cf8e0
chore: Add google-cloud-resource-manager dependency
anaghshineh Apr 8, 2024
df9698c
chore: Update some methods to account for new projects_client
anaghshineh Apr 8, 2024
6c77b1e
chore: Add a few tests, more to come
anaghshineh Apr 8, 2024
7b1efab
Merge branch 'master' into add-support-for-ingestion-based-on-folder-ids
anaghshineh Apr 8, 2024
7e3f829
chore: Add another unit test
anaghshineh Apr 8, 2024
463a18a
feat: Switch to using labels instead of folders, more flexible
anaghshineh Apr 8, 2024
bd1560d
chore: Remove line deletions
anaghshineh Apr 8, 2024
0aea92b
chore: Update failure report message to be accurate
anaghshineh Apr 8, 2024
82147f3
Update metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bi…
anaghshineh Apr 17, 2024
d00d4db
docs: Update inline docs for new BQ config var
anaghshineh Apr 17, 2024
110bb7e
Merge branch 'bq-filter-ingestion-based-on-proj-labels' of github.com…
anaghshineh Apr 17, 2024
b5a932b
resolve conflict
sid-acryl Aug 13, 2024
2f51f8b
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 13, 2024
f6af306
test case
sid-acryl Aug 14, 2024
6379304
Merge branch 'ing-690-bq-filter-ingestion-based-on-proj-labels' of gi…
sid-acryl Aug 14, 2024
24204f0
revert the base-requirements
sid-acryl Aug 14, 2024
68915a7
fix mock
sid-acryl Aug 14, 2024
8525355
test case
sid-acryl Aug 14, 2024
46a2447
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 14, 2024
3149ae7
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
david-leifker Aug 14, 2024
a4d235a
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 16, 2024
f049769
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 19, 2024
3584298
address review comments
sid-acryl Aug 19, 2024
f71f40f
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 19, 2024
56cafba
working with portal
sid-acryl Aug 20, 2024
624288c
Merge branch 'ing-690-bq-filter-ingestion-based-on-proj-labels' of gi…
sid-acryl Aug 20, 2024
38d3dfd
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 20, 2024
d38dd9e
update quick start
sid-acryl Aug 20, 2024
ffa2dd4
Merge branch 'ing-690-bq-filter-ingestion-based-on-proj-labels' of gi…
sid-acryl Aug 20, 2024
38c3496
lint fix
sid-acryl Aug 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/quick-ingestion-guides/bigquery/setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ Please refer to the BigQuery [Permissions](https://cloud.google.com/iam/docs/per
You can always add/remove roles to Service Accounts later on. Please refer to the BigQuery [Manage access to projects, folders, and organizations](https://cloud.google.com/iam/docs/granting-changing-revoking-access) guide for more details.
:::

3. Create and download a [Service Account Key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys). We will use this to set up authentication within DataHub.
3. To filter projects based on the `project_labels` configuration, first visit [cloudresourcemanager.googleapis.com](https://console.developers.google.com/apis/api/cloudresourcemanager.googleapis.com/overview) and enable the `Cloud Resource Manager API`

4. Create and download a [Service Account Key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys). We will use this to set up authentication within DataHub.

The key file looks like this:

Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
"google-cloud-logging<=3.5.0",
"google-cloud-bigquery",
"google-cloud-datacatalog>=1.5.0",
"google-cloud-resource-manager",
"more-itertools>=8.12.0",
"sqlalchemy-bigquery>=1.4.1",
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf,
self.config.get_bigquery_client(),
report=BigQueryV2Report().schema_api_perf,
projects_client=config.get_projects_client(),
client=config.get_bigquery_client(),
)
if self.config.extract_policy_tags_from_catalog:
self.bigquery_data_dictionary.datacatalog_client = (
Expand Down Expand Up @@ -257,14 +258,37 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

def _get_projects(self) -> List[BigqueryProject]:
logger.info("Getting projects")

if self.config.project_ids or self.config.project_id:
project_ids = self.config.project_ids or [self.config.project_id] # type: ignore
return [
BigqueryProject(id=project_id, name=project_id)
for project_id in project_ids
]
else:
return list(self._query_project_list())

if self.config.project_labels:
return list(self._query_project_list_from_labels())

return list(self._query_project_list())

def _query_project_list_from_labels(self) -> Iterable[BigqueryProject]:
projects = self.bigquery_data_dictionary.get_projects_with_labels(
self.config.project_labels
)

if not projects: # Report failure on exception and if empty list is returned
self.report.report_failure(
"metadata-extraction",
"Get projects didn't return any project with any of the specified label(s). "
"Maybe resourcemanager.projects.list permission is missing for the service account. "
"You can assign predefined roles/bigquery.metadataViewer role to your service account.",
)

for project in projects:
if self.config.project_id_pattern.allowed(project.id):
yield project
else:
self.report.report_dropped(project.id)

def _query_project_list(self) -> Iterable[BigqueryProject]:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union

from google.cloud import bigquery, datacatalog_v1
from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator

Expand Down Expand Up @@ -34,12 +34,16 @@ class BigQueryUsageConfig(BaseUsageConfig):

max_query_duration: timedelta = Field(
default=timedelta(minutes=15),
description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
description="Correction to pad start_time and end_time with. For handling the case where the read happens "
"within our time range but the query completion event is delayed and happens after the configured"
" end time.",
)

apply_view_usage_to_tables: bool = Field(
default=False,
description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies usage to views / tables mentioned in the query. If set to True, usage is applied to base tables only.",
description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies "
"usage to views / tables mentioned in the query. If set to True, usage is applied to base tables "
"only.",
)


Expand Down Expand Up @@ -74,6 +78,9 @@ def get_bigquery_client(self) -> bigquery.Client:
client_options = self.extra_client_options
return bigquery.Client(self.project_on_behalf, **client_options)

def get_projects_client(self) -> resourcemanager_v3.ProjectsClient:
return resourcemanager_v3.ProjectsClient()

def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
return datacatalog_v1.PolicyTagManagerClient()

Expand Down Expand Up @@ -143,12 +150,14 @@ class BigQueryV2Config(

dataset_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'",
description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. "
"e.g. to match all tables in schema analytics, use the regex 'analytics'",
)

match_fully_qualified_names: bool = Field(
default=True,
description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name `<project_id>.<dataset_name>`.",
description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name "
"`<project_id>.<dataset_name>`.",
)

include_external_url: bool = Field(
Expand All @@ -169,7 +178,9 @@ class BigQueryV2Config(

table_snapshot_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire "
"snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with "
"customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)

debug_include_full_payloads: bool = Field(
Expand All @@ -180,17 +191,22 @@ class BigQueryV2Config(
number_of_datasets_process_in_batch: int = Field(
hidden_from_docs=True,
default=10000,
description="Number of table queried in batch when getting metadata. This is a low level config property which should be touched with care.",
description="Number of table queried in batch when getting metadata. This is a low level config property "
"which should be touched with care.",
)

number_of_datasets_process_in_batch_if_profiling_enabled: int = Field(
default=1000,
description="Number of partitioned table queried in batch when getting metadata. This is a low level config property which should be touched with care. This restriction is needed because we query partitions system view which throws error if we try to touch too many tables.",
description="Number of partitioned table queried in batch when getting metadata. This is a low level config "
"property which should be touched with care. This restriction is needed because we query "
"partitions system view which throws error if we try to touch too many tables.",
)

use_tables_list_query_v2: bool = Field(
default=False,
description="List tables using an improved query that extracts partitions and last modified timestamps more accurately. Requires the ability to read table data. Automatically enabled when profiling is enabled.",
description="List tables using an improved query that extracts partitions and last modified timestamps more "
"accurately. Requires the ability to read table data. Automatically enabled when profiling is "
"enabled.",
)

@property
Expand All @@ -199,7 +215,9 @@ def have_table_data_read_permission(self) -> bool:

column_limit: int = Field(
default=300,
description="Maximum number of columns to process in a table. This is a low level config property which should be touched with care. This restriction is needed because excessively wide tables can result in failure to ingest the schema.",
description="Maximum number of columns to process in a table. This is a low level config property which "
"should be touched with care. This restriction is needed because excessively wide tables can "
"result in failure to ingest the schema.",
)
# The inheritance hierarchy is wonky here, but these options need modifications.
project_id: Optional[str] = Field(
Expand All @@ -214,6 +232,15 @@ def have_table_data_read_permission(self) -> bool:
"Overrides `project_id_pattern`."
),
)
project_labels: List[str] = Field(
default_factory=list,
description=(
"Ingests projects with the specified labels. Set value in the format of `key:value`. Use this property to "
"define which projects to ingest based"
"on project-level labels. If project_ids or project_id is set, this configuration has no effect. The "
"ingestion process filters projects by label first, and then applies the project_id_pattern."
),
)

storage_project_id: None = Field(default=None, hidden_from_docs=True)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class BigQuerySchemaApiPerfReport(Report):
num_get_snapshots_for_dataset_api_requests: int = 0

list_projects: PerfTimer = field(default_factory=PerfTimer)
list_projects_with_labels: PerfTimer = field(default_factory=PerfTimer)
list_datasets: PerfTimer = field(default_factory=PerfTimer)

get_columns_for_dataset_sec: float = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, Dict, Iterable, Iterator, List, Optional

from google.api_core import retry
from google.cloud import bigquery, datacatalog_v1
from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
from google.cloud.bigquery.table import (
RowIterator,
TableListItem,
Expand Down Expand Up @@ -144,9 +144,11 @@ def __init__(
self,
report: BigQuerySchemaApiPerfReport,
client: bigquery.Client,
projects_client: resourcemanager_v3.ProjectsClient,
datacatalog_client: Optional[datacatalog_v1.PolicyTagManagerClient] = None,
) -> None:
self.bq_client = client
self.projects_client = projects_client
self.report = report
self.datacatalog_client = datacatalog_client

Expand Down Expand Up @@ -175,7 +177,7 @@ def _should_retry(exc: BaseException) -> bool:
# 'Quota exceeded: Your user exceeded quota for concurrent project.lists requests.'
# Hence, added the api request retry of 15 min.
# We already tried adding rate_limit externally, proving max_result and page_size
# to restrict the request calls inside list_project but issue still occured.
# to restrict the request calls inside list_project but issue still occurred.
projects_iterator = self.bq_client.list_projects(
max_results=max_results_per_page,
page_token=page_token,
Expand All @@ -202,6 +204,26 @@ def _should_retry(exc: BaseException) -> bool:
return []
return projects

def get_projects_with_labels(self, labels: List[str]) -> List[BigqueryProject]:
with self.report.list_projects_with_labels:
try:
projects = []
labels_query = " OR ".join([f"labels.{label}" for label in labels])
for project in self.projects_client.search_projects(query=labels_query):
projects.append(
BigqueryProject(
id=project.project_id, name=project.display_name
)
)

return projects

except Exception as e:
logger.error(
f"Error getting projects with labels: {labels}. {e}", exc_info=True
)
return []

def get_datasets_for_project_id(
self, project_id: str, maxResults: Optional[int] = None
) -> List[BigqueryDataset]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ def metadata_read_capability_test(
client: bigquery.Client = config.get_bigquery_client()
assert client
bigquery_data_dictionary = BigQuerySchemaApi(
BigQueryV2Report().schema_api_perf, client
report=BigQueryV2Report().schema_api_perf,
projects_client=config.get_projects_client(),
client=client,
)
result = bigquery_data_dictionary.get_datasets_for_project_id(
project_id, 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,9 @@ def lineage_via_catalog_lineage_api(
lineage_client: lineage_v1.LineageClient = lineage_v1.LineageClient()

data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf, self.config.get_bigquery_client()
self.report.schema_api_perf,
self.config.get_bigquery_client(),
self.config.get_projects_client(),
)

# Filtering datasets
Expand Down
Loading
Loading