Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bigquery parity for statement blocks #526

Merged
merged 12 commits into from
Sep 19, 2017
24 changes: 19 additions & 5 deletions dbt/adapters/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class BigQueryAdapter(PostgresAdapter):
"query_for_existing",
"execute_model",
"drop",
"execute",
"quote_schema_and_table"
]

SCOPE = ('https://www.googleapis.com/auth/bigquery',
Expand Down Expand Up @@ -223,6 +225,8 @@ def materialize_as_table(cls, profile, dataset, model_name, model_sql):
job.write_disposition = 'WRITE_TRUNCATE'
job.begin()

cls.release_connection(profile, model_name)

logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql))

with cls.exception_handler(profile, model_sql, model_name, model_name):
Expand All @@ -232,10 +236,11 @@ def materialize_as_table(cls, profile, dataset, model_name, model_sql):

@classmethod
def execute_model(cls, profile, model, materialization, model_name=None):
connection = cls.get_connection(profile, model.get('name'))

if flags.STRICT_MODE:
connection = cls.get_connection(profile, model.get('name'))
validate_connection(connection)
cls.release_connection(profile, model.get('name'))

model_name = model.get('name')
model_schema = model.get('schema')
Expand Down Expand Up @@ -267,10 +272,10 @@ def fetch_query_results(cls, query):
if token is None:
break
rows, total_count, token = query.fetch_data(page_token=token)
return rows
return all_rows

@classmethod
def execute_and_fetch(cls, profile, sql, model_name=None, **kwargs):
def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

Expand All @@ -283,7 +288,16 @@ def execute_and_fetch(cls, profile, sql, model_name=None, **kwargs):

query.run()

return cls.fetch_query_results(query)
res = []
if fetch:
res = cls.fetch_query_results(query)

status = 'ERROR' if query.errors else 'OK'
return status, res

@classmethod
def execute_and_fetch(cls, profile, sql, model_name, auto_begin=None):
return cls.execute(profile, sql, model_name, fetch=True)

@classmethod
def add_begin_query(cls, profile, name):
Expand Down Expand Up @@ -368,7 +382,7 @@ def quote(cls, identifier):
return '`{}`'.format(identifier)

@classmethod
def quote_schema_and_table(cls, profile, schema, table):
def quote_schema_and_table(cls, profile, schema, table, model_name=None):
connection = cls.get_connection(profile)
credentials = connection.get('credentials', {})
project = credentials.get('project')
Expand Down
15 changes: 14 additions & 1 deletion dbt/adapters/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class DefaultAdapter(object):
"add_query",
"expand_target_column_types",
"quote_schema_and_table",
"execute"
]

raw_functions = [
Expand Down Expand Up @@ -539,7 +540,19 @@ def execute_and_fetch(cls, profile, sql, model_name=None,
auto_begin=False):
_, cursor = cls.execute_one(profile, sql, model_name, auto_begin)

return cursor.fetchall()
status = cls.get_status(cursor)
rows = cursor.fetchall()
return status, rows

@classmethod
def execute(cls, profile, sql, model_name=None, auto_begin=False,
fetch=False):
if fetch:
return cls.execute_and_fetch(profile, sql, model_name, auto_begin)
else:
_, cursor = cls.execute_one(profile, sql, model_name, auto_begin)
status = cls.get_status(cursor)
return status, []

@classmethod
def execute_all(cls, profile, sqls, model_name=None):
Expand Down
7 changes: 5 additions & 2 deletions dbt/context/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ def _add_sql_handlers(context):
})


def log(msg):
logger.debug(msg)
def log(msg, info=False):
if info:
logger.info(msg)
else:
logger.debug(msg)
return ''


Expand Down
8 changes: 8 additions & 0 deletions dbt/include/global_project/macros/adapters/bigquery.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

{% macro bigquery__create_table_as(temporary, identifier, sql) -%}
{{ adapter.execute_model({"name": identifier, "injected_sql": sql, "schema": schema}, 'table') }}
{% endmacro %}

{% macro bigquery__create_view_as(identifier, sql) -%}
{{ adapter.execute_model({"name": identifier, "injected_sql": sql, "schema": schema}, 'view') }}
{% endmacro %}
1 change: 0 additions & 1 deletion dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
);
{% endmacro %}


{% macro create_view_as(identifier, sql) -%}
{{ adapter_macro('create_view_as', identifier, sql) }}
{%- endmacro %}
Expand Down
6 changes: 3 additions & 3 deletions dbt/include/global_project/macros/core.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
{{ write(sql) }}
{%- endif -%}

{%- set _, cursor = adapter.add_query(sql, auto_begin=auto_begin) -%}
{%- set status, res = adapter.execute(sql, auto_begin=auto_begin, fetch=fetch_result) -%}
{%- if name is not none -%}
{%- set result = [] if not fetch_result else adapter.get_result_from_cursor(cursor) -%}
{{ store_result(name, status=adapter.get_status(cursor), data=result) }}
{{ store_result(name, status=status, data=res) }}
{%- endif -%}

{%- endif -%}
{%- endmacro %}
4 changes: 2 additions & 2 deletions dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ def print_start_line(self):
self.num_nodes)

def execute_test(self, test):
rows = self.adapter.execute_and_fetch(
res, rows = self.adapter.execute_and_fetch(
self.profile,
test.get('wrapped_sql'),
test.get('name'),
Expand All @@ -426,7 +426,7 @@ def execute_test(self, test):
num_cols = len(rows[0])
raise RuntimeError(
"Bad test {name}: Returned {rows} rows and {cols} cols"
.format(name=test.name, rows=num_rows, cols=num_cols))
.format(name=test.get('name'), rows=num_rows, cols=num_cols))

return rows[0][0]

Expand Down
2 changes: 1 addition & 1 deletion dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def find_by_name(flat_graph, target_name, target_package, subgraph,
if len(node_parts) != 3:
node_type = model.get('resource_type', 'node')
msg = "{} names cannot contain '.' characters".format(node_type)
dbt.exceptions.raise_compiler_error(model, msg)
dbt.exceptions.raise_compiler_error(msg, model)

resource_type, package_name, node_name = node_parts

Expand Down
20 changes: 20 additions & 0 deletions test/integration/022_bigquery_test/macros/test_creation.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@


-- hack b/c bq model names are fully qualified, which doesn't work
-- with query_for_existing
{% macro test_was_materialized(model, name, type) %}

{#-- don't run this query in the parsing step #}
{%- if model -%}
{%- set existing_tables = adapter.query_for_existing(schema) -%}
{%- else -%}
{%- set existing_tables = {} -%}
{%- endif -%}

{% if name in existing_tables and existing_tables[name] == type %}
select 0 as success
{% else %}
select 1 as error
{% endif %}

{% endmacro %}
11 changes: 11 additions & 0 deletions test/integration/022_bigquery_test/models/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,14 @@ view:
unique:
- id
- dupe # fails

was_materialized:
- {name: view, type: view}

table_model:
constraints:
not_null:
- id

was_materialized:
- {name: table_model, type: table}
4 changes: 4 additions & 0 deletions test/integration/022_bigquery_test/models/table_model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

{{ config(materialized = "table") }}

select * from {{ ref('view') }}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dbt.project import read_project


class TestSimpleBigQueryView(DBTIntegrationTest):
class TestSimpleBigQueryRun(DBTIntegrationTest):

def setUp(self):
pass
Expand All @@ -18,6 +18,12 @@ def schema(self):
def models(self):
return "test/integration/022_bigquery_test/models"

@property
def project_config(self):
return {
'macro-paths': ['test/integration/022_bigquery_test/macros'],
}

def run_schema_validations(self):
project = read_project('dbt_project.yml')
args = FakeArgs()
Expand Down