feat(ingestion): snowflake, bigquery - enhancements to log and bugfix #4442

Merged · 10 commits · Mar 21, 2022
@@ -44,15 +44,18 @@ def default_end_time(
cls, v: Any, *, values: Dict[str, Any], **kwargs: Any
) -> datetime:
return v or get_time_bucket(
datetime.now(tz=timezone.utc), values["bucket_duration"]
datetime.now(tz=timezone.utc)
+ get_bucket_duration_delta(values["bucket_duration"]),
values["bucket_duration"],
)

@pydantic.validator("start_time", pre=True, always=True)
def default_start_time(
cls, v: Any, *, values: Dict[str, Any], **kwargs: Any
) -> datetime:
return v or (
values["end_time"] - get_bucket_duration_delta(values["bucket_duration"])
values["end_time"]
- get_bucket_duration_delta(values["bucket_duration"]) * 2
)

@pydantic.validator("start_time", "end_time")
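The net effect of this change: the default `end_time` now rounds up to the start of the *next* bucket (so the in-progress bucket is covered), and the default `start_time` moves from one to two bucket durations before `end_time`. A minimal sketch with simplified stand-ins for `get_time_bucket` and `get_bucket_duration_delta` (assumption: the real helpers floor a timestamp to its bucket and map a bucket size to a `timedelta`):

```python
from datetime import datetime, timedelta, timezone


# Simplified stand-ins, not DataHub's implementations: assume DAY/HOUR buckets only.
def get_bucket_duration_delta(bucket: str) -> timedelta:
    return timedelta(days=1) if bucket == "DAY" else timedelta(hours=1)


def get_time_bucket(ts: datetime, bucket: str) -> datetime:
    # Floor the timestamp to the start of its bucket.
    if bucket == "DAY":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return ts.replace(minute=0, second=0, microsecond=0)


now = datetime(2022, 3, 15, 10, 30, tzinfo=timezone.utc)
bucket = "DAY"

# Old default end_time: floor(now) -> 2022-03-15 00:00, which excludes today's events.
old_end = get_time_bucket(now, bucket)

# New default end_time: floor(now + one bucket) -> 2022-03-16 00:00, covering the
# in-progress bucket as well.
new_end = get_time_bucket(now + get_bucket_duration_delta(bucket), bucket)

# New default start_time: two buckets before end_time (previously one).
new_start = new_end - get_bucket_duration_delta(bucket) * 2

print(old_end, new_end, new_start)
# 2022-03-15 00:00:00+00:00  2022-03-16 00:00:00+00:00  2022-03-14 00:00:00+00:00
```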
211 changes: 92 additions & 119 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -6,22 +6,18 @@
import re
import textwrap
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
from unittest.mock import patch

# This import verifies that the dependencies are available.
import pybigquery # noqa: F401
import pybigquery.sqlalchemy_bigquery
import pydantic
from dateutil import parser
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.reflection import Inspector

from datahub.configuration.common import ConfigurationError
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter import mce_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
@@ -34,7 +30,6 @@
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemyConfig,
SQLAlchemySource,
SQLSourceReport,
SqlWorkUnit,
make_sqlalchemy_type,
register_custom_type,
@@ -44,10 +39,11 @@
BQ_DATETIME_FORMAT,
AuditLogEntry,
BigQueryAuditMetadata,
BigQueryCredential,
BigQueryTableRef,
QueryEvent,
)
from datahub.ingestion.source_config.sql.bigquery import BigQueryConfig
from datahub.ingestion.source_report.sql.bigquery import BigQueryReport
from datahub.metadata.com.linkedin.pegasus2avro.metadata.key import DatasetKey
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@@ -230,60 +226,6 @@ class BigQueryPartitionColumn:
partition_id: str


class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig):
scheme: str = "bigquery"
project_id: Optional[str] = None
lineage_client_project_id: Optional[str] = None

log_page_size: Optional[pydantic.PositiveInt] = 1000
credential: Optional[BigQueryCredential]
# extra_client_options, include_table_lineage and max_query_duration are relevant only when computing the lineage.
extra_client_options: Dict[str, Any] = {}
include_table_lineage: Optional[bool] = True
max_query_duration: timedelta = timedelta(minutes=15)

credentials_path: Optional[str] = None
bigquery_audit_metadata_datasets: Optional[List[str]] = None
use_exported_bigquery_audit_metadata: bool = False
use_date_sharded_audit_log_tables: bool = False
_credentials_path: Optional[str] = pydantic.PrivateAttr(None)
use_v2_audit_metadata: Optional[bool] = False

def __init__(self, **data: Any):
super().__init__(**data)

if self.credential:
self._credentials_path = self.credential.create_credential_temp_file()
logger.debug(
f"Creating temporary credential file at {self._credentials_path}"
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path

def get_sql_alchemy_url(self):
if self.project_id:
return f"{self.scheme}://{self.project_id}"
# When project_id is not set, we will attempt to detect the project ID
# based on the credentials or environment variables.
# See https://github.com/mxmzdlv/pybigquery#authentication.
return f"{self.scheme}://"

@pydantic.validator("platform_instance")
def bigquery_doesnt_need_platform_instance(cls, v):
if v is not None:
raise ConfigurationError(
"BigQuery project ids are globally unique. You do not need to specify a platform instance."
)

@pydantic.validator("platform")
def platform_is_always_bigquery(cls, v):
return "bigquery"


@dataclass
class BigQueryReport(SQLSourceReport):
pass


class BigQuerySource(SQLAlchemySource):
config: BigQueryConfig
maximum_shard_ids: Dict[str, str] = dict()
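BigQueryConfig and BigQueryReport move out of this module into datahub.ingestion.source_config.sql.bigquery and datahub.ingestion.source_report.sql.bigquery (see the new imports above). The relocated report class itself is not part of this diff; judging from the fields the source now populates, it plausibly looks roughly like the sketch below (field names copied verbatim from the assignments in this file, including the "entires" spelling; types and defaults are assumptions):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

from datahub.ingestion.source.sql.sql_common import SQLSourceReport


@dataclass
class BigQueryReport(SQLSourceReport):
    # Counters incremented while scanning audit logs / exported audit metadata.
    num_total_log_entries: int = 0
    num_parsed_log_entires: int = 0      # spelling follows the source
    num_total_audit_entries: int = 0
    num_parsed_audit_entires: int = 0    # spelling follows the source
    lineage_metadata_entries: int = 0
    # Effective time windows used for the log filter and audit-metadata query.
    log_entry_start_time: Optional[str] = None
    log_entry_end_time: Optional[str] = None
    audit_start_time: Optional[str] = None
    audit_end_time: Optional[str] = None
    # Config values echoed into the report by add_config_to_report().
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    use_exported_bigquery_audit_metadata: Optional[bool] = None
    use_v2_audit_metadata: Optional[bool] = None
```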
@@ -292,6 +234,7 @@ class BigQuerySource(SQLAlchemySource):
def __init__(self, config, ctx):
super().__init__(config, ctx, "bigquery")
self.config: BigQueryConfig = config
self.ctx = ctx
self.report: BigQueryReport = BigQueryReport()
self.lineage_metadata: Optional[Dict[str, Set[str]]] = None
self.maximum_shard_ids: Dict[str, str] = dict()
@@ -303,21 +246,24 @@ def get_db_name(self, inspector: Inspector = None) -> str:
return self._get_project_id(inspector)

def _compute_big_query_lineage(self) -> None:
if self.config.include_table_lineage:
lineage_client_project_id = self._get_lineage_client_project_id()
if self.config.use_exported_bigquery_audit_metadata:
self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
lineage_client_project_id
)
else:
self._compute_bigquery_lineage_via_gcp_logging(
lineage_client_project_id
)
if not self.config.include_table_lineage:
return

if self.lineage_metadata is not None:
logger.info(
f"Built lineage map containing {len(self.lineage_metadata)} entries."
)
lineage_client_project_id = self._get_lineage_client_project_id()
if self.config.use_exported_bigquery_audit_metadata:
self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
lineage_client_project_id
)
else:
self._compute_bigquery_lineage_via_gcp_logging(lineage_client_project_id)

if self.lineage_metadata is None:
self.lineage_metadata = {}

self.report.lineage_metadata_entries = len(self.lineage_metadata)
logger.info(
f"Built lineage map containing {len(self.lineage_metadata)} entries."
)

def _compute_bigquery_lineage_via_gcp_logging(
self, lineage_client_project_id: Optional[str]
@@ -395,30 +341,39 @@ def _get_bigquery_log_entries(
clients: List[GCPLoggingClient],
template: str,
) -> Union[Iterable[AuditLogEntry], Iterable[BigQueryAuditMetadata]]:
self.report.num_total_log_entries = 0
# Add a buffer to start and end time to account for delays in logging events.
start_time = (self.config.start_time - self.config.max_query_duration).strftime(
BQ_DATETIME_FORMAT
)
self.report.log_entry_start_time = start_time

end_time = (self.config.end_time + self.config.max_query_duration).strftime(
BQ_DATETIME_FORMAT
)
self.report.log_entry_end_time = end_time

filter = template.format(
start_time=(
self.config.start_time - self.config.max_query_duration
).strftime(BQ_DATETIME_FORMAT),
end_time=(self.config.end_time + self.config.max_query_duration).strftime(
BQ_DATETIME_FORMAT
),
start_time=start_time,
end_time=end_time,
)

assert self.config.log_page_size is not None

logger.info("Start loading log entries from BigQuery")
logger.info(
f"Start loading log entries from BigQuery start_time={start_time} and end_time={end_time}"
)
for client in clients:
entries = client.list_entries(
filter_=filter, page_size=self.config.log_page_size
)
item = 0
for entry in entries:
item = item + 1
if item % self.config.log_page_size == 0:
logger.info(f"Read {item} entry from log entries")
self.report.num_total_log_entries += 1
yield entry
logger.info(f"Finished loading {item} log entries from BigQuery")

logger.info(
f"Finished loading {self.report.num_total_log_entries} log entries from BigQuery so far"
)
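The filter window handed to Cloud Logging is widened by max_query_duration on both sides, so entries for long-running queries that finish slightly outside the configured window are still captured, and the widened bounds are now recorded on the report. A small worked example (the exact value of BQ_DATETIME_FORMAT is not shown in this excerpt; the format string below is an assumption for illustration):

```python
from datetime import datetime, timedelta, timezone

BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumption, for illustration only

config_start = datetime(2022, 3, 14, tzinfo=timezone.utc)
config_end = datetime(2022, 3, 16, tzinfo=timezone.utc)
max_query_duration = timedelta(minutes=15)

# Widen the window by max_query_duration on both sides before building the filter.
start_time = (config_start - max_query_duration).strftime(BQ_DATETIME_FORMAT)
end_time = (config_end + max_query_duration).strftime(BQ_DATETIME_FORMAT)

print(start_time, end_time)
# 2022-03-13T23:45:00Z 2022-03-16T00:15:00Z
```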

def _get_exported_bigquery_audit_metadata(
self, bigquery_client: BigQueryClient
@@ -429,9 +384,12 @@ def _get_exported_bigquery_audit_metadata(
start_time: str = (
self.config.start_time - self.config.max_query_duration
).strftime(BQ_DATETIME_FORMAT)
self.report.audit_start_time = start_time

end_time: str = (
self.config.end_time + self.config.max_query_duration
).strftime(BQ_DATETIME_FORMAT)
self.report.audit_end_time = end_time

for dataset in self.config.bigquery_audit_metadata_datasets:
logger.info(
@@ -473,54 +431,59 @@ def _parse_bigquery_log_entries(
self,
entries: Union[Iterable[AuditLogEntry], Iterable[BigQueryAuditMetadata]],
) -> Iterable[QueryEvent]:
num_total_log_entries: int = 0
num_parsed_log_entires: int = 0
self.report.num_parsed_log_entires = 0
for entry in entries:
num_total_log_entries += 1
event: Optional[QueryEvent] = None
try:
if QueryEvent.can_parse_entry(entry):
event = QueryEvent.from_entry(entry)
num_parsed_log_entires += 1
elif QueryEvent.can_parse_entry_v2(entry):
event = QueryEvent.from_entry_v2(entry)
num_parsed_log_entires += 1
else:
raise RuntimeError("Unable to parse log entry as QueryEvent.")
except Exception as e:
self.report.report_failure(

missing_entry = QueryEvent.get_missing_key_entry(entry=entry)
if missing_entry is None:
event = QueryEvent.from_entry(entry)

missing_entry_v2 = QueryEvent.get_missing_key_entry_v2(entry=entry)
if event is None and missing_entry_v2 is None:
event = QueryEvent.from_entry_v2(entry)

if event is None:
self.error(
logger,
f"{entry.log_name}-{entry.insert_id}",
f"unable to parse log entry: {entry!r}",
f"Unable to parse log missing {missing_entry}, missing v2 {missing_entry_v2} for {entry}",
)
logger.error("Unable to parse GCP log entry.", e)
if event is not None:
else:
self.report.num_parsed_log_entires += 1
yield event

logger.info(
f"Parsing BigQuery log entries: Number of log entries scanned={num_total_log_entries}, "
f"number of log entries successfully parsed={num_parsed_log_entires}"
"Parsing BigQuery log entries: "
f"number of log entries successfully parsed={self.report.num_parsed_log_entires}"
)

def _parse_exported_bigquery_audit_metadata(
self, audit_metadata_rows: Iterable[BigQueryAuditMetadata]
) -> Iterable[QueryEvent]:
self.report.num_total_audit_entries = 0
self.report.num_parsed_audit_entires = 0
for audit_metadata in audit_metadata_rows:
self.report.num_total_audit_entries += 1
event: Optional[QueryEvent] = None
try:
if QueryEvent.can_parse_exported_bigquery_audit_metadata(

missing_exported_audit = (
QueryEvent.get_missing_key_exported_bigquery_audit_metadata(
audit_metadata
):
event = QueryEvent.from_exported_bigquery_audit_metadata(
audit_metadata
)
else:
raise RuntimeError("Unable to parse log entry as QueryEvent.")
except Exception as e:
self.report.report_failure(
f"""{audit_metadata["logName"]}-{audit_metadata["insertId"]}""",
f"unable to parse log entry: {audit_metadata!r}",
)
logger.error("Unable to parse GCP log entry.", e)
if event is not None:
)

if missing_exported_audit is None:
event = QueryEvent.from_exported_bigquery_audit_metadata(audit_metadata)

if event is None:
self.error(
logger,
f"{audit_metadata['logName']}-{audit_metadata['insertId']}",
f"Unable to parse audit metadata missing {missing_exported_audit} for {audit_metadata}",
)
else:
self.report.num_parsed_audit_entires += 1
yield event
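Both parsers now ask the QueryEvent helpers which required key is missing instead of wrapping can_parse_*/from_* in a broad try/except, so the failure report can name the exact missing field. The real helpers live alongside QueryEvent and are not shown in this excerpt; a minimal sketch of the pattern with hypothetical field lists:

```python
from typing import Any, Dict, List, Optional


def get_missing_key(record: Dict[str, Any], required: List[str]) -> Optional[str]:
    # Return the first required field that is absent, or None if all are present.
    for key in required:
        if key not in record:
            return key
    return None


V1_REQUIRED = ["protoPayload", "serviceData"]  # assumption, illustration only
V2_REQUIRED = ["protoPayload", "metadata"]     # assumption, illustration only


def parse(record: Dict[str, Any]) -> Optional[str]:
    missing_v1 = get_missing_key(record, V1_REQUIRED)
    if missing_v1 is None:
        return "parsed as v1 QueryEvent"

    missing_v2 = get_missing_key(record, V2_REQUIRED)
    if missing_v2 is None:
        return "parsed as v2 QueryEvent"

    # Neither format matches: report exactly which keys were missing instead of
    # raising and swallowing a generic RuntimeError as the old flow did.
    print(f"Unable to parse entry: missing {missing_v1} (v1), missing {missing_v2} (v2)")
    return None


# Usage: parse({"protoPayload": {}, "metadata": {}}) -> "parsed as v2 QueryEvent"
```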

def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[str]]:
@@ -672,13 +635,23 @@ def create(cls, config_dict, ctx):
config = BigQueryConfig.parse_obj(config_dict)
return cls(config, ctx)

def add_config_to_report(self):
self.report.start_time = self.config.start_time
self.report.end_time = self.config.end_time

self.report.use_exported_bigquery_audit_metadata = (
self.config.use_exported_bigquery_audit_metadata
)
self.report.use_v2_audit_metadata = self.config.use_v2_audit_metadata

# Overriding the get_workunits method to first compute the workunits using the base SQLAlchemySource
# and then compute lineage information only for those datasets that were ingested. This helps us
# maintain a clear separation between SQLAlchemySource and BigQuerySource. It also ensures that
# flags like schema and table patterns are honored for lineage computation as well.
def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
# Only compute the lineage if the object is None. This is a safety check in case future
# refactoring ends up computing lineage multiple times.
self.add_config_to_report()
if self.lineage_metadata is None:
self._compute_big_query_lineage()
with patch.dict(
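The override pattern described in the get_workunits comment above, reduced to a sketch (hypothetical class and method names, not DataHub's actual implementation): build the lineage map at most once, emit the base workunits, and attach upstream edges only for datasets that were actually ingested.

```python
from typing import Dict, Iterable, Optional, Set


class LineageAwareSource:
    """Hypothetical sketch of the pattern, not DataHub's BigQuerySource."""

    def __init__(self) -> None:
        self.lineage_metadata: Optional[Dict[str, Set[str]]] = None

    def _base_workunits(self) -> Iterable[str]:
        # Stand-in for SQLAlchemySource.get_workunits(): only datasets that pass
        # the configured schema/table patterns show up here.
        yield from ["project.dataset.table_a", "project.dataset.table_b"]

    def _compute_lineage(self) -> None:
        # Stand-in for _compute_big_query_lineage(); builds the full lineage map.
        self.lineage_metadata = {
            "project.dataset.table_a": {"project.dataset.upstream"},
        }

    def get_workunits(self) -> Iterable[str]:
        # Safety check: never compute the lineage map twice.
        if self.lineage_metadata is None:
            self._compute_lineage()
        for dataset in self._base_workunits():
            yield dataset
            # Lineage is attached only for datasets that were actually ingested.
            upstreams = (self.lineage_metadata or {}).get(dataset)
            if upstreams:
                yield f"lineage: {dataset} <- {sorted(upstreams)}"


# Usage: list(LineageAwareSource().get_workunits())
```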
@@ -441,11 +441,11 @@ def __init__(self, config: SQLAlchemyConfig, ctx: PipelineContext, platform: str
},
)

def warn(self, log: logging.Logger, key: str, reason: str) -> Any:
def warn(self, log: logging.Logger, key: str, reason: str) -> None:
self.report.report_warning(key, reason)
log.warning(reason)
log.warning(f"{key} => {reason}")

def error(self, log: logging.Logger, key: str, reason: str) -> Any:
def error(self, log: logging.Logger, key: str, reason: str) -> None:
self.report.report_failure(key, reason)
log.error(f"{key} => {reason}")

@@ -88,11 +88,11 @@ def __init__(
self._initialize_checkpointing_state_provider()
self.report: StatefulIngestionReport = StatefulIngestionReport()

def warn(self, log: logging.Logger, key: str, reason: str) -> Any:
def warn(self, log: logging.Logger, key: str, reason: str) -> None:
self.report.report_warning(key, reason)
log.warning(reason)
log.warning(f"{key} => {reason}")

def error(self, log: logging.Logger, key: str, reason: str) -> Any:
def error(self, log: logging.Logger, key: str, reason: str) -> None:
self.report.report_failure(key, reason)
log.error(f"{key} => {reason}")

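Both base classes tighten the warn/error helpers in the same way: the return type becomes None (nothing consumed the previous Any), and the log line now includes the report key alongside the reason. A small sketch of the resulting behavior, using a simplified stand-in for the report:

```python
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)


class Report:
    """Stand-in for the source report; only collects failures here."""

    def __init__(self) -> None:
        self.failures: Dict[str, List[str]] = {}

    def report_failure(self, key: str, reason: str) -> None:
        self.failures.setdefault(key, []).append(reason)


class Source:
    def __init__(self) -> None:
        self.report = Report()

    def error(self, log: logging.Logger, key: str, reason: str) -> None:
        # Record the failure on the report and log "key => reason" in one place.
        self.report.report_failure(key, reason)
        log.error(f"{key} => {reason}")


Source().error(logger, "my-log-name-insert-id", "Unable to parse log entry")
```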