Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Refactor: configurable BigQuery offline store export parquet file size #13

Merged
merged 2 commits into from
Nov 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 11 additions & 20 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import contextlib
import os
import tempfile
import uuid
from datetime import date, datetime, timedelta
Expand All @@ -20,7 +19,7 @@
import pandas as pd
import pyarrow
import pyarrow.parquet
from pydantic import ConstrainedStr, StrictStr, validator
from pydantic import ConstrainedStr, Field, StrictStr, validator
from pydantic.typing import Literal
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

Expand Down Expand Up @@ -104,6 +103,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
gcs_staging_location: Optional[str] = None
""" (optional) GCS location used for offloading BigQuery results as parquet files."""

gcs_staging_file_size_mb: Optional[int] = Field(None, ge=1, le=1000)
""" (optional) Specify the staging file size in Megabytes. If it is not set, the BigQuery export function will determine the export file size automatically."""

table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
""" (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""

Expand Down Expand Up @@ -571,27 +573,16 @@ def to_remote_storage(self) -> List[str]:
"offline store when executing `to_remote_storage()`"
)

table = self.to_bigquery()
assert isinstance(self.config.offline_store, BigQueryOfflineStoreConfig)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this for type checking? Why wasn't this needed before?

Copy link
Author

@KarolisKont KarolisKont Nov 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — it is for type checking, and it also enables IDE autocompletion for the field names. The same approach is used in another method.


parquet_file_size_mb = os.getenv("BQ_EXPORT_PARQUET_FILE_SIZE_MB")
if parquet_file_size_mb is not None:
if not parquet_file_size_mb.isdigit():
raise ValueError(
"The value for the BQ_EXPORT_PARQUET_FILE_SIZE_MB environment variable must "
"be a numeric digit, but it was set to: %s",
parquet_file_size_mb,
)

parquet_file_size_mb_int = int(parquet_file_size_mb)
if parquet_file_size_mb_int > 1000:
raise ValueError(
"The value for the BQ_EXPORT_PARQUET_FILE_SIZE_MB environment variable cannot "
"exceed 1000; however, it was set to: %s.",
parquet_file_size_mb_int,
)
table = self.to_bigquery()

if self.config.offline_store.gcs_staging_file_size_mb is not None:
table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
number_of_files = max(1, table_size_in_mb // parquet_file_size_mb_int)
number_of_files = max(
1,
table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb,
)
destination_uris = [
f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
]
Expand Down
Loading