Skip to content

Commit

Permalink
Fix BigQuery connection and add docs (#38430)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrey Anshin <Andrey.Anshin@taragol.is>
  • Loading branch information
shahar1 and Taragolis authored Apr 9, 2024
1 parent a92c47b commit 42c3eaf
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 17 deletions.
76 changes: 59 additions & 17 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import uuid
from copy import deepcopy
from datetime import datetime, timedelta
from functools import cached_property
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NoReturn, Sequence, Union, cast

from aiohttp import ClientSession as ClientSession
Expand Down Expand Up @@ -103,14 +104,49 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
conn_type = "gcpbigquery"
hook_name = "Google Bigquery"

@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
"""Return connection widgets to add to connection form."""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import validators
from wtforms.fields.simple import BooleanField, StringField

from airflow.www.validators import ValidJson

connection_form_widgets = super().get_connection_form_widgets()
connection_form_widgets["use_legacy_sql"] = BooleanField(lazy_gettext("Use Legacy SQL"), default=True)
connection_form_widgets["location"] = StringField(
lazy_gettext("Location"), widget=BS3TextFieldWidget()
)
connection_form_widgets["priority"] = StringField(
lazy_gettext("Priority"),
default="INTERACTIVE",
widget=BS3TextFieldWidget(),
validators=[validators.AnyOf(["INTERACTIVE", "BATCH"])],
)
connection_form_widgets["api_resource_configs"] = StringField(
lazy_gettext("API Resource Configs"), widget=BS3TextFieldWidget(), validators=[ValidJson()]
)
connection_form_widgets["labels"] = StringField(
lazy_gettext("Labels"), widget=BS3TextFieldWidget(), validators=[ValidJson()]
)
connection_form_widgets["labels"] = StringField(
lazy_gettext("Labels"), widget=BS3TextFieldWidget(), validators=[ValidJson()]
)
return connection_form_widgets

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom field behaviour."""
return super().get_ui_field_behaviour()

def __init__(
self,
gcp_conn_id: str = GoogleBaseHook.default_conn_name,
use_legacy_sql: bool = True,
location: str | None = None,
priority: str = "INTERACTIVE",
api_resource_configs: dict | None = None,
impersonation_chain: str | Sequence[str] | None = None,
impersonation_scopes: str | Sequence[str] | None = None,
labels: dict | None = None,
**kwargs,
Expand All @@ -120,18 +156,25 @@ def __init__(
"The `delegate_to` parameter has been deprecated before and finally removed in this version"
" of Google Provider. You MUST convert it to `impersonate_chain`"
)
super().__init__(
gcp_conn_id=gcp_conn_id,
impersonation_chain=impersonation_chain,
)
self.use_legacy_sql = use_legacy_sql
self.location = location
self.priority = priority
super().__init__(**kwargs)
self.use_legacy_sql: bool = self._get_field("use_legacy_sql", use_legacy_sql)
self.location: str | None = self._get_field("location", location)
self.priority: str = self._get_field("priority", priority)
self.running_job_id: str | None = None
self.api_resource_configs: dict = api_resource_configs or {}
self.labels = labels
self.credentials_path = "bigquery_hook_credentials.json"
self.impersonation_scopes = impersonation_scopes
self.api_resource_configs: dict = self._get_field("api_resource_configs", api_resource_configs or {})
self.labels = self._get_field("labels", labels or {})
self.impersonation_scopes: str | Sequence[str] | None = impersonation_scopes

@cached_property
@deprecated(
reason=(
"`BigQueryHook.credentials_path` property is deprecated and will be removed in the future. "
"This property used for obtaining credentials path but no longer in actual use. "
),
category=AirflowProviderDeprecationWarning,
)
def credentials_path(self) -> str:
return "bigquery_hook_credentials.json"

def get_conn(self) -> BigQueryConnection:
"""Get a BigQuery PEP 249 connection object."""
Expand Down Expand Up @@ -172,18 +215,17 @@ def get_uri(self) -> str:
"""Override from ``DbApiHook`` for ``get_sqlalchemy_engine()``."""
return f"bigquery://{self.project_id}"

def get_sqlalchemy_engine(self, engine_kwargs=None):
def get_sqlalchemy_engine(self, engine_kwargs: dict | None = None):
"""Create an SQLAlchemy engine object.
:param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`.
"""
if engine_kwargs is None:
engine_kwargs = {}
extras = self.get_connection(self.gcp_conn_id).extra_dejson
credentials_path = get_field(extras, "key_path")
credentials_path = get_field(self.extras, "key_path")
if credentials_path:
return create_engine(self.get_uri(), credentials_path=credentials_path, **engine_kwargs)
keyfile_dict = get_field(extras, "keyfile_dict")
keyfile_dict = get_field(self.extras, "keyfile_dict")
if keyfile_dict:
keyfile_content = keyfile_dict if isinstance(keyfile_dict, dict) else json.loads(keyfile_dict)
return create_engine(self.get_uri(), credentials_info=keyfile_content, **engine_kwargs)
Expand Down
62 changes: 62 additions & 0 deletions docs/apache-airflow-providers-google/connections/bigquery.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. _howto/connection:gcpbigquery:

Google Cloud BigQuery Connection
================================

The Google Cloud BigQuery connection type enables integration with the Google Cloud BigQuery.
As it is built on the top of Google Cloud Connection (i.e., BigQuery hook inherits from
GCP base hook), the basic authentication methods and parameters are exactly the same as the Google Cloud Connection.
Extra parameters that are specific to BigQuery will be covered in this document.


Configuring the Connection
--------------------------
.. note::
Please refer to :ref:`Google Cloud Connection docs<howto/connection:gcp:configuring_the_connection>`
for information regarding the basic authentication parameters.

Impersonation Scopes


Use Legacy SQL
Whether or not the connection should utilize legacy SQL.

Location
One of `BigQuery locations <https://cloud.google.com/bigquery/docs/locations>`_ where the dataset resides.
If None, it utilizes the default location configured in the BigQuery service.

Priority
Should be either "INTERACTIVE" or "BATCH",
see `running queries docs <https://cloud.google.com/bigquery/docs/running-queries>`_.
Interactive query jobs, which are jobs that BigQuery runs on demand.
Batch query jobs, which are jobs that BigQuery waits to run until idle compute resources are available.

API Resource Configs
A dictionary containing parameters for configuring the Google BigQuery Jobs API.
These configurations are applied according to the specifications outlined in the
`BigQuery Jobs API documentation <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs>`_.
For example, you can specify configurations such as {'query': {'useQueryCache': False}}.
This parameter is useful when you need to provide additional parameters that are not directly supported by the
BigQueryHook.

Labels
A dictionary of labels to be applied on the BigQuery job.
2 changes: 2 additions & 0 deletions docs/apache-airflow-providers-google/connections/gcp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ For example:
export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT='google-cloud-platform://'
.. _howto/connection:gcp:configuring_the_connection:

Configuring the Connection
--------------------------

Expand Down
5 changes: 5 additions & 0 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def test_delegate_to_runtime_error():

@pytest.mark.db_test
class TestBigQueryHookMethods(_BigQueryBaseTestClass):
def test_credentials_path_derprecation(self):
with pytest.warns(AirflowProviderDeprecationWarning):
credentials_path = self.hook.credentials_path
assert credentials_path == "bigquery_hook_credentials.json"

@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook._authorize")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.build")
Expand Down

0 comments on commit 42c3eaf

Please sign in to comment.