Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/bigquery): support google-cloud-bigquery 3.15.0 #9595

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class SqlParsingBuilder:
def __post_init__(self) -> None:
if self.usage_config:
self._usage_aggregator = UsageAggregator(self.usage_config)
else:
elif self.generate_usage_statistics:
logger.info("No usage config provided, not generating usage statistics")
self.generate_usage_statistics = False

Expand Down
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class CliReport(Report):

disk_info: Optional[dict] = None
peak_disk_usage: Optional[str] = None
_initial_disk_usage: int = -1
_peak_disk_usage: int = 0

thread_count: Optional[int] = None
Expand All @@ -156,12 +157,15 @@ def compute_stats(self) -> None:

try:
disk_usage = shutil.disk_usage("/")
if self._initial_disk_usage < 0:
self._initial_disk_usage = disk_usage.used
if self._peak_disk_usage < disk_usage.used:
self._peak_disk_usage = disk_usage.used
self.peak_disk_usage = humanfriendly.format_size(self._peak_disk_usage)
self.disk_info = {
"total": humanfriendly.format_size(disk_usage.total),
"used": humanfriendly.format_size(disk_usage.used),
"used_initally": humanfriendly.format_size(self._initial_disk_usage),
"free": humanfriendly.format_size(disk_usage.free),
}
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,9 +1283,13 @@ def create_bigquery_temp_table(
# temporary table dance. However, that would require either a) upgrading to
# use GE's batch v3 API or b) bypassing GE altogether.

query_job: Optional[
"google.cloud.bigquery.job.query.QueryJob"
] = cursor._query_job
query_job: Optional["google.cloud.bigquery.job.query.QueryJob"] = (
# In google-cloud-bigquery 3.15.0, the _query_job attribute was
# made public and renamed to query_job.
cursor.query_job
if hasattr(cursor, "query_job")
else cursor._query_job # type: ignore[attr-defined]
)
assert query_job
temp_destination_table = query_job.destination
bigquery_temp_table = f"{temp_destination_table.project}.{temp_destination_table.dataset_id}.{temp_destination_table.table_id}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,13 @@ def _populate_external_lineage_map(self, discovered_tables: List[str]) -> None:

self._populate_external_lineage_from_copy_history(discovered_tables)
logger.info(
"Done populating external lineage from copy history."
"Done populating external lineage from copy history. "
f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
)

self._populate_external_lineage_from_show_query(discovered_tables)
logger.info(
"Done populating external lineage from show external tables."
"Done populating external lineage from show external tables. "
f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
)

Expand Down
Loading