Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigquery #455

Merged
merged 53 commits into from
Jun 19, 2017
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
b2781b9
wip
drewbanin May 29, 2017
ba28422
Merge branch 'development' into bigquery
drewbanin May 30, 2017
f5b70ac
minimum viable bq adapter
drewbanin May 30, 2017
3e97021
Merge branch 'development' into bigquery
drewbanin Jun 1, 2017
dfdb6e4
proper bq quoting
drewbanin Jun 2, 2017
c4802db
absolute madness
drewbanin Jun 2, 2017
d82eba2
Merge branch 'adapter-agnostic' into bigquery
drewbanin Jun 2, 2017
17b330b
auto-install bq module
drewbanin Jun 2, 2017
5d3ca1e
catch runtime exception for nodes
drewbanin Jun 2, 2017
f3ce484
cleanup
drewbanin Jun 2, 2017
ba53a7a
more cleanup
drewbanin Jun 2, 2017
db99614
pip critical logging
drewbanin Jun 2, 2017
d449717
add adapter command
drewbanin Jun 3, 2017
e5008bd
Merge branch 'adapter-agnostic' into bigquery
drewbanin Jun 3, 2017
b52af47
major wip
drewbanin Jun 5, 2017
150e4eb
refactorin
drewbanin Jun 6, 2017
6b92e1b
closer
drewbanin Jun 6, 2017
1e0e989
handle model errors + dependent skips
drewbanin Jun 6, 2017
79a7759
cleanup + test transactions (for now)
drewbanin Jun 6, 2017
ddb1296
move model creation to materializers
drewbanin Jun 7, 2017
28f61b8
fix for ephemeral models
drewbanin Jun 7, 2017
173a1b7
override materializer for bq
drewbanin Jun 7, 2017
ac356c3
error handling
drewbanin Jun 7, 2017
3bd2e61
bq tests working
drewbanin Jun 7, 2017
1c4ed3c
commit tweaks for models
drewbanin Jun 7, 2017
baa542f
service accounts
drewbanin Jun 7, 2017
6f3a1a2
service account json for bq
drewbanin Jun 7, 2017
b2042ae
better error message if adapter is not installed
drewbanin Jun 8, 2017
a9cf033
fix unit tests
drewbanin Jun 8, 2017
a066798
pep8
drewbanin Jun 9, 2017
59b827f
fix integration tests
drewbanin Jun 9, 2017
75e2ba6
codeclimate
drewbanin Jun 9, 2017
a644e5d
fix typos
drewbanin Jun 10, 2017
bf10de9
fix circular dep for imports
drewbanin Jun 10, 2017
65a872a
catch programming exception for runners
drewbanin Jun 10, 2017
29c1f8b
code review changes
drewbanin Jun 12, 2017
48321d6
refactoring for code climate
drewbanin Jun 12, 2017
8049b17
selector cleanup
drewbanin Jun 12, 2017
b5d44e6
fix bug for erin
drewbanin Jun 13, 2017
fc02cfa
comment
drewbanin Jun 15, 2017
4f8d2bd
handle cancellation on ctrl-c for bq (log warning)
drewbanin Jun 15, 2017
9849fa0
better bq validation
drewbanin Jun 15, 2017
349c12e
test bq validation
drewbanin Jun 15, 2017
397dc9c
add uninstall flag to adapter task
drewbanin Jun 15, 2017
199cf0e
remove pip hacking nonsense
drewbanin Jun 15, 2017
612eef9
bq integration tests
drewbanin Jun 16, 2017
f5aa68a
remove initialize call for pg
drewbanin Jun 16, 2017
c222fc2
fix bq integration tests
drewbanin Jun 16, 2017
c2fc1e1
pep8
drewbanin Jun 16, 2017
3c943a9
remove -x opt from toxfile
drewbanin Jun 16, 2017
11c831a
handle notimplemented for bq better
drewbanin Jun 16, 2017
a2b7af6
missing import for seed task
drewbanin Jun 18, 2017
b3065ef
notimplemented for bq
drewbanin Jun 19, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 300 additions & 0 deletions dbt/adapters/bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
from __future__ import absolute_import

from contextlib import contextmanager

import dbt.exceptions
import dbt.flags as flags
import dbt.materializers
import dbt.clients.gcloud

from dbt.adapters.postgres import PostgresAdapter
from dbt.contracts.connection import validate_connection
from dbt.logger import GLOBAL_LOGGER as logger


class BigQueryAdapter(PostgresAdapter):

QUERY_TIMEOUT = 60 * 1000
requires = {'bigquery': 'google-cloud-bigquery==0.24.0'}

@classmethod
def initialize(cls):
    """Import the google-cloud-bigquery dependency chain on demand.

    The adapter's requirements are pip-installed at runtime (see the
    `requires` class attribute and DefaultAdapter.install_requires), so
    these imports cannot happen at module load time.
    """
    google = cls._import('google')
    google.auth = cls._import('google.auth')
    google.oauth2 = cls._import('google.oauth2')

    google.cloud = cls._import('google.cloud')
    google.cloud.bigquery = cls._import('google.cloud.bigquery')
    google.cloud.exceptions = cls._import('google.cloud.exceptions')

    # Publish the assembled `google` package into this module's globals so
    # the rest of the adapter can reference `google.*` directly.
    globals()['google'] = google

@classmethod
def get_materializer(cls, node, existing):
    """Always use the non-DDL materializer for BigQuery nodes."""
    return dbt.materializers.make_materializer(
        dbt.materializers.NonDDLMaterializer, cls, node, existing)

@classmethod
def handle_error(cls, error, message, sql):
    """Log a BigQuery API error, then re-raise it as a dbt RuntimeException.

    :param error: the google-cloud exception (exposes an `.errors` list)
    :param message: a format string with a `{sql}` placeholder
    :param sql: the SQL that was running when the error occurred
    :raises dbt.exceptions.RuntimeException: always
    """
    logger.debug(message.format(sql=sql))
    logger.debug(error)
    # Fix: the comprehension variable previously shadowed the `error`
    # argument, which is confusing and rebinds the name in Python 2.
    error_msg = "\n".join([err['message'] for err in error.errors])
    raise dbt.exceptions.RuntimeException(error_msg)

@classmethod
@contextmanager
def exception_handler(cls, profile, sql, model_name=None,
                      connection_name='master'):
    """Context manager translating google-cloud errors into dbt exceptions.

    BadRequest/Forbidden responses are unpacked by `handle_error`; any
    other exception is logged and re-raised as a dbt RuntimeException.
    NOTE: `google` is injected into module globals by `initialize`.
    """
    try:
        yield

    except google.cloud.exceptions.BadRequest as e:
        message = "Bad request while running:\n{sql}"
        cls.handle_error(e, message, sql)

    except google.cloud.exceptions.Forbidden as e:
        message = "Access denied while running:\n{sql}"
        cls.handle_error(e, message, sql)

    except Exception as e:
        logger.debug("Unhandled error while running:\n{}".format(sql))
        logger.debug(e)
        raise dbt.exceptions.RuntimeException(e)

@classmethod
def type(cls):
    """Profile `type` identifier for this adapter."""
    return 'bigquery'

@classmethod
def date_function(cls):
    """SQL expression yielding the current timestamp on BigQuery."""
    return 'CURRENT_TIMESTAMP()'

@classmethod
def begin(cls, profile, name='master'):
    """No-op: this adapter does not open transactions."""
    pass

@classmethod
def commit(cls, connection):
    """No-op: this adapter does not commit transactions."""
    pass

@classmethod
def get_status(cls, cursor):
    """Not supported: BigQuery connections do not use DB-API cursors.

    Fix (per review): raise the adapter-specific NotImplementedException
    instead of a bare Exception.
    """
    raise dbt.exceptions.NotImplementedException(
        '`get_status` is not implemented for this adapter!')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NotImplementedException // remove code below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch 👍

# NOTE(review): unreachable — get_status raises before this code runs.
# Reviewer asked for the dead code below to be removed.
state = cursor.sqlstate

if state is None:
    state = 'SUCCESS'

return "{} {}".format(state, cursor.rowcount)

@classmethod
def get_bigquery_credentials(cls, config):
    """Build google credentials from the profile config.

    Supported `method` values (handled below): 'oauth',
    'service-account', and 'service-account-json'.
    """
    method = config.get('method')
    # NOTE(review): avoid upper-cased local variable names
    Creds = google.oauth2.service_account.Credentials
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't upcase variable names


if method == 'oauth':
    # No explicit credentials: the bigquery client library falls back to
    # application-default credentials; if those are missing,
    # open_connection shells out to gcloud to run the oauth login flow.
    return None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if they use the oauth method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We initialize a bigquery client with the credentials argument if it's a service account (from a private key file or the private key raw json).

If the credentials object is None (as with oauth), then the bq client lib will use the oauth flow to authenticate the user. We force this flow if the user isn't authenticated by shelling out to gcloud here

More info on gcp auth flows here


elif method == 'service-account':
keyfile = config.get('keyfile')
return Creds.from_service_account_file(keyfile)

elif method == 'service-account-json':
details = config.get('config')
return Creds.from_service_account_info(details)

error = ('Bad `method` in profile: "{}". '
'Should be "oauth" or "service-account"'.format(method))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or service-account-json?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also can we put this validation into a voluptuous schema for the bigquery profile?

raise dbt.exceptions.FailedToConnectException(error)

@classmethod
def get_bigquery_client(cls, config):
    """Construct a google.cloud.bigquery Client for the configured project."""
    creds = cls.get_bigquery_credentials(config)
    project = config.get('project')

    return google.cloud.bigquery.Client(project=project,
                                        credentials=creds)

@classmethod
def open_connection(cls, connection):
    """Attach a live bigquery Client to `connection` and mark it open.

    Returns a copy of the connection dict. On failure the copy is marked
    'fail' and a FailedToConnectException is raised.
    """
    if connection.get('state') == 'open':
        logger.debug('Connection is already open, skipping open.')
        return connection

    result = connection.copy()
    credentials = connection.get('credentials', {})

    try:
        handle = cls.get_bigquery_client(credentials)

    except google.auth.exceptions.DefaultCredentialsError as e:
        # No application-default credentials yet: shell out to gcloud to
        # run the login flow, then retry exactly once.
        logger.info("Please log into GCP to continue")
        dbt.clients.gcloud.setup_default_credentials()

        handle = cls.get_bigquery_client(credentials)

    except Exception as e:
        logger.debug("Got an error when attempting to create a bigquery "
                     "client: '{}'".format(e))

        result['handle'] = None
        result['state'] = 'fail'

        raise dbt.exceptions.FailedToConnectException(str(e))

    result['handle'] = handle
    result['state'] = 'open'
    return result

@classmethod
def query_for_existing(cls, profile, schema, model_name=None):
    """Map relation name -> 'table'/'view' for everything in the dataset."""
    dataset = cls.get_dataset(profile, schema, model_name)

    type_map = {
        'TABLE': 'table',
        'VIEW': 'view'
    }

    return {table.name: type_map.get(table.table_type)
            for table in dataset.list_tables()}

@classmethod
def drop_view(cls, profile, view_name, model_name):
    """Delete the named view from the profile's default dataset."""
    default_schema = cls.get_default_schema(profile)
    dataset = cls.get_dataset(profile, default_schema, model_name)
    dataset.table(view_name).delete()

@classmethod
def rename(cls, profile, from_name, to_name, model_name=None):
    """Renaming relations is unsupported on BigQuery; always raises."""
    raise dbt.exceptions.NotImplementedException(
        'Cannot rename bigquery relation {} to {}'.format(
            from_name, to_name))

# hack because of current API limitations
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate more? if there's a link explaining why we need this, you should add it here

Copy link
Contributor Author

@drewbanin drewbanin Jun 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it's actually not super well documented which is part of the problem. BQ defaults to the "Legacy SQL" syntax when views are created via the API. You can set a useLegacySql = False flag on the Table object, but it looks like it's presently ignored by the API.

Relevant issue

The best workaround i could find was to use the #standardSQL header line

@classmethod
def format_sql_for_bigquery(cls, sql):
    # Hack around a current API limitation: BigQuery defaults to "Legacy
    # SQL" when views are created via the API, and the useLegacySql=False
    # flag on the Table object is presently ignored. Prepending the
    # `#standardSQL` header line forces Standard SQL interpretation.
    return "#standardSQL\n{}".format(sql)

@classmethod
def execute_model(cls, profile, model):
    """Create `model` as a Standard SQL view; returns a status string."""
    model_name = model.get('name')
    connection = cls.get_connection(profile, model_name)

    if flags.STRICT_MODE:
        validate_connection(connection)

    model_sql = cls.format_sql_for_bigquery(model.get('injected_sql'))

    default_schema = cls.get_default_schema(profile)
    dataset = cls.get_dataset(profile, default_schema, model_name)

    view = dataset.table(model_name)
    view.view_query = model_sql

    logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql))

    with cls.exception_handler(profile, model_sql, model_name, model_name):
        view.create()

    if view.created is None:
        raise RuntimeError("Error creating view {}".format(model_name))

    return "CREATE VIEW"

@classmethod
def fetch_query_results(cls, query):
    """Collect every row from a completed sync query, following pagination.

    Fix: this previously returned `rows` — only the final fetched page —
    instead of the accumulated `all_rows`.
    """
    all_rows = []

    rows = query.rows
    token = query.page_token

    while True:
        all_rows.extend(rows)
        if token is None:
            break
        rows, total_count, token = query.fetch_data(page_token=token)

    return all_rows

@classmethod
def execute_and_fetch(cls, profile, sql, model_name=None, **kwargs):
    """Run `sql` synchronously and return every result row."""
    connection = cls.get_connection(profile, model_name)
    client = connection.get('handle')

    formatted_sql = cls.format_sql_for_bigquery(sql)
    query = client.run_sync_query(formatted_sql)
    query.timeout_ms = cls.QUERY_TIMEOUT

    logger.debug(
        "Fetching data for query {}:\n{}".format(model_name, formatted_sql))

    query.run()

    return cls.fetch_query_results(query)

@classmethod
def add_begin_query(cls, profile, name):
    """Not supported: this adapter does not use transactions (see the
    `begin`/`commit` no-ops above).

    Fix (per review): raise NotImplementedException and drop the
    unreachable add_query call that followed the raise.
    """
    raise dbt.exceptions.NotImplementedException(
        '`add_begin_query` is not implemented for this adapter!')

@classmethod
def create_schema(cls, profile, schema, model_name=None):
    """Create the BigQuery dataset that backs `schema`."""
    logger.debug('Creating schema "%s".', schema)

    dataset = cls.get_dataset(profile, schema, model_name)

    with cls.exception_handler(profile, 'create dataset', model_name):
        dataset.create()

@classmethod
def check_schema_exists(cls, profile, schema, model_name=None):
    """Return True if a dataset named `schema` exists in the project."""
    conn = cls.get_connection(profile, model_name)

    client = conn.get('handle')

    # Fix: the handler context previously said 'create dataset', which
    # mislabeled errors raised while listing datasets.
    with cls.exception_handler(profile, 'list datasets', model_name):
        all_datasets = client.list_datasets()
        return any(ds.name == schema for ds in all_datasets)

@classmethod
def get_dataset(cls, profile, dataset_name, model_name=None):
    """Return a Dataset handle for `dataset_name` via the cached client."""
    connection = cls.get_connection(profile, model_name)
    client = connection.get('handle')
    return client.dataset(dataset_name)

@classmethod
def add_query(cls, profile, sql, model_name=None, auto_begin=True):
    """Not supported: arbitrary statement execution is not available here.

    Fix (per review): raise the adapter-specific NotImplementedException
    instead of a bare Exception.
    """
    raise dbt.exceptions.NotImplementedException(
        '`add_query` is not implemented for this adapter!')

@classmethod
def cancel_connection(cls, profile, connection):
    """Not supported: running BigQuery view/table creation requests
    cannot be cancelled through this adapter.

    Fix (per review): raise NotImplementedException and drop the
    unreachable code after the raise (it was a Snowflake-style
    `system$abort_session` call that does not apply to BigQuery).
    """
    raise dbt.exceptions.NotImplementedException(
        '`cancel_connection` is not implemented for this adapter!')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the behavior on ctrl+c here?

if we need to just raise an exception, you should remove all this code.

also please change this and the exception for add_query to dbt.exceptions.NotImplementedException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmcarthur hadn't gotten there yet :)

Looks like we can actually issue a cancel command for running queries: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel

Copy link
Contributor Author

@drewbanin drewbanin Jun 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmcarthur it doesn't look like it's possible to cancel a table/view creation request. Jobs can be canceled though, which may be good to keep in mind for the future. This branch will log an error that looks like this on ctrl-c:

The bigquery adapter does not support query cancellation. Some queries may still be running!


@classmethod
def quote_schema_and_table(cls, profile, schema, table):
    """Return a backtick-quoted `project`.`schema`.`table` identifier."""
    connection = cls.get_connection(profile)
    project_id = connection.get('credentials', {}).get('project')
    return '`{}`.`{}`.`{}`'.format(project_id, schema, table)
5 changes: 0 additions & 5 deletions dbt/adapters/cache.py

This file was deleted.

48 changes: 42 additions & 6 deletions dbt/adapters/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import re
import time
import yaml
import importlib

from contextlib import contextmanager

import dbt.exceptions
import dbt.flags
import dbt.materializers

from dbt.contracts.connection import validate_connection
from dbt.logger import GLOBAL_LOGGER as logger
Expand All @@ -21,6 +23,33 @@

class DefaultAdapter(object):

requires = {}

@classmethod
def is_installed(cls):
    """True when this adapter's dependencies can all be imported."""
    try:
        cls.initialize()
    except ImportError:
        return False
    return True

@classmethod
def _import(cls, name):
    """Import and return the module named `name`."""
    return importlib.import_module(name)

@classmethod
def install_requires(cls):
    """pip-install every requirement listed in `cls.requires`.

    NOTE(review): `pip.main` is an internal pip API — a reviewer asked
    for a unit test to confirm this works on all platforms.
    """
    from pip import main as pip_main

    # Fix: only the requirement specs are needed; the dict keys were
    # bound to an unused `package` variable before.
    for require in cls.requires.values():
        logger.info("Installing {}".format(require))
        pip_main(['install', require])
        logger.info("Installed {} successfully!".format(require))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a unit test for this? i want to be sure it works on all platforms


@classmethod
def get_materializer(cls, model, existing):
    """Pick a materializer for `model` given the existing relations."""
    return dbt.materializers.get_materializer(cls, model, existing)

###
# ADAPTER-SPECIFIC FUNCTIONS -- each of these must be overridden in
# every adapter
Expand Down Expand Up @@ -531,10 +560,17 @@ def add_query(cls, profile, sql, model_name=None, auto_begin=True):
return connection, cursor

@classmethod
def execute_one(cls, profile, sql, model_name=None):
def execute_one(cls, profile, sql, model_name=None, auto_begin=False):
cls.get_connection(profile, model_name)

return cls.add_query(profile, sql, model_name)
return cls.add_query(profile, sql, model_name, auto_begin)

@classmethod
def execute_and_fetch(cls, profile, sql, model_name=None,
                      auto_begin=False):
    """Run `sql` via execute_one and return every row from its cursor."""
    _, cursor = cls.execute_one(profile, sql, model_name, auto_begin)
    return cursor.fetchall()

@classmethod
def execute_all(cls, profile, sqls, model_name=None):
Expand Down Expand Up @@ -567,13 +603,13 @@ def table_exists(cls, profile, schema, table, model_name=None):
exists = tables.get(table) is not None
return exists

@classmethod
def check_schema_exists(cls, profile, schema):
    # NOTE(review): this calls itself with identical arguments and will
    # recurse until RecursionError — presumably it was meant to delegate
    # to an adapter-specific implementation; verify intent. (This method
    # is deleted on the other side of this diff.)
    return cls.check_schema_exists(profile, schema)

@classmethod
def already_exists(cls, profile, schema, table, model_name=None):
    """Alias for `table_exists` kept for callers using the older name."""
    return cls.table_exists(profile, schema, table, model_name)

@classmethod
def quote_schema_and_table(cls, profile, schema, table):
    """Return the double-quoted, schema-qualified relation name."""
    quoted = '"{}"."{}"'.format(schema, table)
    return quoted
Loading