BigQuery: fix concurrent relation loads (#835)

Merged · 10 commits · Jul 13, 2018
CHANGELOG.md (1 addition, 0 deletions)

@@ -2,6 +2,7 @@

### Changes

- Improve consistency of BigQuery list_relations, create shortcut for most materializations ([#835](https://github.com/fishtown-analytics/dbt/pull/835))
- Support external BigQuery relations ([#828](https://github.com/fishtown-analytics/dbt/pull/828))
- Added tox environments that have the user specify what tests should be run ([#837](https://github.com/fishtown-analytics/dbt/pull/837))
- Set the TCP keepalive on redshift ([#826](https://github.com/fishtown-analytics/dbt/pull/826))
dbt/adapters/bigquery/impl.py (68 additions, 18 deletions)

@@ -54,6 +54,12 @@ class BigQueryAdapter(PostgresAdapter):
              'https://www.googleapis.com/auth/cloud-platform',
              'https://www.googleapis.com/auth/drive')
 
+    RELATION_TYPES = {
+        'TABLE': BigQueryRelation.Table,
+        'VIEW': BigQueryRelation.View,
+        'EXTERNAL': BigQueryRelation.External
+    }
+
     QUERY_TIMEOUT = 300
 
     @classmethod
@@ -182,24 +188,37 @@ def list_relations(cls, profile, project_cfg, schema, model_name=None):

         bigquery_dataset = cls.get_dataset(
             profile, project_cfg, schema, model_name)
-        all_tables = client.list_tables(bigquery_dataset)
-
-        relation_types = {
-            'TABLE': 'table',
-            'VIEW': 'view',
-            'EXTERNAL': 'external'
-        }
-
-        return [cls.Relation.create(
-            project=credentials.get('project'),
-            schema=schema,
-            identifier=table.table_id,
-            quote_policy={
-                'schema': True,
-                'identifier': True
-            },
-            type=relation_types.get(table.table_type))
-            for table in all_tables]
+
+        all_tables = client.list_tables(
+            bigquery_dataset,
+            # BigQuery paginates tables by alphabetizing them, and using
+            # the name of the last table on a page as the key for the
+            # next page. If that key table gets dropped before we run
+            # list_relations, then this will 404. So, we avoid this
+            # situation by making the page size sufficiently large.
+            # see: https://github.com/fishtown-analytics/dbt/issues/726
+            # TODO: cache the list of relations up front, and then we
+            # won't need to do this
+            max_results=100000)
+
+        return [cls.bq_table_to_relation(table) for table in all_tables]
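The comment above captures the root cause from issue #726: BigQuery pages table listings alphabetically, keyed by the last table on the previous page, so a concurrent drop can invalidate that key mid-iteration and surface as a 404. A rough standalone sketch of the mitigation, assuming a google-cloud-bigquery client of this era and a hypothetical dataset name:

```python
from google.cloud import bigquery

client = bigquery.Client()
dataset_ref = client.dataset('analytics')  # hypothetical dataset name

# Small pages mean several round-trips, and a table dropped between
# round-trips can invalidate the next page's key. One sufficiently
# large page turns the listing into a single request, closing that
# window.
for table in client.list_tables(dataset_ref, max_results=100000):
    print(table.table_id, table.table_type)  # e.g. events TABLE
```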

+    @classmethod
+    def get_relation(cls, profile, project_cfg, schema=None, identifier=None,
+                     relations_list=None, model_name=None):
+        if schema is None and relations_list is None:
+            raise dbt.exceptions.RuntimeException(
+                'get_relation needs either a schema to query, or a list '
+                'of relations to use')
+
+        if relations_list is None and identifier is not None:
+            table = cls.get_bq_table(profile, project_cfg, schema, identifier)
+
+            return cls.bq_table_to_relation(table)
+
+        return super(BigQueryAdapter, cls).get_relation(
+            profile, project_cfg, schema, identifier, relations_list,
+            model_name)
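This override is the materialization shortcut the changelog mentions: when a caller passes a schema and identifier without a pre-fetched relations_list, the adapter resolves the relation with a single tables.get call instead of listing the whole dataset, and falls back to the stock behavior otherwise. A minimal check of the guard clause, hedged as a sketch: it assumes the class is importable from the path shown in this diff, and uses empty dicts as stand-ins for profile and project config, since the guard fires before either is read:

```python
import pytest

import dbt.exceptions
from dbt.adapters.bigquery.impl import BigQueryAdapter

def test_get_relation_needs_schema_or_relations_list():
    # With neither a schema nor a relations_list, the guard raises
    # before any connection or configuration is touched.
    with pytest.raises(dbt.exceptions.RuntimeException):
        BigQueryAdapter.get_relation(profile={}, project_cfg={})
```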

     @classmethod
     def drop_relation(cls, profile, project_cfg, relation, model_name=None):
@@ -468,6 +487,37 @@ def get_dataset(cls, profile, project_cfg, dataset_name, model_name=None):
         dataset_ref = client.dataset(dataset_name)
         return google.cloud.bigquery.Dataset(dataset_ref)

+    @classmethod
+    def bq_table_to_relation(cls, bq_table):
+        if bq_table is None:
+            return None
+
+        return cls.Relation.create(
+            project=bq_table.project,
+            schema=bq_table.dataset_id,
+            identifier=bq_table.table_id,
+            quote_policy={
+                'schema': True,
+                'identifier': True
+            },
+            type=cls.RELATION_TYPES.get(bq_table.table_type))
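bq_table_to_relation reads only four attributes from the BigQuery table object, so it can be exercised with a simple stand-in. A sketch, with the FakeTable shim and assertions being illustrative rather than part of the PR, and assuming 0.10-era dbt internals where Relation.create accepts these arguments offline:

```python
from collections import namedtuple

from dbt.adapters.bigquery.impl import BigQueryAdapter

# Stand-in exposing just the attributes bq_table_to_relation reads.
FakeTable = namedtuple('FakeTable', 'project dataset_id table_id table_type')

assert BigQueryAdapter.bq_table_to_relation(None) is None

rel = BigQueryAdapter.bq_table_to_relation(
    FakeTable('my-project', 'analytics', 'events', 'TABLE'))
# rel should carry type BigQueryRelation.Table; an unrecognized
# table_type maps to None via RELATION_TYPES.get rather than raising.
```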

+    @classmethod
+    def get_bq_table(cls, profile, project_cfg, dataset_name, identifier,
+                     model_name=None):
+        conn = cls.get_connection(profile, model_name)
+        client = conn.get('handle')
+
+        dataset = cls.get_dataset(
+            profile, project_cfg, dataset_name, model_name)
+
+        table_ref = dataset.table(identifier)
+
+        try:
+            return client.get_table(table_ref)
+        except google.cloud.exceptions.NotFound:
+            return None
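get_bq_table treats a missing table as an expected outcome (the model may simply not have been built yet), so NotFound maps to None instead of propagating. The before-and-after cost of a relation lookup, sketched directly against the BigQuery client with hypothetical dataset and table names:

```python
import google.cloud.exceptions
from google.cloud import bigquery

client = bigquery.Client()
dataset_ref = client.dataset('analytics')  # hypothetical

# Before this PR: list every table in the dataset, then scan for one.
tables = client.list_tables(dataset_ref, max_results=100000)
match = next((t for t in tables if t.table_id == 'events'), None)

# After: a single tables.get call for exactly the relation needed.
try:
    match = client.get_table(dataset_ref.table('events'))
except google.cloud.exceptions.NotFound:
    match = None  # the relation does not exist yet
```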

     @classmethod
     def warning_on_hooks(cls, hook_type):
         msg = "{} is not supported in bigquery and will be ignored"
(BigQuery table materialization macro; file header not captured)

@@ -30,8 +30,7 @@

 {%- set identifier = model['alias'] -%}
 {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
-{%- set existing_relations = adapter.list_relations(schema=schema) -%}
-{%- set old_relation = adapter.get_relation(relations_list=existing_relations, identifier=identifier) -%}
+{%- set old_relation = adapter.get_relation(schema=schema, identifier=identifier) -%}
 {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}
 {%- set target_relation = api.Relation.create(schema=schema, identifier=identifier, type='table') -%}
 {%- set verbose = config.get('verbose', False) -%}
(BigQuery view materialization macro; file header not captured)

@@ -3,10 +3,7 @@
 {%- set identifier = model['alias'] -%}
 {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
 
-{%- set existing_relations = adapter.list_relations(schema=schema) -%}
-
 {%- set old_relation = adapter.get_relation(
-    relations_list=existing_relations,
     schema=schema, identifier=identifier) -%}
 
 {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}
dbt/logger.py (2 additions, 2 deletions)

@@ -72,7 +72,7 @@ def initialize_logger(debug_mode=False, path=None):

     if debug_mode:
         stdout_handler.setFormatter(
-            logging.Formatter('%(asctime)-18s: %(message)s'))
+            logging.Formatter('%(asctime)-18s (%(threadName)s): %(message)s'))
         stdout_handler.setLevel(logging.DEBUG)
 
     if path is not None:
@@ -91,7 +91,7 @@ def initialize_logger(debug_mode=False, path=None):
     logdir_handler.addFilter(color_filter)
 
     logdir_handler.setFormatter(
-        logging.Formatter('%(asctime)-18s: %(message)s'))
+        logging.Formatter('%(asctime)-18s (%(threadName)s): %(message)s'))
     logdir_handler.setLevel(logging.DEBUG)
 
     logger.addHandler(logdir_handler)
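Adding %(threadName)s to both formatters is what makes the concurrent relation loading debuggable: when several threads list or fetch relations at once, each log line now names the thread that emitted it. A quick standalone demonstration, with the logger name and message being illustrative:

```python
import logging
import threading

handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter('%(asctime)-18s (%(threadName)s): %(message)s'))

log = logging.getLogger('demo')  # hypothetical logger name
log.addHandler(handler)
log.setLevel(logging.DEBUG)

def load_relations():
    log.debug('listing relations')

threads = [threading.Thread(target=load_relations, name='Thread-%d' % i)
           for i in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Emits lines such as:
#   2018-07-13 10:00:01,123 (Thread-1): listing relations
#   2018-07-13 10:00:01,124 (Thread-2): listing relations
```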