From 42c3eaf23a1cb76ae9f07b027b09948aabfcbf02 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Tue, 9 Apr 2024 14:59:58 +0300 Subject: [PATCH] Fix BigQuery connection and add docs (#38430) Co-authored-by: Andrey Anshin --- .../providers/google/cloud/hooks/bigquery.py | 76 ++++++++++++++----- .../connections/bigquery.rst | 62 +++++++++++++++ .../connections/gcp.rst | 2 + .../google/cloud/hooks/test_bigquery.py | 5 ++ 4 files changed, 128 insertions(+), 17 deletions(-) create mode 100644 docs/apache-airflow-providers-google/connections/bigquery.rst diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index ed1e284f970e7..a39025931d483 100644 --- a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -28,6 +28,7 @@ import uuid from copy import deepcopy from datetime import datetime, timedelta +from functools import cached_property from typing import TYPE_CHECKING, Any, Iterable, Mapping, NoReturn, Sequence, Union, cast from aiohttp import ClientSession as ClientSession @@ -103,14 +104,49 @@ class BigQueryHook(GoogleBaseHook, DbApiHook): conn_type = "gcpbigquery" hook_name = "Google Bigquery" + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Return connection widgets to add to connection form.""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import validators + from wtforms.fields.simple import BooleanField, StringField + + from airflow.www.validators import ValidJson + + connection_form_widgets = super().get_connection_form_widgets() + connection_form_widgets["use_legacy_sql"] = BooleanField(lazy_gettext("Use Legacy SQL"), default=True) + connection_form_widgets["location"] = StringField( + lazy_gettext("Location"), widget=BS3TextFieldWidget() + ) + connection_form_widgets["priority"] = StringField( + 
lazy_gettext("Priority"), + default="INTERACTIVE", + widget=BS3TextFieldWidget(), + validators=[validators.AnyOf(["INTERACTIVE", "BATCH"])], + ) + connection_form_widgets["api_resource_configs"] = StringField( + lazy_gettext("API Resource Configs"), widget=BS3TextFieldWidget(), validators=[ValidJson()] + ) + connection_form_widgets["labels"] = StringField( + lazy_gettext("Labels"), widget=BS3TextFieldWidget(), validators=[ValidJson()] + ) + connection_form_widgets["labels"] = StringField( + lazy_gettext("Labels"), widget=BS3TextFieldWidget(), validators=[ValidJson()] + ) + return connection_form_widgets + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Return custom field behaviour.""" + return super().get_ui_field_behaviour() + def __init__( self, - gcp_conn_id: str = GoogleBaseHook.default_conn_name, use_legacy_sql: bool = True, location: str | None = None, priority: str = "INTERACTIVE", api_resource_configs: dict | None = None, - impersonation_chain: str | Sequence[str] | None = None, impersonation_scopes: str | Sequence[str] | None = None, labels: dict | None = None, **kwargs, @@ -120,18 +156,25 @@ def __init__( "The `delegate_to` parameter has been deprecated before and finally removed in this version" " of Google Provider. 
You MUST convert it to `impersonate_chain`" ) - super().__init__( - gcp_conn_id=gcp_conn_id, - impersonation_chain=impersonation_chain, - ) - self.use_legacy_sql = use_legacy_sql - self.location = location - self.priority = priority + super().__init__(**kwargs) + self.use_legacy_sql: bool = self._get_field("use_legacy_sql", use_legacy_sql) + self.location: str | None = self._get_field("location", location) + self.priority: str = self._get_field("priority", priority) self.running_job_id: str | None = None - self.api_resource_configs: dict = api_resource_configs or {} - self.labels = labels - self.credentials_path = "bigquery_hook_credentials.json" - self.impersonation_scopes = impersonation_scopes + self.api_resource_configs: dict = self._get_field("api_resource_configs", api_resource_configs or {}) + self.labels = self._get_field("labels", labels or {}) + self.impersonation_scopes: str | Sequence[str] | None = impersonation_scopes + + @cached_property + @deprecated( + reason=( + "`BigQueryHook.credentials_path` property is deprecated and will be removed in the future. " + "This property used for obtaining credentials path but no longer in actual use. " + ), + category=AirflowProviderDeprecationWarning, + ) + def credentials_path(self) -> str: + return "bigquery_hook_credentials.json" def get_conn(self) -> BigQueryConnection: """Get a BigQuery PEP 249 connection object.""" @@ -172,18 +215,17 @@ def get_uri(self) -> str: """Override from ``DbApiHook`` for ``get_sqlalchemy_engine()``.""" return f"bigquery://{self.project_id}" - def get_sqlalchemy_engine(self, engine_kwargs=None): + def get_sqlalchemy_engine(self, engine_kwargs: dict | None = None): """Create an SQLAlchemy engine object. :param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`. 
""" if engine_kwargs is None: engine_kwargs = {} - extras = self.get_connection(self.gcp_conn_id).extra_dejson - credentials_path = get_field(extras, "key_path") + credentials_path = get_field(self.extras, "key_path") if credentials_path: return create_engine(self.get_uri(), credentials_path=credentials_path, **engine_kwargs) - keyfile_dict = get_field(extras, "keyfile_dict") + keyfile_dict = get_field(self.extras, "keyfile_dict") if keyfile_dict: keyfile_content = keyfile_dict if isinstance(keyfile_dict, dict) else json.loads(keyfile_dict) return create_engine(self.get_uri(), credentials_info=keyfile_content, **engine_kwargs) diff --git a/docs/apache-airflow-providers-google/connections/bigquery.rst b/docs/apache-airflow-providers-google/connections/bigquery.rst new file mode 100644 index 0000000000000..27a66582f0ddb --- /dev/null +++ b/docs/apache-airflow-providers-google/connections/bigquery.rst @@ -0,0 +1,62 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/connection:gcpbigquery: + +Google Cloud BigQuery Connection +================================ + +The Google Cloud BigQuery connection type enables integration with the Google Cloud BigQuery. 
+As it is built on the top of Google Cloud Connection (i.e., BigQuery hook inherits from +GCP base hook), the basic authentication methods and parameters are exactly the same as the Google Cloud Connection. +Extra parameters that are specific to BigQuery will be covered in this document. + + +Configuring the Connection +-------------------------- +.. note:: + Please refer to :ref:`Google Cloud Connection docs <howto/connection:gcp:configuring_the_connection>` + for information regarding the basic authentication parameters. + +Impersonation Scopes + + +Use Legacy SQL + Whether or not the connection should utilize legacy SQL. + +Location + One of `BigQuery locations <https://cloud.google.com/bigquery/docs/locations>`_ where the dataset resides. + If None, it utilizes the default location configured in the BigQuery service. + +Priority + Should be either "INTERACTIVE" or "BATCH", + see `running queries docs <https://cloud.google.com/bigquery/docs/running-queries>`_. + Interactive query jobs, which are jobs that BigQuery runs on demand. + Batch query jobs, which are jobs that BigQuery waits to run until idle compute resources are available. + +API Resource Configs + A dictionary containing parameters for configuring the Google BigQuery Jobs API. + These configurations are applied according to the specifications outlined in the + `BigQuery Jobs API documentation <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs>`_. + For example, you can specify configurations such as {'query': {'useQueryCache': False}}. + This parameter is useful when you need to provide additional parameters that are not directly supported by the + BigQueryHook. + +Labels + A dictionary of labels to be applied on the BigQuery job. diff --git a/docs/apache-airflow-providers-google/connections/gcp.rst b/docs/apache-airflow-providers-google/connections/gcp.rst index 9ebe21efe33b8..5418531957431 100644 --- a/docs/apache-airflow-providers-google/connections/gcp.rst +++ b/docs/apache-airflow-providers-google/connections/gcp.rst @@ -82,6 +82,8 @@ For example: export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT='google-cloud-platform://' +.. 
_howto/connection:gcp:configuring_the_connection: + Configuring the Connection -------------------------- diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py index bfb66f5e1fce3..b02222b350d80 100644 --- a/tests/providers/google/cloud/hooks/test_bigquery.py +++ b/tests/providers/google/cloud/hooks/test_bigquery.py @@ -77,6 +77,11 @@ def test_delegate_to_runtime_error(): @pytest.mark.db_test class TestBigQueryHookMethods(_BigQueryBaseTestClass): + def test_credentials_path_deprecation(self): + with pytest.warns(AirflowProviderDeprecationWarning): + credentials_path = self.hook.credentials_path + assert credentials_path == "bigquery_hook_credentials.json" + @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection") @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook._authorize") @mock.patch("airflow.providers.google.cloud.hooks.bigquery.build")