Merge branch 'dev/0.14.1' into dev/louisa-may-alcott
Jacob Beck committed Aug 21, 2019
2 parents 01534c1 + f73d561 commit eb8bce4
Showing 23 changed files with 285 additions and 47 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,8 @@ This is primarily a bugfix release which contains a few minor improvements too.
- Add environment variables for macro debugging flags ([#1628](https://github.com/fishtown-analytics/dbt/issues/1628), [#1629](https://github.com/fishtown-analytics/dbt/pull/1629))
- Speed up node selection by making it linear, rather than quadratic, in complexity ([#1611](https://github.com/fishtown-analytics/dbt/issues/1611), [#1615](https://github.com/fishtown-analytics/dbt/pull/1615))
- Specify the `application` field in Snowflake connections ([#1622](https://github.com/fishtown-analytics/dbt/issues/1622), [#1623](https://github.com/fishtown-analytics/dbt/pull/1623))
- Add support for clustering on Snowflake ([#634](https://github.com/fishtown-analytics/dbt/issues/634), [#1591](https://github.com/fishtown-analytics/dbt/pull/1591), [#1689](https://github.com/fishtown-analytics/dbt/pull/1689))
- Add support for job priority on BigQuery ([#1456](https://github.com/fishtown-analytics/dbt/issues/1456), [#1673](https://github.com/fishtown-analytics/dbt/pull/1673))

### Fixes:
- Fix for reused `check_cols` values in snapshots ([#1614](https://github.com/fishtown-analytics/dbt/pull/1614))
@@ -27,13 +29,28 @@ This is primarily a bugfix release which contains a few minor improvements too.
- Fix for creation of user cookies in incorrect directories when `--profile-dir` or `$DBT_PROFILES_DIR` is provided ([#1645](https://github.com/fishtown-analytics/dbt/issues/1645), [#1656](https://github.com/fishtown-analytics/dbt/pull/1656))
- Fix for error handling when transactions are being rolled back ([#1647](https://github.com/fishtown-analytics/dbt/pull/1647))
- Fix for incorrect references to `dbt.exceptions` in jinja code ([#1569](https://github.com/fishtown-analytics/dbt/issues/1569), [#1609](https://github.com/fishtown-analytics/dbt/pull/1609))
- Fix for duplicated schema creation due to case-sensitive comparison ([#1651](https://github.com/fishtown-analytics/dbt/issues/1651), [#1663](https://github.com/fishtown-analytics/dbt/pull/1663))
- Fix for "schema stub" created automatically by dbt ([#913](https://github.com/fishtown-analytics/dbt/issues/913), [#1663](https://github.com/fishtown-analytics/dbt/pull/1663))
- Fix for incremental merge query on old versions of postgres (<=9.6) ([#1665](https://github.com/fishtown-analytics/dbt/issues/1665), [#1666](https://github.com/fishtown-analytics/dbt/pull/1666))
- Fix for serializing results of queries which return `TIMESTAMP_TZ` columns on Snowflake in the RPC server ([#1670](https://github.com/fishtown-analytics/dbt/pull/1670))
- Fix typo in InternalException ([#1640](https://github.com/fishtown-analytics/dbt/issues/1640), [#1672](https://github.com/fishtown-analytics/dbt/pull/1672))
- Fix typo in CLI help for snapshot migration subcommand ([#1664](https://github.com/fishtown-analytics/dbt/pull/1664))
- Fix for error handling logic when empty queries are submitted on Snowflake ([#1693](https://github.com/fishtown-analytics/dbt/issues/1693), [#1694](https://github.com/fishtown-analytics/dbt/pull/1694))
- Fix for non-atomic column expansion logic in Snowflake incremental models and snapshots ([#1687](https://github.com/fishtown-analytics/dbt/issues/1687), [#1690](https://github.com/fishtown-analytics/dbt/pull/1690))
- Fix for unprojected `count(*)` expression injected by custom data tests ([#1688](https://github.com/fishtown-analytics/dbt/pull/1688))
- Fix for `dbt run` and `dbt docs generate` commands when running against Panoply Redshift ([#1479](https://github.com/fishtown-analytics/dbt/issues/1479), [#1686](https://github.com/fishtown-analytics/dbt/pull/1686))


### Contributors:
Thanks for your contributions to dbt!

- [@levimalott](https://github.com/levimalott) ([#1647](https://github.com/fishtown-analytics/dbt/pull/1647))
- [@aminamos](https://github.com/aminamos) ([#1609](https://github.com/fishtown-analytics/dbt/pull/1609))
- [@elexisvenator](https://github.com/elexisvenator) ([#1540](https://github.com/fishtown-analytics/dbt/pull/1540))
- [@edmundyan](https://github.com/edmundyan) ([#1663](https://github.com/fishtown-analytics/dbt/pull/1663))
- [@vitorbaptista](https://github.com/vitorbaptista) ([#1664](https://github.com/fishtown-analytics/dbt/pull/1664))
- [@sjwhitworth](https://github.com/sjwhitworth) ([#1672](https://github.com/fishtown-analytics/dbt/pull/1672), [#1673](https://github.com/fishtown-analytics/dbt/pull/1673))
- [@mikaelene](https://github.com/mikaelene) ([#1688](https://github.com/fishtown-analytics/dbt/pull/1688))



4 changes: 3 additions & 1 deletion core/dbt/adapters/factory.py
@@ -1,6 +1,7 @@
 import dbt.exceptions
 from importlib import import_module
 from dbt.include.global_project import PACKAGES
+from dbt.logger import GLOBAL_LOGGER as logger

 import threading

@@ -29,7 +30,8 @@ def get_relation_class_by_name(adapter_name):
 def load_plugin(adapter_name):
     try:
         mod = import_module('.' + adapter_name, 'dbt.adapters')
-    except ImportError:
+    except ImportError as e:
+        logger.info("Error importing adapter: {}".format(e))
         raise dbt.exceptions.RuntimeException(
             "Could not find adapter type {}!".format(adapter_name)
         )
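In practice, the root-cause `ImportError` is no longer swallowed. A hedged usage sketch (assuming the `snowflake` plugin is not installed in the environment):

    from dbt.adapters.factory import load_plugin

    # The underlying ImportError is now logged at info level before dbt
    # raises: RuntimeException("Could not find adapter type snowflake!")
    load_plugin('snowflake')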
9 changes: 6 additions & 3 deletions core/dbt/adapters/sql/connections.py
@@ -75,16 +75,19 @@ def get_status(cls, cursor):
             '`get_status` is not implemented for this adapter!'
         )

+    @classmethod
+    def process_results(cls, column_names, rows):
+        return [dict(zip(column_names, row)) for row in rows]
+
     @classmethod
     def get_result_from_cursor(cls, cursor):
         data = []
         column_names = []

         if cursor.description is not None:
             column_names = [col[0] for col in cursor.description]
-            raw_results = cursor.fetchall()
-            data = [dict(zip(column_names, row))
-                    for row in raw_results]
+            rows = cursor.fetchall()
+            data = cls.process_results(column_names, rows)

         return dbt.clients.agate_helper.table_from_data(data, column_names)
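Splitting `process_results` out of `get_result_from_cursor` turns row post-processing into a per-adapter hook. A minimal sketch of an override for a hypothetical adapter (the Snowflake connection manager later in this commit is the real consumer of this hook):

    from dbt.adapters.sql import SQLConnectionManager

    class ExampleConnectionManager(SQLConnectionManager):
        @classmethod
        def process_results(cls, column_names, rows):
            # Illustrative transformation: coerce memoryview buffers to bytes,
            # then defer to the default dict-per-row conversion.
            cleaned = [
                [bytes(col) if isinstance(col, memoryview) else col
                 for col in row]
                for row in rows
            ]
            return super().process_results(column_names, cleaned)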

3 changes: 2 additions & 1 deletion core/dbt/compilation.py
@@ -143,7 +143,8 @@ def compile_node(self, node, manifest, extra_context=None):
         if 'data' in injected_node.tags and \
            is_type(injected_node, NodeType.Test):
             injected_node.wrapped_sql = (
-                "select count(*) from (\n{test_sql}\n) sbq").format(
+                "select count(*) as errors "
+                "from (\n{test_sql}\n) sbq").format(
                     test_sql=injected_node.injected_sql)
         else:
             # don't wrap schema tests or analyses.
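The new alias is the substance of fix #1688: the wrapper dbt executes for a custom data test now projects a named column. For an illustrative `test_sql` (table and predicate are placeholders), the generated statement looks like:

    select count(*) as errors
    from (
        select * from analytics.orders where amount < 0
    ) sbq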
@@ -215,6 +215,7 @@

     {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}

+    -- this may no-op if the database does not require column expansion
     {% do adapter.expand_target_column_types(from_relation=staging_table,
                                              to_relation=target_relation) %}

2 changes: 1 addition & 1 deletion core/dbt/task/list.py
@@ -95,7 +95,7 @@ def run(self):
         elif output == 'path':
             generator = self.generate_paths
         else:
-            raise dbt.exceptions.IternalException(
+            raise dbt.exceptions.InternalException(
                 'Invalid output {}'.format(output)
             )
         for result in generator():
19 changes: 6 additions & 13 deletions core/dbt/task/runnable.py
@@ -317,23 +317,16 @@ def get_model_schemas(self, selected_uids):

     def create_schemas(self, adapter, selected_uids):
         required_schemas = self.get_model_schemas(selected_uids)
-
-        # Snowflake needs to issue a "use {schema}" query, where schema
-        # is the one defined in the profile. Create this schema if it
-        # does not exist, otherwise subsequent queries will fail. Generally,
-        # dbt expects that this schema will exist anyway.
-        required_schemas.add(
-            (self.config.credentials.database, self.config.credentials.schema)
-        )
-
         required_databases = set(db for db, _ in required_schemas)

-        existing_schemas = set()
+        existing_schemas_lowered = set()
         for db in required_databases:
-            existing_schemas.update((db, s) for s in adapter.list_schemas(db))
+            existing_schemas_lowered.update(
+                (db.lower(), s.lower()) for s in adapter.list_schemas(db))

-        for database, schema in (required_schemas - existing_schemas):
-            adapter.create_schema(database, schema)
+        for db, schema in required_schemas:
+            if (db.lower(), schema.lower()) not in existing_schemas_lowered:
+                adapter.create_schema(db, schema)

     def get_result(self, results, elapsed_time, generated_at):
         return ExecutionResult(
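A worked illustration of the new case-insensitive membership test (values are placeholders): a schema that already exists under different casing no longer triggers a duplicate `create schema`:

    existing_schemas_lowered = {('analytics', 'public')}

    # Differs only by case from the existing entry, so no create is issued.
    db, schema = 'ANALYTICS', 'Public'
    assert (db.lower(), schema.lower()) in existing_schemas_lowered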
17 changes: 15 additions & 2 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -1,4 +1,6 @@
 from contextlib import contextmanager
+from dataclasses import dataclass
+from typing import Optional, Any, Dict

 import google.auth
 import google.api_core
@@ -13,8 +15,10 @@

 from hologram.helpers import StrEnum

-from dataclasses import dataclass
-from typing import Optional, Any, Dict

+class Priority(StrEnum):
+    Interactive = 'interactive'
+    Batch = 'batch'
+

 class BigQueryConnectionMethod(StrEnum):
@@ -30,6 +34,7 @@ class BigQueryCredentials(Credentials):
     keyfile_json: Optional[Dict[str, Any]] = None
     timeout_seconds: Optional[int] = 300
     location: Optional[str] = None
+    priority: Optional[Priority] = None
     _ALIASES = {
         'project': 'database',
         'dataset': 'schema',
@@ -178,6 +183,14 @@ def raw_execute(self, sql, fetch=False):

         job_config = google.cloud.bigquery.QueryJobConfig()
         job_config.use_legacy_sql = False
+
+        priority = conn.credentials.priority
+        if priority == Priority.Batch:
+            job_config.priority = google.cloud.bigquery.QueryPriority.BATCH
+        else:
+            job_config.priority = \
+                google.cloud.bigquery.QueryPriority.INTERACTIVE
+
         query_job = client.query(sql, job_config)

         # this blocks until the query has completed
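For reference, a hedged sketch of setting the new credential in profiles.yml (profile, project, and dataset names are placeholders); any value other than `batch` falls back to interactive priority:

    my-bigquery-profile:
      target: prod
      outputs:
        prod:
          type: bigquery
          method: oauth
          project: my-gcp-project   # `project` is aliased to `database`
          dataset: analytics        # `dataset` is aliased to `schema`
          threads: 4
          priority: batch           # new in this release: interactive | batch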
4 changes: 4 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -114,6 +114,10 @@ def expand_column_types(self, goal, current):
         # This is a no-op on BigQuery
         pass

+    def expand_target_column_types(self, from_relation, to_relation):
+        # This is a no-op on BigQuery
+        pass
+
     def list_relations_without_caching(self, information_schema, schema):
         connection = self.connections.get_thread_connection()
         client = connection.handle
2 changes: 2 additions & 0 deletions plugins/postgres/dbt/adapters/postgres/impl.py
@@ -13,6 +13,8 @@ class PostgresAdapter(SQLAdapter):
     ConnectionManager = PostgresConnectionManager
     Column = PostgresColumn

+    AdapterSpecificConfigs = frozenset({'unlogged'})
+
     @classmethod
     def date_function(cls):
         return 'now()'
12 changes: 12 additions & 0 deletions plugins/postgres/dbt/include/postgres/macros/adapters.sql
@@ -1,3 +1,15 @@
+{% macro postgres__create_table_as(temporary, relation, sql) -%}
+  {%- set unlogged = config.get('unlogged', default=false) -%}
+
+  create {% if temporary -%}
+    temporary
+  {%- elif unlogged -%}
+    unlogged
+  {%- endif %} table {{ relation }}
+  as (
+    {{ sql }}
+  );
+{%- endmacro %}

{% macro postgres__create_schema(database_name, schema_name) -%}
{% if database_name -%}
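With `postgres__create_table_as` honoring the new config, any model targeting Postgres can opt in. A hedged usage sketch (model body and source name are illustrative); unlogged tables skip write-ahead logging, so they build faster but are not crash-safe and are not replicated to standbys:

    {{ config(materialized='table', unlogged=true) }}

    select id, payload, created_at
    from {{ source('app', 'events') }}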
12 changes: 7 additions & 5 deletions plugins/postgres/dbt/include/postgres/macros/relations.sql
@@ -1,10 +1,12 @@
 {% macro postgres_get_relations () -%}

-  {#
-      -- in pg_depend, objid is the dependent, refobjid is the referenced object
-      -- > a pg_depend entry indicates that the referenced object cannot be
-      -- > dropped without also dropping the dependent object.
-  #}
-
   {%- call statement('relations', fetch_result=True) -%}
+    -- {#
+    --     in pg_depend, objid is the dependent, refobjid is the referenced object
+    --     "a pg_depend entry indicates that the referenced object cannot be dropped without also dropping the dependent object."
+    -- #}
+    -- {# this only works with the current database #}
     with relation as (
         select
             pg_rewrite.ev_class as class,
2 changes: 1 addition & 1 deletion plugins/redshift/dbt/include/redshift/macros/adapters.sql
@@ -80,7 +80,7 @@
       numeric_precision,
       numeric_scale

-  from {{ relation.information_schema('columns') }}
+  from information_schema."columns"
   where table_name = '{{ relation.identifier }}'
  ),

6 changes: 3 additions & 3 deletions plugins/redshift/dbt/include/redshift/macros/catalog.sql
@@ -60,7 +60,7 @@

 ),

-columns as (
+table_columns as (

     select
         '{{ database }}'::varchar as table_database,
@@ -74,15 +74,15 @@
         null::varchar as column_comment


-    from information_schema.columns
+    from information_schema."columns"

 ),

 unioned as (

     select *
     from tables
-    join columns using (table_database, table_schema, table_name)
+    join table_columns using (table_database, table_schema, table_name)

     union all

30 changes: 29 additions & 1 deletion plugins/snowflake/dbt/adapters/snowflake/connections.py
@@ -1,6 +1,8 @@
 import re
 from io import StringIO
 from contextlib import contextmanager
+import datetime
+import pytz

 import snowflake.connector
 import snowflake.connector.errors
@@ -166,6 +168,26 @@ def _split_queries(cls, sql):
         split_query = snowflake.connector.util_text.split_statements(sql_buf)
         return [part[0] for part in split_query]

+    @classmethod
+    def process_results(cls, column_names, rows):
+        # Override for Snowflake. The datetime objects returned by
+        # snowflake-connector-python are not pickleable, so we need
+        # to replace them with sane timezones
+        fixed = []
+        for row in rows:
+            fixed_row = []
+            for col in row:
+                if isinstance(col, datetime.datetime) and col.tzinfo:
+                    offset = col.utcoffset()
+                    offset_seconds = offset.total_seconds()
+                    new_timezone = pytz.FixedOffset(offset_seconds // 60)
+                    col = col.astimezone(tz=new_timezone)
+                fixed_row.append(col)
+
+            fixed.append(fixed_row)
+
+        return super().process_results(column_names, fixed)
+
     def add_query(self, sql, auto_begin=True,
                   bindings=None, abridge_sql_log=False):

@@ -197,12 +219,18 @@ def add_query(self, sql, auto_begin=True,
             )

         if cursor is None:
+            conn = self.get_thread_connection()
+            if conn is None or conn.name is None:
+                conn_name = '<None>'
+            else:
+                conn_name = conn.name
+
             raise dbt.exceptions.RuntimeException(
                 "Tried to run an empty query on model '{}'. If you are "
                 "conditionally running\nsql, eg. in a model hook, make "
                 "sure your `else` clause contains valid sql!\n\n"
                 "Provided SQL:\n{}"
-                .format(self.nice_connection_name(), sql)
+                .format(conn_name, sql)
             )

         return connection, cursor
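To see what the `process_results` override above buys (a hedged, self-contained illustration): Snowflake returns `TIMESTAMP_TZ` values with a raw fixed-offset tzinfo that the RPC server cannot pickle; normalizing to `pytz.FixedOffset` keeps the instant identical while making the row serializable:

    import datetime
    import pytz

    # A TIMESTAMP_TZ-style value carrying a raw UTC-05:00 offset (illustrative).
    col = datetime.datetime(
        2019, 8, 21, 9, 30,
        tzinfo=datetime.timezone(datetime.timedelta(hours=-5)))

    offset_seconds = col.utcoffset().total_seconds()
    col = col.astimezone(tz=pytz.FixedOffset(offset_seconds // 60))
    assert col.isoformat() == '2019-08-21T09:30:00-05:00'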
18 changes: 10 additions & 8 deletions plugins/snowflake/dbt/adapters/snowflake/impl.py
@@ -8,11 +8,13 @@ class SnowflakeAdapter(SQLAdapter):
     Relation = SnowflakeRelation
     ConnectionManager = SnowflakeConnectionManager

-    AdapterSpecificConfigs = frozenset({"transient"})
+    AdapterSpecificConfigs = frozenset(
+        {"transient", "cluster_by", "automatic_clustering"}
+    )

     @classmethod
     def date_function(cls):
-        return 'CURRENT_TIMESTAMP()'
+        return "CURRENT_TIMESTAMP()"

@classmethod
def _catalog_filter_table(cls, table, manifest):
@@ -25,15 +27,15 @@ def _catalog_filter_table(cls, table, manifest):

     def _make_match_kwargs(self, database, schema, identifier):
         quoting = self.config.quoting
-        if identifier is not None and quoting['identifier'] is False:
+        if identifier is not None and quoting["identifier"] is False:
             identifier = identifier.upper()

-        if schema is not None and quoting['schema'] is False:
+        if schema is not None and quoting["schema"] is False:
             schema = schema.upper()

-        if database is not None and quoting['database'] is False:
+        if database is not None and quoting["database"] is False:
             database = database.upper()

-        return filter_null_values({'identifier': identifier,
-                                   'schema': schema,
-                                   'database': database})
+        return filter_null_values(
+            {"identifier": identifier, "schema": schema, "database": database}
+        )
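With `cluster_by` and `automatic_clustering` registered as adapter-specific configs, clustering can now be enabled per model. A hedged usage sketch (column names and the ref are placeholders; the clustering DDL itself is emitted by the Snowflake materialization macros from #1591/#1689, which are not part of this excerpt):

    {{ config(
        materialized='table',
        cluster_by=['event_date', 'user_id'],
        automatic_clustering=true
    ) }}

    select * from {{ ref('stg_events') }}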