Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigquery #455

Merged
merged 53 commits into from
Jun 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
b2781b9
wip
drewbanin May 29, 2017
ba28422
Merge branch 'development' into bigquery
drewbanin May 30, 2017
f5b70ac
minimum viable bq adapter
drewbanin May 30, 2017
3e97021
Merge branch 'development' into bigquery
drewbanin Jun 1, 2017
dfdb6e4
proper bq quoting
drewbanin Jun 2, 2017
c4802db
absolute madness
drewbanin Jun 2, 2017
d82eba2
Merge branch 'adapter-agnostic' into bigquery
drewbanin Jun 2, 2017
17b330b
auto-install bq module
drewbanin Jun 2, 2017
5d3ca1e
catch runtime exception for nodes
drewbanin Jun 2, 2017
f3ce484
cleanup
drewbanin Jun 2, 2017
ba53a7a
more cleanup
drewbanin Jun 2, 2017
db99614
pip critical logging
drewbanin Jun 2, 2017
d449717
add adapter command
drewbanin Jun 3, 2017
e5008bd
Merge branch 'adapter-agnostic' into bigquery
drewbanin Jun 3, 2017
b52af47
major wip
drewbanin Jun 5, 2017
150e4eb
refactorin
drewbanin Jun 6, 2017
6b92e1b
closer
drewbanin Jun 6, 2017
1e0e989
handle model errors + dependent skips
drewbanin Jun 6, 2017
79a7759
cleanup + test transactions (for now)
drewbanin Jun 6, 2017
ddb1296
move model creation to materializers
drewbanin Jun 7, 2017
28f61b8
fix for ephemeral models
drewbanin Jun 7, 2017
173a1b7
override materializer for bq
drewbanin Jun 7, 2017
ac356c3
error handling
drewbanin Jun 7, 2017
3bd2e61
bq tests working
drewbanin Jun 7, 2017
1c4ed3c
commit tweaks for models
drewbanin Jun 7, 2017
baa542f
service accounts
drewbanin Jun 7, 2017
6f3a1a2
service account json for bq
drewbanin Jun 7, 2017
b2042ae
better error message if adapter is not installed
drewbanin Jun 8, 2017
a9cf033
fix unit tests
drewbanin Jun 8, 2017
a066798
pep8
drewbanin Jun 9, 2017
59b827f
fix integration tests
drewbanin Jun 9, 2017
75e2ba6
codeclimate
drewbanin Jun 9, 2017
a644e5d
fix typos
drewbanin Jun 10, 2017
bf10de9
fix circular dep for imports
drewbanin Jun 10, 2017
65a872a
catch programming exception for runners
drewbanin Jun 10, 2017
29c1f8b
code review changes
drewbanin Jun 12, 2017
48321d6
refactoring for code climate
drewbanin Jun 12, 2017
8049b17
selector cleanup
drewbanin Jun 12, 2017
b5d44e6
fix bug for erin
drewbanin Jun 13, 2017
fc02cfa
comment
drewbanin Jun 15, 2017
4f8d2bd
handle cancellation on ctrl-c for bq (log warning)
drewbanin Jun 15, 2017
9849fa0
better bq validation
drewbanin Jun 15, 2017
349c12e
test bq validation
drewbanin Jun 15, 2017
397dc9c
add uninstall flag to adapter task
drewbanin Jun 15, 2017
199cf0e
remove pip hacking nonsense
drewbanin Jun 15, 2017
612eef9
bq integration tests
drewbanin Jun 16, 2017
f5aa68a
remove initialize call for pg
drewbanin Jun 16, 2017
c222fc2
fix bq integration tests
drewbanin Jun 16, 2017
c2fc1e1
pep8
drewbanin Jun 16, 2017
3c943a9
remove -x opt from toxfile
drewbanin Jun 16, 2017
11c831a
handle notimplemented for bq better
drewbanin Jun 16, 2017
a2b7af6
missing import for seed task
drewbanin Jun 18, 2017
b3065ef
notimplemented for bq
drewbanin Jun 19, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ test-unit:

test-integration:
@echo "Integration test run starting..."
@time docker-compose run test tox -e integration-postgres-py27,integration-postgres-py35,integration-snowflake-py27,integration-snowflake-py35
@time docker-compose run test tox -e integration-postgres-py27,integration-postgres-py35,integration-snowflake-py27,integration-snowflake-py35,integration-bigquery-py27,integration-bigquery-py35

test-new:
@echo "Test run starting..."
Expand Down
292 changes: 292 additions & 0 deletions dbt/adapters/bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
from __future__ import absolute_import

from contextlib import contextmanager

import google.auth
import google.auth.exceptions
import google.cloud.bigquery
import google.cloud.exceptions
import google.oauth2
import google.oauth2.service_account

import dbt.clients.gcloud
import dbt.exceptions
import dbt.flags as flags
import dbt.materializers

from dbt.adapters.postgres import PostgresAdapter
from dbt.contracts.connection import validate_connection
from dbt.logger import GLOBAL_LOGGER as logger


class BigQueryAdapter(PostgresAdapter):
    """dbt adapter for Google BigQuery.

    Subclasses PostgresAdapter but overrides everything transactional or
    cursor-based: BigQuery has no transactions (begin/commit are no-ops)
    and models are created through the google-cloud-bigquery API rather
    than by executing DDL, so the cursor-style query methods raise
    NotImplementedException.
    """

    # Timeout, in milliseconds, for synchronous query jobs.
    QUERY_TIMEOUT = 60 * 1000

    @classmethod
    def get_materializer(cls, node, existing):
        """Return a non-DDL materializer: BigQuery views are created via
        the API (dataset.table(...).create()), not via SQL statements."""
        materializer = dbt.materializers.NonDDLMaterializer
        return dbt.materializers.make_materializer(materializer,
                                                   cls,
                                                   node,
                                                   existing)

    @classmethod
    def handle_error(cls, error, message, sql):
        """Log a google-cloud error and re-raise it as a dbt RuntimeException.

        google-cloud exceptions carry a list of error dicts in `.errors`;
        their 'message' values are joined into one user-facing message.
        (Fixed: the list comprehension previously shadowed `error`.)
        """
        logger.debug(message.format(sql=sql))
        logger.debug(error)
        error_msg = "\n".join([err['message'] for err in error.errors])
        raise dbt.exceptions.RuntimeException(error_msg)

    @classmethod
    @contextmanager
    def exception_handler(cls, profile, sql, model_name=None,
                          connection_name='master'):
        """Context manager that translates google-cloud exceptions raised
        while running `sql` into dbt RuntimeExceptions."""
        try:
            yield

        except google.cloud.exceptions.BadRequest as e:
            message = "Bad request while running:\n{sql}"
            cls.handle_error(e, message, sql)

        except google.cloud.exceptions.Forbidden as e:
            message = "Access denied while running:\n{sql}"
            cls.handle_error(e, message, sql)

        except Exception as e:
            logger.debug("Unhandled error while running:\n{}".format(sql))
            logger.debug(e)
            raise dbt.exceptions.RuntimeException(e)

    @classmethod
    def type(cls):
        """Adapter type name used in profiles.yml."""
        return 'bigquery'

    @classmethod
    def date_function(cls):
        """SQL expression for the current timestamp in BigQuery."""
        return 'CURRENT_TIMESTAMP()'

    @classmethod
    def begin(cls, profile, name='master'):
        # BigQuery has no transactions -- deliberately a no-op.
        pass

    @classmethod
    def commit(cls, connection):
        # BigQuery has no transactions -- deliberately a no-op.
        pass

    @classmethod
    def get_status(cls, cursor):
        # No DB-API cursor exists for BigQuery connections.
        raise dbt.exceptions.NotImplementedException(
            '`get_status` is not implemented for this adapter!')

    @classmethod
    def get_bigquery_credentials(cls, config):
        """Build google credentials from the profile `config`.

        Supported `method` values:
          - 'oauth': application-default credentials (returns a
            (credentials, project) tuple from google.auth.default()).
          - 'service-account': load from a keyfile path ('keyfile').
          - 'service-account-json': load from inline JSON ('keyfile_json').

        Raises FailedToConnectException for any other method.
        """
        method = config.get('method')
        creds = google.oauth2.service_account.Credentials

        if method == 'oauth':
            return google.auth.default()

        elif method == 'service-account':
            keyfile = config.get('keyfile')
            return creds.from_service_account_file(keyfile)

        elif method == 'service-account-json':
            details = config.get('keyfile_json')
            return creds.from_service_account_info(details)

        error = ('Invalid `method` in profile: "{}"'.format(method))
        raise dbt.exceptions.FailedToConnectException(error)

    @classmethod
    def get_bigquery_client(cls, config):
        """Create a bigquery Client for the profile's 'project'."""
        project_name = config.get('project')
        creds = cls.get_bigquery_credentials(config)

        return google.cloud.bigquery.Client(project_name, creds)

    @classmethod
    def open_connection(cls, connection):
        """Open a BigQuery client for `connection`.

        If application-default credentials are missing, walk the user
        through `gcloud auth` once and retry. Returns a copy of
        `connection` with 'handle' and 'state' filled in.

        Fixed: a stray bare `raise` in the generic except branch made the
        logging, state updates, and FailedToConnectException unreachable.
        """
        if connection.get('state') == 'open':
            logger.debug('Connection is already open, skipping open.')
            return connection

        result = connection.copy()
        credentials = connection.get('credentials', {})

        try:
            handle = cls.get_bigquery_client(credentials)

        except google.auth.exceptions.DefaultCredentialsError:
            # No application-default credentials found: prompt a gcloud
            # login, then retry exactly once.
            logger.info("Please log into GCP to continue")
            dbt.clients.gcloud.setup_default_credentials()

            handle = cls.get_bigquery_client(credentials)

        except Exception as e:
            logger.debug("Got an error when attempting to create a bigquery "
                         "client: '{}'".format(e))

            result['handle'] = None
            result['state'] = 'fail'

            raise dbt.exceptions.FailedToConnectException(str(e))

        result['handle'] = handle
        result['state'] = 'open'
        return result

    @classmethod
    def query_for_existing(cls, profile, schema, model_name=None):
        """Map table name -> relation type ('table'/'view') for every
        table in the dataset named `schema`."""
        dataset = cls.get_dataset(profile, schema, model_name)
        tables = dataset.list_tables()

        relation_type_lookup = {
            'TABLE': 'table',
            'VIEW': 'view'
        }

        existing = [(table.name, relation_type_lookup.get(table.table_type))
                    for table in tables]

        return dict(existing)

    @classmethod
    def drop_view(cls, profile, view_name, model_name):
        """Delete the view `view_name` from the profile's default dataset."""
        schema = cls.get_default_schema(profile)
        dataset = cls.get_dataset(profile, schema, model_name)
        view = dataset.table(view_name)
        view.delete()

    @classmethod
    def rename(cls, profile, from_name, to_name, model_name=None):
        # BigQuery tables cannot be renamed in place via this API.
        raise dbt.exceptions.NotImplementedException(
            '`rename` is not implemented for this adapter!')

    # Hack because of current API limitations. We should set a flag on the
    # Table object indicating StandardSQL when it's implemented
    # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3388
    @classmethod
    def format_sql_for_bigquery(cls, sql):
        """Prefix `sql` with the #standardSQL directive."""
        return "#standardSQL\n{}".format(sql)

    @classmethod
    def execute_model(cls, profile, model):
        """Materialize `model` as a BigQuery view in the default dataset.

        Returns a status string on success; raises RuntimeError if the
        view was not created.
        """
        connection = cls.get_connection(profile, model.get('name'))

        if flags.STRICT_MODE:
            validate_connection(connection)

        model_name = model.get('name')
        model_sql = cls.format_sql_for_bigquery(model.get('injected_sql'))

        schema = cls.get_default_schema(profile)
        dataset = cls.get_dataset(profile, schema, model_name)

        view = dataset.table(model_name)
        view.view_query = model_sql

        logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql))

        with cls.exception_handler(profile, model_sql, model_name, model_name):
            view.create()

        if view.created is None:
            raise RuntimeError("Error creating view {}".format(model_name))

        return "CREATE VIEW"

    @classmethod
    def fetch_query_results(cls, query):
        """Page through a completed synchronous query and return ALL rows.

        Fixed: the original returned `rows` (only the final page) instead
        of the accumulated `all_rows`.
        """
        all_rows = []

        rows = query.rows
        token = query.page_token

        while True:
            all_rows.extend(rows)
            if token is None:
                break
            rows, total_count, token = query.fetch_data(page_token=token)
        return all_rows

    @classmethod
    def execute_and_fetch(cls, profile, sql, model_name=None, **kwargs):
        """Run `sql` synchronously (with QUERY_TIMEOUT) and fetch all rows."""
        conn = cls.get_connection(profile, model_name)
        client = conn.get('handle')

        formatted_sql = cls.format_sql_for_bigquery(sql)
        query = client.run_sync_query(formatted_sql)
        query.timeout_ms = cls.QUERY_TIMEOUT

        debug_message = "Fetching data for query {}:\n{}"
        logger.debug(debug_message.format(model_name, formatted_sql))

        query.run()

        return cls.fetch_query_results(query)

    @classmethod
    def add_begin_query(cls, profile, name):
        # No transactions on BigQuery (see begin/commit above).
        raise dbt.exceptions.NotImplementedException(
            '`add_begin_query` is not implemented for this adapter!')

    @classmethod
    def create_schema(cls, profile, schema, model_name=None):
        """Create the dataset named `schema`."""
        logger.debug('Creating schema "%s".', schema)

        dataset = cls.get_dataset(profile, schema, model_name)

        with cls.exception_handler(profile, 'create dataset', model_name):
            dataset.create()

    @classmethod
    def drop_tables_in_schema(cls, dataset):
        """Delete every table in `dataset` (required before dataset.delete())."""
        for table in dataset.list_tables():
            table.delete()

    @classmethod
    def drop_schema(cls, profile, schema, model_name=None):
        """Drop the dataset named `schema` and everything in it; no-op if
        the dataset does not exist."""
        logger.debug('Dropping schema "%s".', schema)

        if not cls.check_schema_exists(profile, schema, model_name):
            return

        dataset = cls.get_dataset(profile, schema, model_name)

        with cls.exception_handler(profile, 'drop dataset', model_name):
            cls.drop_tables_in_schema(dataset)
            dataset.delete()

    @classmethod
    def check_schema_exists(cls, profile, schema, model_name=None):
        """Return True if a dataset named `schema` exists in the project."""
        conn = cls.get_connection(profile, model_name)

        client = conn.get('handle')

        # Fixed: context label previously said 'create dataset', which
        # produced a misleading message for failures while listing.
        with cls.exception_handler(profile, 'list datasets', model_name):
            all_datasets = client.list_datasets()
            return any([ds.name == schema for ds in all_datasets])

    @classmethod
    def get_dataset(cls, profile, dataset_name, model_name=None):
        """Return the (not-necessarily-existing) Dataset handle for
        `dataset_name` on this connection's client."""
        conn = cls.get_connection(profile, model_name)

        client = conn.get('handle')
        dataset = client.dataset(dataset_name)
        return dataset

    @classmethod
    def add_query(cls, profile, sql, model_name=None, auto_begin=True):
        # Arbitrary cursor-style queries are not supported on BigQuery.
        raise dbt.exceptions.NotImplementedException(
            '`add_query` is not implemented for this adapter!')

    @classmethod
    def is_cancelable(cls):
        # In-flight BigQuery jobs are not cancelled on ctrl-c.
        return False

    @classmethod
    def quote_schema_and_table(cls, profile, schema, table):
        """Return the fully-qualified, backtick-quoted
        `project`.`dataset`.`table` identifier."""
        connection = cls.get_connection(profile)
        credentials = connection.get('credentials', {})
        project = credentials.get('project')
        return '`{}`.`{}`.`{}`'.format(project, schema, table)
5 changes: 0 additions & 5 deletions dbt/adapters/cache.py

This file was deleted.

Loading