diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 1ce850c06de..761b32ceb26 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -37,6 +37,9 @@ 'location': { 'type': 'string', }, + 'priority': { + 'enum': ['interactive', 'batch'], + }, }, 'required': ['method', 'database', 'schema'], } @@ -192,6 +195,14 @@ def raw_execute(self, sql, fetch=False): job_config = google.cloud.bigquery.QueryJobConfig() job_config.use_legacy_sql = False + + priority = conn.credentials.get('priority', 'interactive') + if priority == "batch": + job_config.priority = google.cloud.bigquery.QueryPriority.BATCH + else: + job_config.priority = \ + google.cloud.bigquery.QueryPriority.INTERACTIVE + query_job = client.query(sql, job_config) # this blocks until the query has completed diff --git a/test/unit/test_bigquery_adapter.py b/test/unit/test_bigquery_adapter.py index 23ba7c78f83..0715c54fa81 100644 --- a/test/unit/test_bigquery_adapter.py +++ b/test/unit/test_bigquery_adapter.py @@ -47,6 +47,7 @@ def setUp(self): 'schema': 'dummy_schema', 'threads': 1, 'location': 'Luna Station', + 'priority': 'batch', }, }, 'target': 'oauth', @@ -104,6 +105,22 @@ def test_acquire_connection_service_account_validations(self, mock_open_connecti mock_open_connection.assert_called_once() + @patch('dbt.adapters.bigquery.BigQueryConnectionManager.open', return_value=_bq_conn()) + def test_acquire_connection_priority(self, mock_open_connection): + adapter = self.get_adapter('loc') + try: + connection = adapter.acquire_connection('dummy') + self.assertEqual(connection.get('type'), 'bigquery') + self.assertEqual(connection.credentials.get('priority'), 'batch') + + except dbt.exceptions.ValidationException as e: + self.fail('got ValidationException: {}'.format(str(e))) + + except BaseException as e: + raise + + mock_open_connection.assert_called_once() + def test_cancel_open_connections_empty(self): adapter = self.get_adapter('oauth') self.assertEqual(adapter.cancel_open_connections(), None)