diff --git a/dbt/adapters/bigquery.py b/dbt/adapters/bigquery.py
index e35c2fa6192..a78c3d9e1d0 100644
--- a/dbt/adapters/bigquery.py
+++ b/dbt/adapters/bigquery.py
@@ -142,9 +142,14 @@ def open_connection(cls, connection):
         return result

     @classmethod
-    def query_for_existing(cls, profile, schema, model_name=None):
-        dataset = cls.get_dataset(profile, schema, model_name)
-        tables = dataset.list_tables()
+    def query_for_existing(cls, profile, schemas, model_name=None):
+        if not isinstance(schemas, (list, tuple)):
+            schemas = [schemas]
+
+        all_tables = []
+        for schema in schemas:
+            dataset = cls.get_dataset(profile, schema, model_name)
+            all_tables.extend(dataset.list_tables())

         relation_type_lookup = {
             'TABLE': 'table',
@@ -153,19 +158,18 @@ def query_for_existing(cls, profile, schema, model_name=None):
         }

         existing = [(table.name, relation_type_lookup.get(table.table_type))
-                    for table in tables]
+                    for table in all_tables]

         return dict(existing)

     @classmethod
-    def drop(cls, profile, relation, relation_type, model_name=None):
-        schema = cls.get_default_schema(profile)
+    def drop(cls, profile, schema, relation, relation_type, model_name=None):
         dataset = cls.get_dataset(profile, schema, model_name)
         relation_object = dataset.table(relation)
         relation_object.delete()

     @classmethod
-    def rename(cls, profile, from_name, to_name, model_name=None):
+    def rename(cls, profile, schema, from_name, to_name, model_name=None):
         raise dbt.exceptions.NotImplementedException(
             '`rename` is not implemented for this adapter!')

@@ -234,10 +238,10 @@ def execute_model(cls, profile, model, materialization, model_name=None):
             validate_connection(connection)

         model_name = model.get('name')
+        model_schema = model.get('schema')
         model_sql = model.get('injected_sql')

-        schema = cls.get_default_schema(profile)
-        dataset = cls.get_dataset(profile, schema, model_name)
+        dataset = cls.get_dataset(profile, model_schema, model_name)

         if materialization == 'view':
             res = cls.materialize_as_view(profile, dataset, model_name,
@@ -313,13 +317,23 @@ def drop_schema(cls, profile, schema, model_name=None):
         cls.drop_tables_in_schema(dataset)
         dataset.delete()

+    @classmethod
+    def get_existing_schemas(cls, profile, model_name=None):
+        conn = cls.get_connection(profile, model_name)
+
+        client = conn.get('handle')
+
+        with cls.exception_handler(profile, 'list dataset', model_name):
+            all_datasets = client.list_datasets()
+            return [ds.name for ds in all_datasets]
+
     @classmethod
     def check_schema_exists(cls, profile, schema, model_name=None):
         conn = cls.get_connection(profile, model_name)

         client = conn.get('handle')

-        with cls.exception_handler(profile, 'create dataset', model_name):
+        with cls.exception_handler(profile, 'get dataset', model_name):
             all_datasets = client.list_datasets()
             return any([ds.name == schema for ds in all_datasets])
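Across the adapters, `query_for_existing` keeps backwards compatibility by accepting either a single schema name or a list of them. A minimal standalone sketch of that idiom and of how results from several schemas are merged (`list_relations` stands in for `dataset.list_tables()` and is an assumption for illustration):

```python
def query_for_existing(schemas, list_relations):
    # Accept a bare string as well as a list/tuple of schema names.
    if not isinstance(schemas, (list, tuple)):
        schemas = [schemas]

    existing = {}
    for schema in schemas:
        # list_relations(schema) yields (name, relation_type) pairs.
        for name, rel_type in list_relations(schema):
            existing[name] = rel_type
    return existing


fake_catalog = {
    'analytics': [('events', 'table')],
    'analytics_custom': [('sessions', 'view')],
}
print(query_for_existing(['analytics', 'analytics_custom'],
                         lambda s: fake_catalog.get(s, [])))
# {'events': 'table', 'sessions': 'view'}
```

Note that the merged result is keyed by relation name alone, so same-named relations in two schemas would clobber one another in the cache; the diff keeps that pre-existing behavior.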
diff --git a/dbt/adapters/default.py b/dbt/adapters/default.py
index 9681d084ac9..1ed88e1eb80 100644
--- a/dbt/adapters/default.py
+++ b/dbt/adapters/default.py
@@ -74,10 +74,15 @@ def alter_column_type(cls, profile, schema, table, column_name,
             '`alter_column_type` is not implemented for this adapter!')

     @classmethod
-    def query_for_existing(cls, profile, schema, model_name=None):
+    def query_for_existing(cls, profile, schemas, model_name=None):
         raise dbt.exceptions.NotImplementedException(
             '`query_for_existing` is not implemented for this adapter!')

+    @classmethod
+    def get_existing_schemas(cls, profile, model_name=None):
+        raise dbt.exceptions.NotImplementedException(
+            '`get_existing_schemas` is not implemented for this adapter!')
+
     @classmethod
     def check_schema_exists(cls, profile, schema):
         raise dbt.exceptions.NotImplementedException(
@@ -104,42 +109,43 @@ def get_result_from_cursor(cls, cursor):
         return data

     @classmethod
-    def drop(cls, profile, relation, relation_type, model_name=None):
+    def drop(cls, profile, schema, relation, relation_type, model_name=None):
         if relation_type == 'view':
-            return cls.drop_view(profile, relation, model_name)
+            return cls.drop_view(profile, schema, relation, model_name)
         elif relation_type == 'table':
-            return cls.drop_table(profile, relation, model_name)
+            return cls.drop_table(profile, schema, relation, model_name)
         else:
             raise RuntimeError(
                 "Invalid relation_type '{}'"
                 .format(relation_type))

     @classmethod
-    def drop_view(cls, profile, view, model_name):
-        sql = ('drop view if exists {} cascade'
-               .format(cls._get_quoted_identifier(profile, view)))
+    def drop_relation(cls, profile, schema, rel_name, rel_type, model_name):
+        relation = cls.quote_schema_and_table(profile, schema, rel_name)
+        sql = 'drop {} if exists {} cascade'.format(rel_type, relation)

         connection, cursor = cls.add_query(profile, sql, model_name)

     @classmethod
-    def drop_table(cls, profile, table, model_name):
-        sql = ('drop table if exists {} cascade'
-               .format(cls._get_quoted_identifier(profile, table)))
+    def drop_view(cls, profile, schema, view, model_name):
+        cls.drop_relation(profile, schema, view, 'view', model_name)

-        connection, cursor = cls.add_query(profile, sql, model_name)
+    @classmethod
+    def drop_table(cls, profile, schema, table, model_name):
+        cls.drop_relation(profile, schema, table, 'table', model_name)

     @classmethod
-    def truncate(cls, profile, table, model_name=None):
-        sql = ('truncate table {}'
-               .format(cls._get_quoted_identifier(profile, table)))
+    def truncate(cls, profile, schema, table, model_name=None):
+        relation = cls.quote_schema_and_table(profile, schema, table)
+        sql = 'truncate table {}'.format(relation)

         connection, cursor = cls.add_query(profile, sql, model_name)

     @classmethod
-    def rename(cls, profile, from_name, to_name, model_name=None):
-        sql = ('alter table {} rename to {}'
-               .format(cls._get_quoted_identifier(profile, from_name),
-                       cls.quote(to_name)))
+    def rename(cls, profile, schema, from_name, to_name, model_name=None):
+        from_relation = cls.quote_schema_and_table(profile, schema, from_name)
+        to_relation = cls.quote(to_name)
+        sql = 'alter table {} rename to {}'.format(from_relation, to_relation)

         connection, cursor = cls.add_query(profile, sql, model_name)

@@ -576,11 +582,6 @@ def already_exists(cls, profile, schema, table, model_name=None):
         """
         return cls.table_exists(profile, schema, table, model_name)

-    @classmethod
-    def _get_quoted_identifier(cls, profile, identifier):
-        return cls.quote_schema_and_table(
-            profile, cls.get_default_schema(profile), identifier)
-
     @classmethod
     def quote(cls, identifier):
         return '"{}"'.format(identifier)
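`drop_view` and `drop_table` previously built near-identical SQL against the default schema. Both now delegate to `drop_relation`, parameterized by relation type and the model's schema. A rough sketch of the delegation; `quote_schema_and_table` here is a simplified stand-in for the adapter method of the same name:

```python
def quote_schema_and_table(schema, table):
    return '"{}"."{}"'.format(schema, table)


def drop_relation(schema, rel_name, rel_type):
    # Single code path shared by views and tables.
    relation = quote_schema_and_table(schema, rel_name)
    return 'drop {} if exists {} cascade'.format(rel_type, relation)


def drop_view(schema, view):
    return drop_relation(schema, view, 'view')


def drop_table(schema, table):
    return drop_relation(schema, table, 'table')


print(drop_view('analytics_custom', 'sessions'))
# drop view if exists "analytics_custom"."sessions" cascade
```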
diff --git a/dbt/adapters/postgres.py b/dbt/adapters/postgres.py
index cb85b1a80e8..8836f0bca92 100644
--- a/dbt/adapters/postgres.py
+++ b/dbt/adapters/postgres.py
@@ -107,14 +107,19 @@ def alter_column_type(cls, profile, schema, table, column_name,
         return connection, cursor

     @classmethod
-    def query_for_existing(cls, profile, schema, model_name=None):
+    def query_for_existing(cls, profile, schemas, model_name=None):
+        if not isinstance(schemas, (list, tuple)):
+            schemas = [schemas]
+
+        schema_list = ",".join(["'{}'".format(schema) for schema in schemas])
+
         sql = """
             select tablename as name, 'table' as type from pg_tables
-            where schemaname = '{schema}'
+            where schemaname in ({schema_list})
             union all
             select viewname as name, 'view' as type from pg_views
-            where schemaname = '{schema}'
-            """.format(schema=schema).strip()  # noqa
+            where schemaname in ({schema_list})
+            """.format(schema_list=schema_list).strip()  # noqa

         connection, cursor = cls.add_query(profile, sql, model_name,
                                            auto_begin=False)
@@ -125,6 +130,16 @@ def query_for_existing(cls, profile, schema, model_name=None):

         return dict(existing)

+    @classmethod
+    def get_existing_schemas(cls, profile, model_name=None):
+        sql = "select distinct nspname from pg_namespace"
+
+        connection, cursor = cls.add_query(profile, sql, model_name,
+                                           auto_begin=False)
+        results = cursor.fetchall()
+
+        return [row[0] for row in results]
+
     @classmethod
     def check_schema_exists(cls, profile, schema, model_name=None):
         sql = """
diff --git a/dbt/adapters/redshift.py b/dbt/adapters/redshift.py
index 2768f565e45..e4e0db9d543 100644
--- a/dbt/adapters/redshift.py
+++ b/dbt/adapters/redshift.py
@@ -18,7 +18,7 @@ def date_function(cls):
         return 'getdate()'

     @classmethod
-    def drop(cls, profile, relation, relation_type, model_name=None):
+    def drop(cls, profile, schema, relation, relation_type, model_name=None):
         global drop_lock

         to_return = None
@@ -34,7 +34,7 @@ def drop(cls, profile, relation, relation_type, model_name=None):
             cls.begin(profile, connection.get('name'))

             to_return = super(PostgresAdapter, cls).drop(
-                profile, relation, relation_type, model_name)
+                profile, schema, relation, relation_type, model_name)

             cls.commit(profile, connection)
             cls.begin(profile, connection.get('name'))
diff --git a/dbt/adapters/snowflake.py b/dbt/adapters/snowflake.py
index 59f63b7139e..801628e2dbc 100644
--- a/dbt/adapters/snowflake.py
+++ b/dbt/adapters/snowflake.py
@@ -103,12 +103,17 @@ def open_connection(cls, connection):
         return result

     @classmethod
-    def query_for_existing(cls, profile, schema, model_name=None):
+    def query_for_existing(cls, profile, schemas, model_name=None):
+        if not isinstance(schemas, (list, tuple)):
+            schemas = [schemas]
+
+        schema_list = ",".join(["'{}'".format(schema) for schema in schemas])
+
         sql = """
         select TABLE_NAME as name, TABLE_TYPE as type
         from INFORMATION_SCHEMA.TABLES
-        where TABLE_SCHEMA = '{schema}'
-        """.format(schema=schema).strip()  # noqa
+        where TABLE_SCHEMA in ({schema_list})
+        """.format(schema_list=schema_list).strip()  # noqa

         _, cursor = cls.add_query(profile, sql, model_name, auto_begin=False)
         results = cursor.fetchall()
@@ -124,9 +129,7 @@ def query_for_existing(cls, profile, schema, model_name=None):
         return dict(existing)

     @classmethod
-    def rename(cls, profile, from_name, to_name, model_name=None):
-        schema = cls.get_default_schema(profile)
-
+    def rename(cls, profile, schema, from_name, to_name, model_name=None):
         sql = (('alter table "{schema}"."{from_name}" '
                 'rename to "{schema}"."{to_name}"')
                .format(schema=schema,
@@ -146,6 +149,17 @@ def create_schema(cls, profile, schema, model_name=None):
         sql = cls.get_create_schema_sql(schema)
         return cls.add_query(profile, sql, model_name, select_schema=False)

+    @classmethod
+    def get_existing_schemas(cls, profile, model_name=None):
+        sql = "select distinct SCHEMA_NAME from INFORMATION_SCHEMA.SCHEMATA"
+
+        connection, cursor = cls.add_query(profile, sql, model_name,
+                                           select_schema=False,
+                                           auto_begin=False)
+        results = cursor.fetchall()
+
+        return [row[0] for row in results]
+
     @classmethod
     def check_schema_exists(cls, profile, schema, model_name=None):
         sql = """
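The Postgres and Snowflake variants render the schema list into a SQL `in (...)` clause by single-quoting each name and comma-joining. A sketch of that rendering as a standalone function (the names are interpolated directly; they come from parsed project configs rather than arbitrary user input, and the diff does not escape embedded quotes):

```python
def render_schema_list(schemas):
    # ['a', 'b'] -> "'a','b'"
    return ",".join("'{}'".format(schema) for schema in schemas)


sql = """
    select tablename as name, 'table' as type from pg_tables
    where schemaname in ({schema_list})
""".format(schema_list=render_schema_list(['analytics', 'analytics_custom'])).strip()
print(sql)
# ... where schemaname in ('analytics','analytics_custom')
```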
diff --git a/dbt/context/common.py b/dbt/context/common.py
index 0140b06f525..10144a02d19 100644
--- a/dbt/context/common.py
+++ b/dbt/context/common.py
@@ -58,6 +58,8 @@ def commit(self):


 def _add_macros(context, model, flat_graph):
+    macros_to_add = {'global': [], 'local': []}
+
     for unique_id, macro in flat_graph.get('macros', {}).items():
         package_name = macro.get('package_name')

@@ -71,9 +73,15 @@ def _add_macros(context, model, flat_graph):
         context.get(package_name, {}) \
                .update(macro_map)

-        if(package_name == model.get('package_name') or
-           package_name == dbt.include.GLOBAL_PROJECT_NAME):
-            context.update(macro_map)
+        if package_name == model.get('package_name'):
+            macros_to_add['local'].append(macro_map)
+        elif package_name == dbt.include.GLOBAL_PROJECT_NAME:
+            macros_to_add['global'].append(macro_map)
+
+    # Load global macros before local macros -- local takes precedence
+    unprefixed_macros = macros_to_add['global'] + macros_to_add['local']
+    for macro_map in unprefixed_macros:
+        context.update(macro_map)

     return context

@@ -268,14 +276,14 @@ def generate(model, project, flat_graph, provider=None):
         "model": model,
         "post_hooks": post_hooks,
         "pre_hooks": pre_hooks,
-        "ref": provider.ref(model, project, profile, schema, flat_graph),
-        "schema": schema,
+        "ref": provider.ref(model, project, profile, flat_graph),
+        "schema": model.get('schema', schema),
         "sql": model.get('injected_sql'),
         "sql_now": adapter.date_function(),
         "fromjson": fromjson(model),
         "target": target,
         "this": dbt.utils.This(
-            schema,
+            model.get('schema', schema),
             dbt.utils.model_immediate_name(model, dbt.flags.NON_DESTRUCTIVE),
             model.get('name')
         )
diff --git a/dbt/context/parser.py b/dbt/context/parser.py
index 873f70e12a4..1720c01a46f 100644
--- a/dbt/context/parser.py
+++ b/dbt/context/parser.py
@@ -7,7 +7,7 @@
 execute = False


-def ref(model, project, profile, schema, flat_graph):
+def ref(model, project, profile, flat_graph):

     def ref(*args):
         if len(args) == 1 or len(args) == 2:
diff --git a/dbt/context/runtime.py b/dbt/context/runtime.py
index 744421ac2c2..29e67a130a8 100644
--- a/dbt/context/runtime.py
+++ b/dbt/context/runtime.py
@@ -14,7 +14,7 @@
 execute = True


-def ref(model, project, profile, schema, flat_graph):
+def ref(model, project, profile, flat_graph):
     current_project = project.get('name')

     def do_ref(*args):
@@ -54,6 +54,7 @@ def do_ref(*args):
         else:
             adapter = get_adapter(profile)
             table = target_model.get('name')
+            schema = target_model.get('schema')
             return adapter.quote_schema_and_table(profile, schema, table)
diff --git a/dbt/contracts/graph/parsed.py b/dbt/contracts/graph/parsed.py
index 511129a7914..d25d8d1b976 100644
--- a/dbt/contracts/graph/parsed.py
+++ b/dbt/contracts/graph/parsed.py
@@ -29,6 +29,7 @@
     # identifiers
     Required('unique_id'): All(basestring, Length(min=1, max=255)),
     Required('fqn'): All(list, [All(basestring)]),
+    Required('schema'): basestring,

     Required('refs'): [All(tuple)],
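The `_add_macros` reordering above exists for this feature: the default `generate_schema_name` ships in dbt's global project, and a redefinition in the user's own project must take precedence. Buffering macros into global and local buckets and applying global first gives local definitions the last write into the context. A condensed sketch (`GLOBAL_PROJECT_NAME` is an illustrative stand-in for `dbt.include.GLOBAL_PROJECT_NAME`):

```python
GLOBAL_PROJECT_NAME = 'dbt'  # assumption: illustrative constant


def add_unprefixed_macros(context, model_package, macros):
    macros_to_add = {'global': [], 'local': []}
    for package_name, macro_map in macros:
        if package_name == model_package:
            macros_to_add['local'].append(macro_map)
        elif package_name == GLOBAL_PROJECT_NAME:
            macros_to_add['global'].append(macro_map)

    # Global first, local second -- later update() calls win.
    for macro_map in macros_to_add['global'] + macros_to_add['local']:
        context.update(macro_map)
    return context


ctx = add_unprefixed_macros({}, 'my_project', [
    ('dbt', {'generate_schema_name': '<default implementation>'}),
    ('my_project', {'generate_schema_name': '<project override>'}),
])
print(ctx['generate_schema_name'])  # <project override>
```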
diff --git a/dbt/include/global_project/macros/etc/get_custom_schema.sql b/dbt/include/global_project/macros/etc/get_custom_schema.sql
new file mode 100644
index 00000000000..f48fd8113d4
--- /dev/null
+++ b/dbt/include/global_project/macros/etc/get_custom_schema.sql
@@ -0,0 +1,54 @@
+
+{#
+    Renders a schema name given a custom schema name. If the custom
+    schema name is none, then the resulting schema is just the "schema"
+    value in the specified target. If a schema override is specified, then
+    the resulting schema is the default schema concatenated with the
+    custom schema.
+
+    This macro can be overridden in projects to define different semantics
+    for rendering a schema name.
+
+    Arguments:
+    custom_schema_name: The custom schema name specified for a model, or none
+
+#}
+{% macro generate_schema_name(custom_schema_name=none) -%}
+
+    {%- set default_schema = target.schema -%}
+    {%- if custom_schema_name is none -%}
+
+        {{ default_schema }}
+
+    {%- else -%}
+
+        {{ default_schema }}_{{ custom_schema_name | trim }}
+
+    {%- endif -%}
+
+{%- endmacro %}
+
+
+{#
+    Renders a schema name given a custom schema name. In production, this
+    macro will render out the overridden schema name for a model. Otherwise,
+    the default schema specified in the active target is used.
+
+    Arguments:
+    custom_schema_name: The custom schema name specified for a model, or none
+
+#}
+{% macro generate_schema_name_for_env(custom_schema_name=none) -%}
+
+    {%- set default_schema = target.schema -%}
+    {%- if target.name == 'prod' and custom_schema_name is not none -%}
+
+        {{ custom_schema_name | trim }}
+
+    {%- else -%}
+
+        {{ default_schema }}
+
+    {%- endif -%}
+
+{%- endmacro %}
diff --git a/dbt/include/global_project/macros/materializations/bigquery.sql b/dbt/include/global_project/macros/materializations/bigquery.sql
index 10cfb5c2707..e61373a4634 100644
--- a/dbt/include/global_project/macros/materializations/bigquery.sql
+++ b/dbt/include/global_project/macros/materializations/bigquery.sql
@@ -7,7 +7,7 @@
   {%- set existing_type = existing.get(identifier) -%}

   {%- if existing_type is not none -%}
-    {{ adapter.drop(identifier, existing_type) }}
+    {{ adapter.drop(schema, identifier, existing_type) }}
   {%- endif -%}

   -- build model
@@ -25,7 +25,7 @@
   {%- set existing_type = existing.get(identifier) -%}

   {%- if existing_type is not none -%}
-    {{ adapter.drop(identifier, existing_type) }}
+    {{ adapter.drop(schema, identifier, existing_type) }}
   {%- endif -%}

   -- build model
diff --git a/dbt/include/global_project/macros/materializations/helpers.sql b/dbt/include/global_project/macros/materializations/helpers.sql
index 321c8c2cff4..1171be00f69 100644
--- a/dbt/include/global_project/macros/materializations/helpers.sql
+++ b/dbt/include/global_project/macros/materializations/helpers.sql
@@ -41,9 +41,9 @@
 {% endmacro %}


-{% macro drop_if_exists(existing, name) %}
+{% macro drop_if_exists(existing, schema, name) %}
   {% set existing_type = existing.get(name) %}

   {% if existing_type is not none %}
-    {{ adapter.drop(name, existing_type) }}
+    {{ adapter.drop(schema, name, existing_type) }}
   {% endif %}
 {% endmacro %}
diff --git a/dbt/include/global_project/macros/materializations/incremental.sql b/dbt/include/global_project/macros/materializations/incremental.sql
index b9c422b66cf..248249631e4 100644
--- a/dbt/include/global_project/macros/materializations/incremental.sql
+++ b/dbt/include/global_project/macros/materializations/incremental.sql
@@ -33,9 +33,9 @@
   {% if existing_type is none -%}
     -- noop
   {%- elif should_truncate -%}
-    {{ adapter.truncate(identifier) }}
+    {{ adapter.truncate(schema, identifier) }}
   {%- elif should_drop -%}
-    {{ adapter.drop(identifier, existing_type) }}
+    {{ adapter.drop(schema, identifier, existing_type) }}
   {%- endif %}

   {{ run_hooks(pre_hooks, inside_transaction=False) }}
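For reference, the two macros above reduce to simple two-branch functions. A Python restatement of their semantics (illustrative only; the shipped implementation is the Jinja above):

```python
def generate_schema_name(custom_schema_name, target_schema):
    # Default behavior: prefix the custom schema with the target schema.
    if custom_schema_name is None:
        return target_schema
    return '{}_{}'.format(target_schema, custom_schema_name.strip())


def generate_schema_name_for_env(custom_schema_name, target_schema, target_name):
    # Alternate behavior: only honor the override against the 'prod' target.
    if target_name == 'prod' and custom_schema_name is not None:
        return custom_schema_name.strip()
    return target_schema


print(generate_schema_name(None, 'analytics'))        # analytics
print(generate_schema_name('custom', 'analytics'))    # analytics_custom
print(generate_schema_name_for_env('custom', 'dev_jane', 'dev'))    # dev_jane
print(generate_schema_name_for_env('custom', 'analytics', 'prod'))  # custom
```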
diff --git a/dbt/include/global_project/macros/materializations/table.sql b/dbt/include/global_project/macros/materializations/table.sql
index 748e1bd73db..96691afdbec 100644
--- a/dbt/include/global_project/macros/materializations/table.sql
+++ b/dbt/include/global_project/macros/materializations/table.sql
@@ -5,14 +5,14 @@
   {%- set existing = adapter.query_for_existing(schema) -%}
   {%- set existing_type = existing.get(identifier) -%}

-  {{ drop_if_exists(existing, tmp_identifier) }}
+  {{ drop_if_exists(existing, schema, tmp_identifier) }}

   -- setup
   {% if non_destructive_mode -%}
     {% if existing_type == 'table' -%}
-      {{ adapter.truncate(identifier) }}
+      {{ adapter.truncate(schema, identifier) }}
     {% elif existing_type == 'view' -%}
-      {{ adapter.drop(identifier, existing_type) }}
+      {{ adapter.drop(schema, identifier, existing_type) }}
     {%- endif %}
   {%- endif %}

@@ -48,8 +48,8 @@
   {% if non_destructive_mode -%}
     -- noop
   {%- else -%}
-    {{ drop_if_exists(existing, identifier) }}
-    {{ adapter.rename(tmp_identifier, identifier) }}
+    {{ drop_if_exists(existing, schema, identifier) }}
+    {{ adapter.rename(schema, tmp_identifier, identifier) }}
   {%- endif %}

   -- `COMMIT` happens here
diff --git a/dbt/include/global_project/macros/materializations/view.sql b/dbt/include/global_project/macros/materializations/view.sql
index ab4de7efd9d..43b7053746e 100644
--- a/dbt/include/global_project/macros/materializations/view.sql
+++ b/dbt/include/global_project/macros/materializations/view.sql
@@ -7,7 +7,7 @@
   {%- set existing_type = existing.get(identifier) -%}

   {{ run_hooks(pre_hooks, inside_transaction=False) }}
-  {{ drop_if_exists(existing, tmp_identifier) }}
+  {{ drop_if_exists(existing, schema, tmp_identifier) }}

   -- `BEGIN` happens here:
   {{ run_hooks(pre_hooks, inside_transaction=True) }}
@@ -27,8 +27,8 @@
   {% if non_destructive_mode and existing_type == 'view' -%}
     -- noop
   {%- else -%}
-    {{ drop_if_exists(existing, identifier) }}
-    {{ adapter.rename(tmp_identifier, identifier) }}
+    {{ drop_if_exists(existing, schema, identifier) }}
+    {{ adapter.rename(schema, tmp_identifier, identifier) }}
   {%- endif %}

   -- `COMMIT` happens here
diff --git a/dbt/loader.py b/dbt/loader.py
index 57472351b1b..ae1ad706b36 100644
--- a/dbt/loader.py
+++ b/dbt/loader.py
@@ -110,7 +110,8 @@ def load_project(cls, root_project, all_projects, project, project_name,
             root_project=root_project,
             all_projects=all_projects,
             root_dir=project.get('project-root'),
-            relative_dirs=project.get('source-paths', []))
+            relative_dirs=project.get('source-paths', []),
+            macros=macros)


 class DataTestLoader(ResourceLoader):
@@ -135,7 +136,8 @@ class ArchiveLoader(ResourceLoader):
     def load_project(cls, root_project, all_projects, project, project_name,
                      macros):
         return dbt.parser.parse_archives_from_projects(root_project,
-                                                       all_projects)
+                                                       all_projects,
+                                                       macros)


 class RunHookLoader(ResourceLoader):
diff --git a/dbt/model.py b/dbt/model.py
index c0a509b122f..5d5010f3154 100644
--- a/dbt/model.py
+++ b/dbt/model.py
@@ -13,6 +13,7 @@ class SourceConfig(object):
     AppendListFields = ['pre-hook', 'post-hook']
     ExtendDictFields = ['vars']
     ClobberFields = [
+        'schema',
         'enabled',
         'materialized',
         'dist',
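Adding `schema` to `ClobberFields` (rather than the append or extend lists) means that as configs cascade from project scope down to a model, the most specific `schema` simply replaces any broader one, unlike hooks, which accumulate. A small sketch of the distinction; the merge function is an illustrative assumption, not dbt's exact `SourceConfig` implementation:

```python
APPEND_LIST_FIELDS = ['pre-hook', 'post-hook']
CLOBBER_FIELDS = ['schema', 'enabled', 'materialized', 'dist']


def merge_configs(base, update):
    merged = dict(base)
    for key, value in update.items():
        if key in APPEND_LIST_FIELDS:
            merged[key] = merged.get(key, []) + value   # hooks accumulate
        elif key in CLOBBER_FIELDS:
            merged[key] = value                         # most specific wins
    return merged


project_level = {'schema': 'dbt_test', 'post-hook': ['grant select on {{ this }}']}
model_level = {'schema': 'custom', 'post-hook': ['analyze {{ this }}']}
print(merge_configs(project_level, model_level))
# schema is clobbered to 'custom'; both post-hooks are kept
```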
diff --git a/dbt/node_runners.py b/dbt/node_runners.py
index c10e65d8162..c33a847b46d 100644
--- a/dbt/node_runners.py
+++ b/dbt/node_runners.py
@@ -79,11 +79,16 @@ def raise_on_first_error(self):
         return False

     @classmethod
-    def is_ephemeral_model(cls, node):
-        materialized = dbt.utils.get_materialization(node)
-        resource_type = node.get('resource_type')
+    def is_model(cls, node):
+        return node.get('resource_type') == NodeType.Model
+
+    @classmethod
+    def is_ephemeral(cls, node):
+        return dbt.utils.get_materialization(node) == 'ephemeral'

-        return materialized == 'ephemeral' and resource_type == NodeType.Model
+    @classmethod
+    def is_ephemeral_model(cls, node):
+        return cls.is_model(node) and cls.is_ephemeral(node)

     def safe_run(self, flat_graph, existing):
         catchable_errors = (dbt.exceptions.CompilationException,
@@ -140,10 +145,6 @@ def safe_run(self, flat_graph, existing):
             result.execution_time = time.time() - started
         return result

-    @classmethod
-    def get_schema(cls, adapter, profile):
-        return adapter.get_default_schema(profile)
-
     def before_execute(self):
         raise NotImplementedException()

@@ -157,9 +158,9 @@ def after_execute(self, result):
         raise NotImplementedException()

     def on_skip(self):
-        schema_name = self.get_schema(self.adapter, self.profile)
-
+        schema_name = self.node.get('schema')
         node_name = self.node.get('name')
+
         if not self.is_ephemeral_model(self.node):
             dbt.ui.printer.print_skip_line(self.node, schema_name, node_name,
                                            self.node_index, self.num_nodes)
@@ -170,6 +171,15 @@ def on_skip(self):
     def do_skip(self):
         self.skip = True

+    @classmethod
+    def get_model_schemas(cls, flat_graph):
+        schemas = set()
+        for node in flat_graph['nodes'].values():
+            if cls.is_model(node) and not cls.is_ephemeral(node):
+                schemas.add(node['schema'])
+
+        return schemas
+
     @classmethod
     def before_run(self, project, adapter, flat_graph):
         pass
@@ -259,20 +269,6 @@ class ModelRunner(CompileRunner):
     def raise_on_first_error(self):
         return False

-    @classmethod
-    def try_create_schema(cls, project, adapter):
-        profile = project.run_environment()
-        schema_name = cls.get_schema(adapter, profile)
-
-        schema_exists = adapter.check_schema_exists(profile, schema_name)
-
-        if schema_exists:
-            logger.debug('schema {} already exists -- '
-                         'not creating'.format(schema_name))
-            return
-
-        adapter.create_schema(profile, schema_name)
-
     @classmethod
     def run_hooks(cls, project, adapter, flat_graph, hook_type):
         profile = project.run_environment()
@@ -316,9 +312,18 @@ def safe_run_hooks(cls, project, adapter, flat_graph, hook_type):
             logger.info("Database error while running {}".format(hook_type))
             raise

+    @classmethod
+    def create_schemas(cls, project, adapter, flat_graph):
+        profile = project.run_environment()
+        required_schemas = cls.get_model_schemas(flat_graph)
+        existing_schemas = set(adapter.get_existing_schemas(profile))
+
+        for schema in (required_schemas - existing_schemas):
+            adapter.create_schema(profile, schema)
+
     @classmethod
     def before_run(cls, project, adapter, flat_graph):
-        cls.try_create_schema(project, adapter)
+        cls.create_schemas(project, adapter, flat_graph)
         cls.safe_run_hooks(project, adapter, flat_graph, RunHookType.Start)

     @classmethod
@@ -344,7 +349,7 @@ def after_run(cls, project, adapter, results, flat_graph, elapsed):

     def describe_node(self):
         materialization = dbt.utils.get_materialization(self.node)
-        schema_name = self.get_schema(self.adapter, self.profile)
+        schema_name = self.node.get('schema')
         node_name = self.node.get('name')
         return "{} model {}.{}".format(materialization, schema_name,
                                        node_name)
@@ -355,7 +360,7 @@ def print_start_line(self):
                                             self.num_nodes)

     def print_result_line(self, result):
-        schema_name = self.get_schema(self.adapter, self.profile)
+        schema_name = self.node.get('schema')
         dbt.ui.printer.print_model_result_line(result,
                                                schema_name,
                                                self.node_index,
@@ -398,7 +403,7 @@ def describe_node(self):
         return "test {}".format(node_name)

     def print_result_line(self, result):
-        schema_name = self.get_schema(self.adapter, self.profile)
+        schema_name = self.node.get('schema')
        dbt.ui.printer.print_test_result_line(result,
                                              schema_name,
                                              self.node_index,
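`create_schemas` replaces the old single-schema `try_create_schema`: collect the target schema of every concrete (non-ephemeral) model, ask the adapter which schemas already exist, and create only the difference. A standalone sketch of the set arithmetic (materialization is flattened to a top-level key here for brevity; dbt reads it via `get_materialization`):

```python
def get_model_schemas(nodes):
    return {
        node['schema']
        for node in nodes.values()
        if node['resource_type'] == 'model'
        and node['materialized'] != 'ephemeral'
    }


def create_schemas(nodes, existing_schemas, create_schema):
    required = get_model_schemas(nodes)
    for schema in required - set(existing_schemas):
        create_schema(schema)


nodes = {
    'model.a': {'resource_type': 'model', 'materialized': 'view',
                'schema': 'analytics'},
    'model.b': {'resource_type': 'model', 'materialized': 'view',
                'schema': 'analytics_custom'},
}
create_schemas(nodes, ['analytics'], lambda s: print('create schema', s))
# create schema analytics_custom
```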
diff --git a/dbt/parser.py b/dbt/parser.py
index f95692f4c61..f36852f0eaa 100644
--- a/dbt/parser.py
+++ b/dbt/parser.py
@@ -231,6 +231,12 @@ def parse_node(node, node_path, root_project_config, package_project_config,
     profile = db_wrapper.profile
     adapter.release_connection(profile, node.get('name'))

+    # Special macro defined in the global project
+    default_schema = context.get('schema')
+    schema_override = config.config.get('schema')
+    get_schema = context.get('generate_schema_name', lambda x: default_schema)
+    node['schema'] = get_schema(schema_override)
+
     # Overwrite node config
     config_dict = node.get('config', {})
     config_dict.update(config.config)
@@ -348,7 +354,7 @@ def get_hooks(all_projects, hook_type):


 def load_and_parse_run_hook_type(root_project, all_projects, hook_type,
-                                 macros):
+                                 macros=None):
     if dbt.flags.STRICT_MODE:
         dbt.contracts.project.validate_list(all_projects)

@@ -426,7 +432,7 @@ def load_and_parse_macros(package_name, root_project, all_projects, root_dir,
     return result


-def parse_schema_tests(tests, root_project, projects):
+def parse_schema_tests(tests, root_project, projects, macros=None):
     to_return = {}

     for test in tests:
@@ -471,7 +477,8 @@ def parse_schema_tests(tests, root_project, projects):
                     test, model_name, config, test_type,
                     root_project,
                     projects.get(package_name),
-                    all_projects=projects)
+                    all_projects=projects,
+                    macros=macros)

                 if to_add is not None:
                     to_return[to_add.get('unique_id')] = to_add
@@ -524,7 +531,7 @@ def as_kwarg(key, value):

 def parse_schema_test(test_base, model_name, test_config, test_type,
                       root_project_config, package_project_config,
-                      all_projects):
+                      all_projects, macros=None):

     if isinstance(test_config, (basestring, int, float, bool)):
         test_args = {'arg': test_config}
@@ -568,11 +575,12 @@ def parse_schema_test(test_base, model_name, test_config, test_type,
         all_projects,
         tags={'schema'},
         fqn_extra=None,
-        fqn=fqn_override)
+        fqn=fqn_override,
+        macros=macros)


 def load_and_parse_yml(package_name, root_project, all_projects, root_dir,
-                       relative_dirs):
+                       relative_dirs, macros=None):
     extension = "[!.#~]*.yml"

     if dbt.flags.STRICT_MODE:
@@ -605,10 +613,10 @@ def load_and_parse_yml(package_name, root_project, all_projects, root_dir,
             'raw_yml': file_contents
         })

-    return parse_schema_tests(result, root_project, all_projects)
+    return parse_schema_tests(result, root_project, all_projects, macros)


-def parse_archives_from_projects(root_project, all_projects):
+def parse_archives_from_projects(root_project, all_projects, macros=None):
     archives = []
     to_return = {}

@@ -625,7 +633,8 @@ def parse_archives_from_projects(root_project, all_projects):
             node_path,
             root_project,
             all_projects.get(archive.get('package_name')),
-            all_projects)
+            all_projects,
+            macros=macros)

     return to_return
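The `parse_node` hunk above is where a model's schema gets pinned at parse time: look up the rendered `generate_schema_name` macro in the parsing context, and fall back to the target's default schema when the macro is not in scope (the lambda default), e.g. in unit tests that parse without the global project loaded. A sketch of that lookup-with-fallback:

```python
def resolve_schema(context, config):
    default_schema = context.get('schema')
    schema_override = config.get('schema')  # from config(schema=...) or dbt_project.yml

    # If generate_schema_name is absent, every override resolves to the default.
    get_schema = context.get('generate_schema_name',
                             lambda custom_schema_name: default_schema)
    return get_schema(schema_override)


ctx = {'schema': 'analytics',
       'generate_schema_name':
           lambda c: 'analytics' if c is None else 'analytics_' + c}
print(resolve_schema(ctx, {}))                    # analytics
print(resolve_schema(ctx, {'schema': 'custom'}))  # analytics_custom
print(resolve_schema({'schema': 'analytics'}, {'schema': 'custom'}))  # analytics
```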
diff --git a/dbt/runner.py b/dbt/runner.py
index 2de60e7b229..e18ea22a08a 100644
--- a/dbt/runner.py
+++ b/dbt/runner.py
@@ -4,8 +4,6 @@
 from dbt.adapters.factory import get_adapter
 from dbt.logger import GLOBAL_LOGGER as logger

-from dbt.utils import get_materialization
-
 import dbt.clients.jinja
 import dbt.compilation
 import dbt.exceptions
@@ -102,7 +100,6 @@ def get_relevant_runners(self, node_runners, node_subset):
     def execute_nodes(self, linker, Runner, flat_graph, node_dependency_list):
         profile = self.project.run_environment()
         adapter = get_adapter(profile)
-        schema_name = adapter.get_default_schema(profile)

         num_threads = self.threads
         target_name = self.project.get_target().get('name')
@@ -112,7 +109,11 @@ def execute_nodes(self, linker, Runner, flat_graph, node_dependency_list):
         dbt.ui.printer.print_timestamped_line(concurrency_line)
         dbt.ui.printer.print_timestamped_line("")

-        existing = adapter.query_for_existing(profile, schema_name)
+        schemas = list(Runner.get_model_schemas(flat_graph))
+        if len(schemas) > 0:
+            existing = adapter.query_for_existing(profile, schemas)
+        else:
+            existing = {}

         node_runners = self.get_runners(Runner, adapter, node_dependency_list)
         pool = ThreadPool(num_threads)
@@ -159,7 +160,7 @@ def execute_nodes(self, linker, Runner, flat_graph, node_dependency_list):
                 raise

             for conn_name in adapter.cancel_open_connections(profile):
-                dbt.ui.printer.print_cancel_line(conn_name, schema_name)
+                dbt.ui.printer.print_cancel_line(conn_name)

             dbt.ui.printer.print_run_end_messages(node_results,
                                                   early_exit=True)
diff --git a/dbt/ui/printer.py b/dbt/ui/printer.py
index ea0de0bf9b3..33904c1c2b4 100644
--- a/dbt/ui/printer.py
+++ b/dbt/ui/printer.py
@@ -102,8 +102,8 @@ def print_skip_line(model, schema, relation, index, num_models):
     print_fancy_output_line(msg, yellow('SKIP'), index, num_models)


-def print_cancel_line(model, schema):
-    msg = 'CANCEL query {}.{}'.format(schema, model)
+def print_cancel_line(model):
+    msg = 'CANCEL query {}'.format(model)
     print_fancy_output_line(msg, red('CANCEL'), index=None, total=None)

diff --git a/dbt/utils.py b/dbt/utils.py
index ddb83eb20ed..53e6f43e58c 100644
--- a/dbt/utils.py
+++ b/dbt/utils.py
@@ -11,6 +11,7 @@


 DBTConfigKeys = [
+    'schema',
     'enabled',
     'materialized',
     'dist',
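With models now spread across several schemas, `execute_nodes` seeds its cache of existing relations from every schema a selected model targets, instead of from the single profile default, and skips the lookup entirely when no concrete models are selected. A condensed sketch:

```python
def collect_existing(model_schemas, query_for_existing):
    schemas = list(model_schemas)
    if len(schemas) > 0:
        # One adapter call covering all schemas the run will touch.
        return query_for_existing(schemas)
    return {}


print(collect_existing(set(), lambda s: {'events': 'table'}))          # {}
print(collect_existing({'analytics'}, lambda s: {'events': 'table'}))  # {'events': 'table'}
```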
diff --git a/test/integration/024_custom_schema_test/macros/schema.sql b/test/integration/024_custom_schema_test/macros/schema.sql
new file mode 100644
index 00000000000..b2456437d31
--- /dev/null
+++ b/test/integration/024_custom_schema_test/macros/schema.sql
@@ -0,0 +1,6 @@
+
+{% macro generate_schema_name(schema_name) -%}
+
+    {{ schema_name }}_{{ target.schema }}_macro
+
+{%- endmacro %}
diff --git a/test/integration/024_custom_schema_test/models/view_1.sql b/test/integration/024_custom_schema_test/models/view_1.sql
new file mode 100644
index 00000000000..a42ec4a2916
--- /dev/null
+++ b/test/integration/024_custom_schema_test/models/view_1.sql
@@ -0,0 +1,3 @@
+
+
+select * from "{{ target.schema }}"."seed"
diff --git a/test/integration/024_custom_schema_test/models/view_2.sql b/test/integration/024_custom_schema_test/models/view_2.sql
new file mode 100644
index 00000000000..11329945d82
--- /dev/null
+++ b/test/integration/024_custom_schema_test/models/view_2.sql
@@ -0,0 +1,4 @@
+
+{{ config(schema='custom') }}
+
+select * from {{ ref('view_1') }}
diff --git a/test/integration/024_custom_schema_test/models/view_3.sql b/test/integration/024_custom_schema_test/models/view_3.sql
new file mode 100644
index 00000000000..c208e5d32df
--- /dev/null
+++ b/test/integration/024_custom_schema_test/models/view_3.sql
@@ -0,0 +1,30 @@
+
+{{ config(schema='test') }}
+
+
+with v1 as (
+
+    select * from {{ ref('view_1') }}
+
+),
+
+v2 as (
+
+    select * from {{ ref('view_2') }}
+
+),
+
+combined as (
+
+    select last_name from v1
+    union all
+    select last_name from v2
+
+)
+
+select
+    last_name,
+    count(*) as count
+
+from combined
+group by 1
diff --git a/test/integration/024_custom_schema_test/seed.sql b/test/integration/024_custom_schema_test/seed.sql
new file mode 100644
index 00000000000..b041208db00
--- /dev/null
+++ b/test/integration/024_custom_schema_test/seed.sql
@@ -0,0 +1,25 @@
+
+drop table if exists "{schema}"."seed" cascade;
+create table "{schema}"."seed" (
+    id BIGSERIAL PRIMARY KEY,
+    first_name VARCHAR(50),
+    last_name VARCHAR(50),
+    email VARCHAR(50),
+    gender VARCHAR(50),
+    ip_address VARCHAR(20)
+);
+
+drop table if exists "{schema}"."agg" cascade;
+create table "{schema}"."agg" (
+    last_name VARCHAR(50),
+    count BIGINT
+);
+
+
+insert into "{schema}"."seed" (first_name, last_name, email, gender, ip_address) values
+('Jack', 'Hunter', 'jhunter0@pbs.org', 'Male', '59.80.20.168'),
+('Kathryn', 'Walker', 'kwalker1@ezinearticles.com', 'Female', '194.121.179.35'),
+('Gerald', 'Ryan', 'gryan2@com.com', 'Male', '11.3.212.243');
+
+insert into "{schema}"."agg" (last_name, count) values
+('Hunter', 2), ('Walker', 2), ('Ryan', 2);
diff --git a/test/integration/024_custom_schema_test/test_custom_schema.py b/test/integration/024_custom_schema_test/test_custom_schema.py
new file mode 100644
index 00000000000..c51dc476943
--- /dev/null
+++ b/test/integration/024_custom_schema_test/test_custom_schema.py
@@ -0,0 +1,139 @@
+from nose.plugins.attrib import attr
+from test.integration.base import DBTIntegrationTest
+
+
+class TestCustomSchema(DBTIntegrationTest):
+
+    @property
+    def schema(self):
+        return "custom_schema_024"
+
+    @property
+    def models(self):
+        return "test/integration/024_custom_schema_test/models"
+
+    @attr(type='postgres')
+    def test__postgres__custom_schema_no_prefix(self):
+        self.use_default_project()
+        self.run_sql_file("test/integration/024_custom_schema_test/seed.sql")
+
+        self.run_dbt()
+
+        schema = self.unique_schema()
+        v2_schema = "{}_custom".format(schema)
+        xf_schema = "{}_test".format(schema)
+
+        self.assertTablesEqual("seed", "view_1")
+        self.assertTablesEqual("seed", "view_2", schema, v2_schema)
+        self.assertTablesEqual("agg", "view_3", schema, xf_schema)
+
+
+class TestCustomProjectSchemaWithPrefix(DBTIntegrationTest):
+
+    @property
+    def schema(self):
+        return "custom_schema_024"
+
+    @property
+    def models(self):
+        return "test/integration/024_custom_schema_test/models"
+
+    @property
+    def profile_config(self):
+        return {
+            'test': {
+                'outputs': {
+                    'my-target': {
+                        'type': 'postgres',
+                        'threads': 1,
+                        'host': 'database',
+                        'port': 5432,
+                        'user': 'root',
+                        'pass': 'password',
+                        'dbname': 'dbt',
+                        'schema': self.unique_schema(),
+                    }
+                },
+                'target': 'my-target'
+            }
+        }
+
+    @property
+    def project_config(self):
+        return {
+            "models": {
+                "schema": "dbt_test"
+            }
+        }
+
+    @attr(type='postgres')
+    def test__postgres__custom_schema_with_prefix(self):
+        self.use_default_project()
+        self.run_sql_file("test/integration/024_custom_schema_test/seed.sql")
+
+        self.run_dbt()
+
+        schema = self.unique_schema()
+        v1_schema = "{}_dbt_test".format(schema)
+        v2_schema = "{}_custom".format(schema)
+        xf_schema = "{}_test".format(schema)
+
+        self.assertTablesEqual("seed", "view_1", schema, v1_schema)
+        self.assertTablesEqual("seed", "view_2", schema, v2_schema)
+        self.assertTablesEqual("agg", "view_3", schema, xf_schema)
+
+
+class TestCustomSchemaWithCustomMacro(DBTIntegrationTest):
+
+    @property
+    def schema(self):
+        return "custom_schema_024"
+
+    @property
+    def models(self):
+        return "test/integration/024_custom_schema_test/models"
+
+    @property
+    def profile_config(self):
+        return {
+            'test': {
+                'outputs': {
+                    'prod': {
+                        'type': 'postgres',
+                        'threads': 1,
+                        'host': 'database',
+                        'port': 5432,
+                        'user': 'root',
+                        'pass': 'password',
+                        'dbname': 'dbt',
+                        'schema': self.unique_schema(),
+                    }
+                },
+                'target': 'prod'
+            }
+        }
+
+    @property
+    def project_config(self):
+        return {
+            'macro-paths': ['test/integration/024_custom_schema_test/macros'],
+            'models': {
+                'schema': 'dbt_test'
+            }
+        }
+
+    @attr(type='postgres')
+    def test__postgres__custom_schema_from_macro(self):
+        self.use_default_project()
+        self.run_sql_file("test/integration/024_custom_schema_test/seed.sql")
+
+        self.run_dbt()
+
+        schema = self.unique_schema()
+        v1_schema = "dbt_test_{}_macro".format(schema)
+        v2_schema = "custom_{}_macro".format(schema)
+        xf_schema = "test_{}_macro".format(schema)
+
+        self.assertTablesEqual("seed", "view_1", schema, v1_schema)
+        self.assertTablesEqual("seed", "view_2", schema, v2_schema)
+        self.assertTablesEqual("agg", "view_3", schema, xf_schema)
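Taken together, the three test classes pin down the naming rules end to end for a given base (target) schema: no override under the default macro keeps the base schema; an override under the default macro yields `<base>_<override>`; and the custom test macro yields `<override>_<base>_macro`. A quick restatement of the schemas the assertions above expect:

```python
def expected_schemas(base):
    return [
        ('view_1, default macro, no override', base),
        ('view_2, default macro, schema=custom', '{}_custom'.format(base)),
        ('view_2, custom test macro, schema=custom', 'custom_{}_macro'.format(base)),
        ('view_1, custom test macro, project schema=dbt_test',
         'dbt_test_{}_macro'.format(base)),
    ]


for label, schema in expected_schemas('custom_schema_024'):
    print('{:55} -> {}'.format(label, schema))
```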
diff --git a/test/integration/base.py b/test/integration/base.py
index 97db0e3bf08..bd0e04a01a4 100644
--- a/test/integration/base.py
+++ b/test/integration/base.py
@@ -317,7 +317,8 @@ def run_sql(self, query, fetch='None'):
             print(e)
             raise e

-    def get_table_columns(self, table):
+    def get_table_columns(self, table, schema=None):
+        schema = self.unique_schema() if schema is None else schema
         sql = """
                 select column_name, data_type, character_maximum_length
                 from information_schema.columns
@@ -325,11 +326,12 @@ def get_table_columns(self, table):
                 and table_schema = '{}'
                 order by column_name asc"""

-        result = self.run_sql(sql.format(table, self.unique_schema()), fetch='all')
+        result = self.run_sql(sql.format(table, schema), fetch='all')

         return result

-    def get_models_in_schema(self):
+    def get_models_in_schema(self, schema=None):
+        schema = self.unique_schema() if schema is None else schema
         sql = """
                 select table_name,
                        case when table_type = 'BASE TABLE' then 'table'
@@ -341,30 +343,33 @@ def get_models_in_schema(self):
                 order by table_name
                 """

-        result = self.run_sql(sql.format(self.unique_schema()), fetch='all')
+        result = self.run_sql(sql.format(schema), fetch='all')

         return {model_name: materialization for (model_name, materialization) in result}

-    def assertTablesEqual(self, table_a, table_b):
-        self.assertTableColumnsEqual(table_a, table_b)
-        self.assertTableRowCountsEqual(table_a, table_b)
+    def assertTablesEqual(self, table_a, table_b, table_a_schema=None, table_b_schema=None):
+        table_a_schema = self.unique_schema() if table_a_schema is None else table_a_schema
+        table_b_schema = self.unique_schema() if table_b_schema is None else table_b_schema

-        columns = self.get_table_columns(table_a)
+        self.assertTableColumnsEqual(table_a, table_b, table_a_schema, table_b_schema)
+        self.assertTableRowCountsEqual(table_a, table_b, table_a_schema, table_b_schema)
+
+        columns = self.get_table_columns(table_a, table_a_schema)
         columns_csv = ", ".join(['"{}"'.format(record[0]) for record in columns])
-        table_sql = "SELECT {} FROM {}"

         sql = """
             SELECT COUNT(*) FROM (
-                (SELECT {columns} FROM "{schema}"."{table_a}" EXCEPT
-                 SELECT {columns} FROM "{schema}"."{table_b}")
+                (SELECT {columns} FROM "{table_a_schema}"."{table_a}" EXCEPT
+                 SELECT {columns} FROM "{table_b_schema}"."{table_b}")
                 UNION ALL
-                (SELECT {columns} FROM "{schema}"."{table_b}" EXCEPT
-                 SELECT {columns} FROM "{schema}"."{table_a}")
+                (SELECT {columns} FROM "{table_b_schema}"."{table_b}" EXCEPT
+                 SELECT {columns} FROM "{table_a_schema}"."{table_a}")
             ) AS a""".format(
                 columns=columns_csv,
-                schema=self.unique_schema(),
+                table_a_schema=table_a_schema,
+                table_b_schema=table_b_schema,
                 table_a=table_a,
                 table_b=table_b
            )
@@ -377,9 +382,12 @@ def assertTablesEqual(self, table_a, table_b):
             sql
         )

-    def assertTableRowCountsEqual(self, table_a, table_b):
-        table_a_result = self.run_sql('SELECT COUNT(*) FROM "{}"."{}"'.format(self.unique_schema(), table_a), fetch='one')
-        table_b_result = self.run_sql('SELECT COUNT(*) FROM "{}"."{}"'.format(self.unique_schema(), table_b), fetch='one')
+    def assertTableRowCountsEqual(self, table_a, table_b, table_a_schema=None, table_b_schema=None):
+        table_a_schema = self.unique_schema() if table_a_schema is None else table_a_schema
+        table_b_schema = self.unique_schema() if table_b_schema is None else table_b_schema
+
+        table_a_result = self.run_sql('SELECT COUNT(*) FROM "{}"."{}"'.format(table_a_schema, table_a), fetch='one')
+        table_b_result = self.run_sql('SELECT COUNT(*) FROM "{}"."{}"'.format(table_b_schema, table_b), fetch='one')

         self.assertEquals(
             table_a_result[0],
@@ -392,25 +400,28 @@ def assertTableRowCountsEqual(self, table_a, table_b):
             )
         )

-    def assertTableDoesNotExist(self, table):
-        columns = self.get_table_columns(table)
+    def assertTableDoesNotExist(self, table, schema=None):
+        columns = self.get_table_columns(table, schema)

         self.assertEquals(
             len(columns),
             0
         )

-    def assertTableDoesExist(self, table):
-        columns = self.get_table_columns(table)
+    def assertTableDoesExist(self, table, schema=None):
+        columns = self.get_table_columns(table, schema)

         self.assertGreater(
             len(columns),
             0
         )

-    def assertTableColumnsEqual(self, table_a, table_b):
-        table_a_result = self.get_table_columns(table_a)
-        table_b_result = self.get_table_columns(table_b)
+    def assertTableColumnsEqual(self, table_a, table_b, table_a_schema=None, table_b_schema=None):
+        table_a_schema = self.unique_schema() if table_a_schema is None else table_a_schema
+        table_b_schema = self.unique_schema() if table_b_schema is None else table_b_schema
+
+        table_a_result = self.get_table_columns(table_a, table_a_schema)
+        table_b_result = self.get_table_columns(table_b, table_b_schema)

         self.assertEquals(
             table_a_result,
diff --git a/test/unit/test_compiler.py b/test/unit/test_compiler.py
index 8cd991b4f24..2749a8169a5 100644
--- a/test/unit/test_compiler.py
+++ b/test/unit/test_compiler.py
@@ -49,6 +49,7 @@ def test__prepend_ctes__already_has_cte(self):
             'nodes': {
                 'model.root.view': {
                     'name': 'view',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.view',
                     'fqn': ['root_project', 'view'],
@@ -79,6 +80,7 @@ def test__prepend_ctes__already_has_cte(self):
                 },
                 'model.root.ephemeral': {
                     'name': 'ephemeral',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.ephemeral',
                     'fqn': ['root_project', 'ephemeral'],
@@ -129,6 +131,7 @@ def test__prepend_ctes__no_ctes(self):
             'nodes': {
                 'model.root.view': {
                     'name': 'view',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.view',
                     'fqn': ['root_project', 'view'],
@@ -155,6 +158,7 @@ def test__prepend_ctes__no_ctes(self):
                 },
                 'model.root.view_no_cte': {
                     'name': 'view_no_cte',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.view_no_cte',
                     'fqn': ['root_project', 'view_no_cte'],
@@ -217,6 +221,7 @@ def test__prepend_ctes(self):
             'nodes': {
                 'model.root.view': {
                     'name': 'view',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.view',
                     'fqn': ['root_project', 'view'],
@@ -245,6 +250,7 @@ def test__prepend_ctes(self):
                 },
                 'model.root.ephemeral': {
                     'name': 'ephemeral',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.ephemeral',
                     'fqn': ['root_project', 'ephemeral'],
@@ -301,6 +307,7 @@ def test__prepend_ctes__multiple_levels(self):
             'nodes': {
                 'model.root.view': {
                     'name': 'view',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.view',
                     'fqn': ['root_project', 'view'],
@@ -329,6 +336,7 @@ def test__prepend_ctes__multiple_levels(self):
                 },
                 'model.root.ephemeral': {
                     'name': 'ephemeral',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.ephemeral',
                     'fqn': ['root_project', 'ephemeral'],
@@ -355,6 +363,7 @@ def test__prepend_ctes__multiple_levels(self):
                 },
                 'model.root.ephemeral_level_two': {
                     'name': 'ephemeral_level_two',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.ephemeral_level_two',
                     'fqn': ['root_project', 'ephemeral_level_two'],
diff --git a/test/unit/test_parser.py b/test/unit/test_parser.py
index b54970aeb86..d8606c2a6a8 100644
--- a/test/unit/test_parser.py
+++ b/test/unit/test_parser.py
@@ -32,6 +32,7 @@ def setUp(self):
                 'test': {
                     'type': 'postgres',
                     'host': 'localhost',
+                    'schema': 'analytics',
                 }
             }
         }
@@ -45,6 +46,7 @@ def setUp(self):
                 'test': {
                     'type': 'postgres',
                     'host': 'localhost',
+                    'schema': 'analytics',
                 }
             }
         }
@@ -85,6 +87,7 @@ def test__single_model(self):
             {
                 'model.root.model_one': {
                     'name': 'model_one',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.model_one',
                     'fqn': ['root', 'model_one'],
@@ -142,6 +145,7 @@ def test__single_model__nested_configuration(self):
             {
                 'model.root.model_one': {
                     'name': 'model_one',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.model_one',
                     'fqn': ['root', 'nested', 'path', 'model_one'],
@@ -182,6 +186,7 @@ def test__empty_model(self):
             {
                 'model.root.model_one': {
                     'name': 'model_one',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.model_one',
                     'fqn': ['root', 'model_one'],
@@ -231,6 +236,7 @@ def test__simple_dependency(self):
             {
                 'model.root.base': {
                     'name': 'base',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.base',
                     'fqn': ['root', 'base'],
@@ -251,6 +257,7 @@ def test__simple_dependency(self):
                 },
                 'model.root.events_tx': {
                     'name': 'events_tx',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.events_tx',
                     'fqn': ['root', 'events_tx'],
@@ -328,6 +335,7 @@ def test__multiple_dependencies(self):
             {
                 'model.root.events': {
                     'name': 'events',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.events',
                     'fqn': ['root', 'events'],
@@ -348,6 +356,7 @@ def test__multiple_dependencies(self):
                 },
                 'model.root.sessions': {
                     'name': 'sessions',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.sessions',
                     'fqn': ['root', 'sessions'],
@@ -368,6 +377,7 @@ def test__multiple_dependencies(self):
                 },
                 'model.root.events_tx': {
                     'name': 'events_tx',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.events_tx',
                     'fqn': ['root', 'events_tx'],
@@ -388,6 +398,7 @@ def test__multiple_dependencies(self):
                 },
                 'model.root.sessions_tx': {
                     'name': 'sessions_tx',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.sessions_tx',
                     'fqn': ['root', 'sessions_tx'],
@@ -408,6 +419,7 @@ def test__multiple_dependencies(self):
                 },
                 'model.root.multi': {
                     'name': 'multi',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.multi',
                     'fqn': ['root', 'multi'],
@@ -487,6 +499,7 @@ def test__multiple_dependencies__packages(self):
             {
                 'model.snowplow.events': {
                     'name': 'events',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.snowplow.events',
                     'fqn': ['snowplow', 'events'],
@@ -507,6 +520,7 @@ def test__multiple_dependencies__packages(self):
                 },
                 'model.snowplow.sessions': {
                     'name': 'sessions',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.snowplow.sessions',
                     'fqn': ['snowplow', 'sessions'],
@@ -527,6 +541,7 @@ def test__multiple_dependencies__packages(self):
                 },
                 'model.snowplow.events_tx': {
                     'name': 'events_tx',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.snowplow.events_tx',
                     'fqn': ['snowplow', 'events_tx'],
@@ -547,6 +562,7 @@ def test__multiple_dependencies__packages(self):
                 },
                 'model.snowplow.sessions_tx': {
                     'name': 'sessions_tx',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.snowplow.sessions_tx',
                     'fqn': ['snowplow', 'sessions_tx'],
@@ -567,6 +583,7 @@ def test__multiple_dependencies__packages(self):
                 },
                 'model.root.multi': {
                     'name': 'multi',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.multi',
                     'fqn': ['root', 'multi'],
@@ -595,6 +612,7 @@ def test__process_refs__packages(self):
             'nodes': {
                 'model.snowplow.events': {
                     'name': 'events',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.snowplow.events',
                     'fqn': ['snowplow', 'events'],
@@ -614,6 +632,7 @@ def test__process_refs__packages(self):
                 },
                 'model.root.events': {
                     'name': 'events',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.events',
                     'fqn': ['root', 'events'],
@@ -633,6 +652,7 @@ def test__process_refs__packages(self):
                 },
                 'model.root.dep': {
                     'name': 'dep',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.dep',
                     'fqn': ['root', 'dep'],
@@ -660,6 +680,7 @@ def test__process_refs__packages(self):
             'nodes': {
                 'model.snowplow.events': {
                     'name': 'events',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.snowplow.events',
                     'fqn': ['snowplow', 'events'],
@@ -679,6 +700,7 @@ def test__process_refs__packages(self):
                 },
                 'model.root.events': {
                     'name': 'events',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.events',
                     'fqn': ['root', 'events'],
@@ -698,6 +720,7 @@ def test__process_refs__packages(self):
                 },
                 'model.root.dep': {
                     'name': 'dep',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.dep',
                     'fqn': ['root', 'dep'],
@@ -744,6 +767,7 @@ def test__in_model_config(self):
             {
                 'model.root.model_one': {
                     'name': 'model_one',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.model_one',
                     'fqn': ['root', 'model_one'],
@@ -825,6 +849,7 @@ def test__root_project_config(self):
             {
                 'model.root.table': {
                     'name': 'table',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.table',
                     'fqn': ['root', 'table'],
@@ -845,6 +870,7 @@ def test__root_project_config(self):
                 },
                 'model.root.ephemeral': {
                     'name': 'ephemeral',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.ephemeral',
                     'fqn': ['root', 'ephemeral'],
@@ -865,6 +891,7 @@ def test__root_project_config(self):
                 },
                 'model.root.view': {
                     'name': 'view',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.view',
                     'fqn': ['root', 'view'],
@@ -1013,6 +1040,7 @@ def test__other_project_config(self):
             {
                 'model.root.table': {
                     'name': 'table',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.table',
                     'fqn': ['root', 'table'],
@@ -1033,6 +1061,7 @@ def test__other_project_config(self):
                 },
                 'model.root.ephemeral': {
                     'name': 'ephemeral',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.ephemeral',
                     'fqn': ['root', 'ephemeral'],
@@ -1053,6 +1082,7 @@ def test__other_project_config(self):
                 },
                 'model.root.view': {
                     'name': 'view',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.view',
                     'fqn': ['root', 'view'],
@@ -1073,6 +1103,7 @@ def test__other_project_config(self):
                 },
                 'model.snowplow.disabled': {
                     'name': 'disabled',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.snowplow.disabled',
                     'fqn': ['snowplow', 'disabled'],
@@ -1093,6 +1124,7 @@ def test__other_project_config(self):
                 },
                 'model.snowplow.package': {
                     'name': 'package',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.snowplow.package',
                     'fqn': ['snowplow', 'views', 'package'],
@@ -1113,6 +1145,7 @@ def test__other_project_config(self):
                 },
                 'model.snowplow.multi_sort': {
                     'name': 'multi_sort',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.snowplow.multi_sort',
                     'fqn': ['snowplow', 'views', 'multi_sort'],
@@ -1164,6 +1197,7 @@ def test__simple_schema_test(self):
             {
                 'test.root.not_null_model_one_id': {
                     'name': 'not_null_model_one_id',
+                    'schema': 'analytics',
                     'resource_type': 'test',
                     'unique_id': 'test.root.not_null_model_one_id',
                     'fqn': ['root', 'schema_test', 'not_null_model_one_id'],
@@ -1184,6 +1218,7 @@ def test__simple_schema_test(self):
                 },
                 'test.root.unique_model_one_id': {
                     'name': 'unique_model_one_id',
+                    'schema': 'analytics',
                     'resource_type': 'test',
                     'unique_id': 'test.root.unique_model_one_id',
                     'fqn': ['root', 'schema_test', 'unique_model_one_id'],
@@ -1203,6 +1238,7 @@ def test__simple_schema_test(self):
                 },
                 'test.root.accepted_values_model_one_id__a__b': {
                     'name': 'accepted_values_model_one_id__a__b',
+                    'schema': 'analytics',
                     'resource_type': 'test',
                     'unique_id': 'test.root.accepted_values_model_one_id__a__b',  # noqa
                     'fqn': ['root', 'schema_test',
@@ -1224,6 +1260,7 @@ def test__simple_schema_test(self):
                 },
                 'test.root.relationships_model_one_id__id__ref_model_two_': {
                     'name': 'relationships_model_one_id__id__ref_model_two_',
+                    'schema': 'analytics',
                     'resource_type': 'test',
                     'unique_id': 'test.root.relationships_model_one_id__id__ref_model_two_',  # noqa
                     'fqn': ['root', 'schema_test',
@@ -1315,6 +1352,7 @@ def test__simple_data_test(self):
             {
                 'test.root.no_events': {
                     'name': 'no_events',
+                    'schema': 'analytics',
                     'resource_type': 'test',
                     'unique_id': 'test.root.no_events',
                     'fqn': ['root', 'no_events'],
@@ -1429,6 +1467,7 @@ def test__simple_macro_used_in_model(self):
             {
                 'model.root.model_one': {
                     'name': 'model_one',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.model_one',
                     'fqn': ['root', 'model_one'],
@@ -1470,6 +1509,7 @@ def test__macro_no_explicit_project_used_in_model(self):
             {
                 'model.root.model_one': {
                     'name': 'model_one',
+                    'schema': 'analytics',
                     'resource_type': 'model',
                     'unique_id': 'model.root.model_one',
                     'fqn': ['root', 'model_one'],