feat(ingestion): support multiple project IDs in bigquery usage stats
hsheth2 authored Jul 21, 2021
1 parent 5f0b446 commit ad30f2b
Showing 4 changed files with 56 additions and 20 deletions.
6 changes: 4 additions & 2 deletions metadata-ingestion/README.md
@@ -798,7 +798,7 @@
 - Fetch a list of tables and columns accessed
 - Aggregate these statistics into buckets, by day or hour granularity
 
-Note: the client must have one of the following OAuth scopes:
+Note: the client must have one of the following OAuth scopes, and should be authorized on all projects you'd like to ingest usage stats from.
 
 - https://www.googleapis.com/auth/logging.read
 - https://www.googleapis.com/auth/logging.admin
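
As an aside on wiring those scopes up: a minimal sketch of building a per-project logging client from a scoped service-account key (the key path and scope choice are illustrative assumptions, not part of this commit; GCPLoggingClient is the alias this module uses for the google-cloud-logging Client):

    from google.cloud.logging_v2.client import Client as GCPLoggingClient
    from google.oauth2 import service_account

    # Illustrative: a service-account key granted one of the scopes above
    # on every project you want usage stats from.
    credentials = service_account.Credentials.from_service_account_file(
        "bq-usage-key.json",  # hypothetical path
        scopes=["https://www.googleapis.com/auth/logging.read"],
    )
    # One client per project, mirroring what the source does internally.
    client = GCPLoggingClient(
        project="project_id_1", credentials=credentials, _use_grpc=False
    )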
@@ -809,7 +809,9 @@
 source:
   type: bigquery-usage
   config:
-    project_id: project # optional - can autodetect from environment
+    projects: # optional - can autodetect a single project from the environment
+      - project_id_1
+      - project_id_2
     options:
       # See https://googleapis.dev/python/logging/latest/client.html for details.
       credentials: ~ # optional - see docs
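The new recipe shape can be sanity-checked by parsing it directly; a minimal sketch (the project IDs are placeholders, and bucket_duration comes from the shared BaseUsageConfig rather than this commit):

    from datahub.ingestion.source.bigquery_usage import BigQueryUsageConfig

    config = BigQueryUsageConfig.parse_obj(
        {
            "projects": ["project_id_1", "project_id_2"],
            "bucket_duration": "HOUR",  # aggregate usage into hourly buckets
        }
    )
    assert config.projects == ["project_id_1", "project_id_2"]
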
2 changes: 2 additions & 0 deletions metadata-ingestion/setup.cfg
@@ -60,6 +60,8 @@ exclude_lines =
     pragma: no cover
     @abstract
     if TYPE_CHECKING:
+include =
+    src/*
 omit =
     # omit codegen
     src/datahub/metadata/*
55 changes: 41 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
@@ -1,5 +1,6 @@
 import collections
 import dataclasses
+import heapq
 import logging
 import re
 from dataclasses import dataclass
@@ -214,12 +215,21 @@ def from_entry(cls, entry: AuditLogEntry) -> "QueryEvent":


 class BigQueryUsageConfig(BaseUsageConfig):
-    project_id: Optional[str] = None
+    projects: Optional[List[str]] = None
+    project_id: Optional[str] = None  # deprecated in favor of `projects`
     extra_client_options: dict = {}
     env: str = builder.DEFAULT_ENV
 
     query_log_delay: Optional[pydantic.PositiveInt] = None
 
+    @pydantic.validator("project_id")
+    def note_project_id_deprecation(cls, v, values, **kwargs):
+        logger.warning(
+            "bigquery-usage project_id option is deprecated; use projects instead"
+        )
+        values["projects"] = [v]
+        return None
+
 
 @dataclass
 class BigQueryUsageSourceReport(SourceReport):
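
The validator above is the whole deprecation shim: when a recipe still supplies project_id, pydantic routes the value through note_project_id_deprecation, which logs a warning, rewrites values["projects"] to a one-element list, and returns None so the legacy field ends up unset. A sketch of the resulting behavior, consistent with the updated unit test below:

    from datahub.ingestion.source.bigquery_usage import BigQueryUsageConfig

    config = BigQueryUsageConfig.parse_obj({"project_id": "legacy-project"})
    # logs: "bigquery-usage project_id option is deprecated; use projects instead"
    assert config.project_id is None
    assert config.projects == ["legacy-project"]
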
@@ -233,28 +243,19 @@ class BigQueryUsageSource(Source):
     config: BigQueryUsageConfig
     report: BigQueryUsageSourceReport
 
-    client: GCPLoggingClient
-
     def __init__(self, config: BigQueryUsageConfig, ctx: PipelineContext):
         super().__init__(ctx)
         self.config = config
         self.report = BigQueryUsageSourceReport()
 
-        client_options = self.config.extra_client_options.copy()
-        if self.config.project_id is not None:
-            client_options["project"] = self.config.project_id
-
-        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
-        # why we disable gRPC here.
-        self.client = GCPLoggingClient(**client_options, _use_grpc=False)
-
     @classmethod
     def create(cls, config_dict: dict, ctx: PipelineContext) -> "BigQueryUsageSource":
         config = BigQueryUsageConfig.parse_obj(config_dict)
         return cls(config, ctx)
 
     def get_workunits(self) -> Iterable[UsageStatsWorkUnit]:
-        bigquery_log_entries = self._get_bigquery_log_entries()
+        clients = self._make_bigquery_clients()
+        bigquery_log_entries = self._get_bigquery_log_entries(clients)
         parsed_events = self._parse_bigquery_log_entries(bigquery_log_entries)
         hydrated_read_events = self._join_events_by_job_id(parsed_events)
         aggregated_info = self._aggregate_enriched_read_events(hydrated_read_events)
@@ -265,15 +266,41 @@ def get_workunits(self) -> Iterable[UsageStatsWorkUnit]:
             self.report.report_workunit(wu)
             yield wu
 
-    def _get_bigquery_log_entries(self) -> Iterable[AuditLogEntry]:
+    def _make_bigquery_clients(self) -> List[GCPLoggingClient]:
+        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
+        # why we disable gRPC here.
+        client_options = self.config.extra_client_options.copy()
+        client_options["_use_grpc"] = False
+        if self.config.projects is None:
+            return [
+                GCPLoggingClient(**client_options),
+            ]
+        else:
+            return [
+                GCPLoggingClient(**client_options, project=project_id)
+                for project_id in self.config.projects
+            ]
+
+    def _get_bigquery_log_entries(
+        self, clients: List[GCPLoggingClient]
+    ) -> Iterable[AuditLogEntry]:
         filter = BQ_FILTER_RULE_TEMPLATE.format(
             start_time=self.config.start_time.strftime(BQ_DATETIME_FORMAT),
             end_time=self.config.end_time.strftime(BQ_DATETIME_FORMAT),
         )
 
+        def get_entry_timestamp(entry: AuditLogEntry) -> datetime:
+            return entry.timestamp
+
         entry: AuditLogEntry
         for i, entry in enumerate(
-            self.client.list_entries(filter_=filter, page_size=GCP_LOGGING_PAGE_SIZE)
+            heapq.merge(
+                *(
+                    client.list_entries(filter_=filter, page_size=GCP_LOGGING_PAGE_SIZE)
+                    for client in clients
+                ),
+                key=get_entry_timestamp,
+            )
         ):
             if i == 0:
                 logger.debug("starting log load from BigQuery")
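The heart of the change is a fan-out/fan-in: one GCPLoggingClient per configured project, then heapq.merge to interleave the per-project log streams into a single timestamp-ordered iterator. heapq.merge only guarantees global order because each input stream is itself sorted, which Cloud Logging's ascending-by-timestamp default appears to provide. A self-contained sketch of the same pattern, with fake entry streams standing in for the API:

    import heapq
    from datetime import datetime, timedelta
    from typing import Iterable, List, NamedTuple


    class FakeEntry(NamedTuple):
        project: str
        timestamp: datetime


    def fake_stream(project: str, start: datetime, n: int) -> Iterable[FakeEntry]:
        # Stand-in for client.list_entries(...): yields entries sorted by timestamp.
        for i in range(n):
            yield FakeEntry(project, start + timedelta(minutes=10 * i))


    start = datetime(2021, 7, 21)
    streams: List[Iterable[FakeEntry]] = [
        fake_stream("project_id_1", start, 3),
        fake_stream("project_id_2", start + timedelta(minutes=5), 3),
    ]

    # Same shape as _get_bigquery_log_entries: merge sorted per-project
    # iterators on their timestamps.
    for entry in heapq.merge(*streams, key=lambda e: e.timestamp):
        print(entry.project, entry.timestamp.isoformat())
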
13 changes: 9 additions & 4 deletions
@@ -15,14 +15,15 @@
 WRITE_REFERENCE_FILE = False
 
 
-def test_config_time_defaults():
+def test_bq_usage_config():
     config = BigQueryUsageConfig.parse_obj(
         dict(
             project_id="sample-bigquery-project-name-1234",
             bucket_duration="HOUR",
         )
     )
     assert (config.end_time - config.start_time) == timedelta(hours=1)
+    assert config.projects == ["sample-bigquery-project-name-1234"]
 
 
 def test_bq_usage_source(pytestconfig, tmp_path):
@@ -36,12 +37,16 @@ def test_bq_usage_source(pytestconfig, tmp_path):
     if WRITE_REFERENCE_FILE:
         source = BigQueryUsageSource.create(
             dict(
-                project_id="harshal-playground-306419",
+                projects=[
+                    "harshal-playground-306419",
+                ],
                 start_time=datetime.now(tz=timezone.utc) - timedelta(days=25),
             ),
             PipelineContext(run_id="bq-usage-test"),
         )
-        entries = list(source._get_bigquery_log_entries())
+        entries = list(
+            source._get_bigquery_log_entries(source._make_bigquery_clients())
+        )
 
         entries = [entry._replace(logger=None) for entry in entries]
         log_entries = jsonpickle.encode(entries, indent=4)
@@ -62,7 +67,7 @@
"run_id": "test-bigquery-usage",
"source": {
"type": "bigquery-usage",
"config": {"project_id": "sample-bigquery-project-1234"},
"config": {"projects": ["sample-bigquery-project-1234"]},
},
"sink": {
"type": "file",
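For completeness, the golden-file pipeline config shown in this test can also be run programmatically; a hedged sketch using DataHub's Pipeline runner (assuming the standard Pipeline.create(...).run() entry point; the sink filename is a placeholder):

    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(
        {
            "run_id": "test-bigquery-usage",
            "source": {
                "type": "bigquery-usage",
                "config": {"projects": ["sample-bigquery-project-1234"]},
            },
            "sink": {
                "type": "file",
                "config": {"filename": "bigquery_usages_golden.json"},  # placeholder
            },
        }
    )
    pipeline.run()
    pipeline.raise_from_status()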
