
Commit f80913d

Remove temporary tables after data export, add logging (#16)
mjurkus authored Nov 9, 2023
1 parent 8452624 commit f80913d
Showing 1 changed file with 51 additions and 38 deletions.
89 changes: 51 additions & 38 deletions sdk/python/feast/infra/offline_stores/bigquery.py
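
The change wraps the export in a try/finally block so the intermediate BigQuery table created by to_bigquery() is always deleted, even when the extract to GCS fails, and it adds INFO-level log messages around each step. Below is a minimal standalone sketch of that pattern, not the Feast code itself; the client, table, and bucket names are placeholders.

# Sketch of the export-then-cleanup pattern applied in the diff below.
# Assumes google-cloud-bigquery is installed; all identifiers are placeholders.
import logging

from google.cloud import bigquery

logger = logging.getLogger(__name__)


def export_and_cleanup(client: bigquery.Client, table_id: str, gcs_path: str) -> None:
    try:
        job_config = bigquery.job.ExtractJobConfig()
        job_config.destination_format = "PARQUET"

        logger.info(f"Starting data extraction from '{table_id}' to '{gcs_path}'")
        extract_job = client.extract_table(
            table_id,
            destination_uris=[f"{gcs_path}/*.parquet"],
            job_config=job_config,
        )
        extract_job.result()  # block until the extract job finishes
    finally:
        # not_found_ok=True keeps cleanup safe even if the table was never created
        logger.info(f"Cleanup: Deleting temporary table '{table_id}'")
        client.delete_table(table=table_id, not_found_ok=True)


# Example (placeholder names):
# export_and_cleanup(bigquery.Client(), "my_project.my_dataset.tmp_table", "gs://my-bucket/staging")
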
@@ -1,4 +1,5 @@
 import contextlib
+import logging
 import tempfile
 import uuid
 from datetime import date, datetime, timedelta
@@ -46,7 +47,6 @@
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
 from feast.saved_dataset import SavedDatasetStorage
 from feast.usage import get_user_agent, log_exceptions_and_usage
-
 from .bigquery_source import (
     BigQueryLoggingDestination,
     BigQuerySource,
@@ -67,6 +67,8 @@
 
     raise FeastExtrasDependencyImportError("gcp", str(e))
 
+logger = logging.getLogger(__name__)
+
 
 def get_http_client_info():
     return http_client_info.ClientInfo(user_agent=get_user_agent())
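
The logger added above goes through the standard logging module, so the new messages only appear if the calling application configures logging. A short sketch of how a caller might surface them (the module path is inferred from this file's location):

# Enable the new INFO-level messages when using Feast from an application.
import logging

logging.basicConfig(level=logging.INFO)
# Or narrow it to this module only:
logging.getLogger("feast.infra.offline_stores.bigquery").setLevel(logging.INFO)
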
@@ -575,46 +577,57 @@ def to_remote_storage(self) -> List[str]:
 
         assert isinstance(self.config.offline_store, BigQueryOfflineStoreConfig)
 
-        table = self.to_bigquery()
-
-        if self.config.offline_store.gcs_staging_file_size_mb is not None:
-            table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
-            number_of_files = max(
-                1,
-                int(table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb),
-            )
-            destination_uris = [
-                f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
-            ]
-        else:
-            destination_uris = [f"{self._gcs_path}/*.parquet"]
+        table = None
+        try:
+            logger.info(f"Starting data export to '{self._gcs_path}'")
+            table = self.to_bigquery()
+            logger.info(f"Data exported to table '{table}'")
+
+            if self.config.offline_store.gcs_staging_file_size_mb is not None:
+                table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
+                number_of_files = max(
+                    1,
+                    int(table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb),
+                )
+                destination_uris = [
+                    f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
+                ]
+            else:
+                destination_uris = [f"{self._gcs_path}/*.parquet"]
 
-        job_config = bigquery.job.ExtractJobConfig()
-        job_config.destination_format = "PARQUET"
+            job_config = bigquery.job.ExtractJobConfig()
+            job_config.destination_format = "PARQUET"
 
-        extract_job = self.client.extract_table(
-            table,
-            destination_uris=destination_uris,
-            location=self.config.offline_store.location,
-            job_config=job_config,
-        )
-        extract_job.result()
+            logger.info(f"Starting data extraction from '{table}' to '{self._gcs_path}'")
+            extract_job = self.client.extract_table(
+                table,
+                destination_uris=destination_uris,
+                location=self.config.offline_store.location,
+                job_config=job_config,
+            )
+            extract_job.result()
 
-        bucket: str
-        prefix: str
-        if self.config.offline_store.billing_project_id:
-            storage_client = StorageClient(project=self.config.offline_store.project_id)
-        else:
-            storage_client = StorageClient(project=self.client.project)
-        bucket, prefix = self._gcs_path[len("gs://") :].split("/", 1)
-        if prefix.startswith("/"):
-            prefix = prefix[1:]
-
-        blobs = storage_client.list_blobs(bucket, prefix=prefix)
-        results = []
-        for b in blobs:
-            results.append(f"gs://{b.bucket.name}/{b.name}")
-        return results
+            bucket: str
+            prefix: str
+            if self.config.offline_store.billing_project_id:
+                storage_client = StorageClient(project=self.config.offline_store.project_id)
+            else:
+                storage_client = StorageClient(project=self.client.project)
+            bucket, prefix = self._gcs_path[len("gs://"):].split("/", 1)
+            if prefix.startswith("/"):
+                prefix = prefix[1:]
+
+            blobs = storage_client.list_blobs(bucket, prefix=prefix)
+            results = []
+            for b in blobs:
+                results.append(f"gs://{b.bucket.name}/{b.name}")
+
+            logger.info(f"Data extraction completed. Extracted to {len(results)} files")
+            return results
+        finally:
+            if table:
+                logger.info(f"Cleanup: Deleting temporary table '{table}'")
+                self.client.delete_table(table=table, not_found_ok=True)
 
 
 def block_until_done(
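
For context, this code path is reached when a historical retrieval is exported to remote storage. A hedged sketch of the caller side, assuming a feature repository configured with the BigQuery offline store and a gcs_staging_location; the entity query and feature names are placeholders, not part of this commit:

# Caller-side sketch (placeholder repo, query, and feature names).
# With this commit, the temporary table created by to_bigquery() is deleted
# in the finally block even if the extract to GCS fails.
from feast import FeatureStore

store = FeatureStore(repo_path=".")
job = store.get_historical_features(
    entity_df="SELECT driver_id, event_timestamp FROM my_project.my_dataset.entities",
    features=["driver_hourly_stats:conv_rate"],
)
parquet_uris = job.to_remote_storage()  # list of gs:// URIs for the exported Parquet files
print(parquet_uris)
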
