Skip to content

Commit

Permalink
Support for BigQuery quota project (#3182)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kayrnt authored Sep 25, 2024
1 parent 3b03f51 commit e2829a1
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 21 deletions.
41 changes: 21 additions & 20 deletions docs/integrations/engines/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,27 @@ pip install "sqlmesh[bigquery]"

### Connection options

| Option | Description | Type | Required |
|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|:------:|:--------:|
| `type` | Engine type name - must be `bigquery` | string | Y |
| `method` | Connection methods - see [allowed values below](#connection-methods). Default: `oauth`. | string | N |
| `project` | The name of the GCP project | string | N |
| `location` | The location of the datasets (can be regional or multi-regional) | string | N |
| `execution_project` | The name of the GCP project to bill for the execution of the models. If not set, the project associated with the model will be used. | string | N |
| `keyfile` | Path to the keyfile to be used with service-account method | string | N |
| `keyfile_json` | Keyfile information provided inline (not recommended) | dict | N |
| `token` | OAuth 2.0 access token | string | N |
| `refresh_token` | OAuth 2.0 refresh token | string | N |
| `client_id` | OAuth 2.0 client ID | string | N |
| `client_secret` | OAuth 2.0 client secret | string | N |
| `token_uri` | OAuth 2.0 authorization server's token endpoint URI | string | N |
| `scopes` | The scopes used to obtain authorization | list | N |
| `job_creation_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to be created. | int | N |
| `job_execution_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to complete. | int | N |
| `job_retries` | The number of times to retry the underlying job if it fails. (Default: `1`) | int | N |
| `priority` | The priority of the underlying job. (Default: `INTERACTIVE`) | string | N |
| `maximum_bytes_billed` | The maximum number of bytes to be billed for the underlying job. | int | N |
| Option | Description | Type | Required |
|---------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|:------:|:--------:|
| `type` | Engine type name - must be `bigquery` | string | Y |
| `method` | Connection methods - see [allowed values below](#connection-methods). Default: `oauth`. | string | N |
| `project` | The name of the GCP project | string | N |
| `location` | The location of the datasets (can be regional or multi-regional) | string | N |
| `execution_project` | The name of the GCP project to bill for the execution of the models. If not set, the project associated with the model will be used. | string | N |
| `quota_project` | The name of the GCP project used for the quota. If not set, the `quota_project_id` set within the credentials of the account is used to authenticate to BigQuery. | string | N |
| `keyfile` | Path to the keyfile to be used with service-account method | string | N |
| `keyfile_json` | Keyfile information provided inline (not recommended) | dict | N |
| `token` | OAuth 2.0 access token | string | N |
| `refresh_token` | OAuth 2.0 refresh token | string | N |
| `client_id` | OAuth 2.0 client ID | string | N |
| `client_secret` | OAuth 2.0 client secret | string | N |
| `token_uri` | OAuth 2.0 authorization server's token endpoint URI | string | N |
| `scopes` | The scopes used to obtain authorization | list | N |
| `job_creation_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to be created. | int | N |
| `job_execution_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to complete. | int | N |
| `job_retries` | The number of times to retry the underlying job if it fails. (Default: `1`) | int | N |
| `priority` | The priority of the underlying job. (Default: `INTERACTIVE`) | string | N |
| `maximum_bytes_billed` | The maximum number of bytes to be billed for the underlying job. | int | N |

## Airflow Scheduler
**Engine Name:** `bigquery`
Expand Down
18 changes: 17 additions & 1 deletion sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ class BigQueryConnectionConfig(ConnectionConfig):

project: t.Optional[str] = None
execution_project: t.Optional[str] = None
quota_project: t.Optional[str] = None
location: t.Optional[str] = None
# Keyfile Auth
keyfile: t.Optional[str] = None
Expand Down Expand Up @@ -766,6 +767,19 @@ def validate_execution_project(
)
return v

@field_validator("quota_project")
@field_validator_v1_args
def validate_quota_project(
    cls,
    v: t.Optional[str],
    values: t.Dict[str, t.Any],
) -> t.Optional[str]:
    """Reject a `quota_project` that is given without a `project`.

    Args:
        v: The candidate `quota_project` value; may be None or empty.
        values: Mapping that is consulted for the `project` key.

    Returns:
        `v` unchanged when it is falsy or when `project` is present.

    Raises:
        ConfigError: If `v` is truthy and `values` has no truthy `project`.
    """
    # Nothing to enforce when no quota project was supplied, or when the
    # accompanying `project` is available.
    if not v or values.get("project"):
        return v

    raise ConfigError(
        "If the `quota_project` field is specified, you must also specify the `project` field to provide a default object location."
    )

@property
def _connection_kwargs_keys(self) -> t.Set[str]:
    """Return the set of connection kwarg keys, which is always empty here."""
    no_keys: t.Set[str] = set()
    return no_keys
Expand All @@ -778,7 +792,7 @@ def _engine_adapter(self) -> t.Type[EngineAdapter]:
def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
"""The static connection kwargs for this connection"""
import google.auth
from google.api_core import client_info
from google.api_core import client_info, client_options
from google.oauth2 import credentials, service_account

if self.method == BigQueryConnectionMethod.OAUTH:
Expand All @@ -802,11 +816,13 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
)
else:
raise ConfigError("Invalid BigQuery Connection Method")
options = client_options.ClientOptions(quota_project_id=self.quota_project)
client = google.cloud.bigquery.Client(
project=self.execution_project or self.project,
credentials=creds,
location=self.location,
client_info=client_info.ClientInfo(user_agent="sqlmesh"),
client_options=options,
)

return {
Expand Down
5 changes: 5 additions & 0 deletions tests/core/test_connection_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,17 +578,22 @@ def test_bigquery(make_config):
type="bigquery",
project="project",
execution_project="execution_project",
quota_project="quota_project",
)

assert isinstance(config, BigQueryConnectionConfig)
assert config.project == "project"
assert config.execution_project == "execution_project"
assert config.quota_project == "quota_project"
assert config.get_catalog() == "project"
assert config.is_recommended_for_state_sync is False

with pytest.raises(ConfigError, match="you must also specify the `project` field"):
make_config(type="bigquery", execution_project="execution_project")

with pytest.raises(ConfigError, match="you must also specify the `project` field"):
make_config(type="bigquery", quota_project="quota_project")


def test_postgres(make_config):
config = make_config(
Expand Down

0 comments on commit e2829a1

Please sign in to comment.