Skip to content

Commit

Permalink
fix(ingest/bigquery): support google-cloud-bigquery 3.15.0 (#9595)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jan 10, 2024
1 parent b0060ce commit 39e88ef
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class SqlParsingBuilder:
def __post_init__(self) -> None:
if self.usage_config:
self._usage_aggregator = UsageAggregator(self.usage_config)
else:
elif self.generate_usage_statistics:
logger.info("No usage config provided, not generating usage statistics")
self.generate_usage_statistics = False

Expand Down
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class CliReport(Report):

disk_info: Optional[dict] = None
peak_disk_usage: Optional[str] = None
_initial_disk_usage: int = -1
_peak_disk_usage: int = 0

thread_count: Optional[int] = None
Expand All @@ -156,12 +157,15 @@ def compute_stats(self) -> None:

try:
disk_usage = shutil.disk_usage("/")
if self._initial_disk_usage < 0:
self._initial_disk_usage = disk_usage.used
if self._peak_disk_usage < disk_usage.used:
self._peak_disk_usage = disk_usage.used
self.peak_disk_usage = humanfriendly.format_size(self._peak_disk_usage)
self.disk_info = {
"total": humanfriendly.format_size(disk_usage.total),
"used": humanfriendly.format_size(disk_usage.used),
"used_initally": humanfriendly.format_size(self._initial_disk_usage),
"free": humanfriendly.format_size(disk_usage.free),
}
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,9 +1283,13 @@ def create_bigquery_temp_table(
# temporary table dance. However, that would require either a) upgrading to
# use GE's batch v3 API or b) bypassing GE altogether.

query_job: Optional[
"google.cloud.bigquery.job.query.QueryJob"
] = cursor._query_job
query_job: Optional["google.cloud.bigquery.job.query.QueryJob"] = (
# In google-cloud-bigquery 3.15.0, the _query_job attribute was
# made public and renamed to query_job.
cursor.query_job
if hasattr(cursor, "query_job")
else cursor._query_job # type: ignore[attr-defined]
)
assert query_job
temp_destination_table = query_job.destination
bigquery_temp_table = f"{temp_destination_table.project}.{temp_destination_table.dataset_id}.{temp_destination_table.table_id}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,13 @@ def _populate_external_lineage_map(self, discovered_tables: List[str]) -> None:

self._populate_external_lineage_from_copy_history(discovered_tables)
logger.info(
"Done populating external lineage from copy history."
"Done populating external lineage from copy history. "
f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
)

self._populate_external_lineage_from_show_query(discovered_tables)
logger.info(
"Done populating external lineage from show external tables."
"Done populating external lineage from show external tables. "
f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
)

Expand Down

0 comments on commit 39e88ef

Please sign in to comment.