From cccbba80814cfe76831e6ca443c3682538792b37 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 21 Mar 2016 17:29:15 -0400 Subject: [PATCH 01/22] first cut --- dbt/task/compile.py | 1 - dbt/task/compiler/__init__.py | 0 dbt/task/compiler/linker.py | 69 +++++++++++++++++++++++++++++++++++ dbt/task/run.py | 16 ++++++-- requirements.txt | 2 + setup.py | 2 + 6 files changed, 86 insertions(+), 4 deletions(-) create mode 100644 dbt/task/compiler/__init__.py create mode 100644 dbt/task/compiler/linker.py diff --git a/dbt/task/compile.py b/dbt/task/compile.py index 5974066cd8f..202491b5f02 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -3,7 +3,6 @@ import fnmatch import jinja2 - class CompileTask: def __init__(self, args, project): self.args = args diff --git a/dbt/task/compiler/__init__.py b/dbt/task/compiler/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbt/task/compiler/linker.py b/dbt/task/compiler/linker.py new file mode 100644 index 00000000000..f2ad10a1a97 --- /dev/null +++ b/dbt/task/compiler/linker.py @@ -0,0 +1,69 @@ +import sqlparse +import networkx as nx + +class Linker(object): + def __init__(self, graph=None): + if graph is None: + self.graph = nx.DiGraph() + else: + self.graph = graph + + self.node_sql_map = {} + + def serialized(self): + return "\n".join(nx.generate_adjlist(self.graph)) + + @classmethod + def deserialize(self, serialized_file): + graph = nx.read_adjlist(serialized_file, create_using=nx.DiGraph()) + return Linker(graph) + + def extract_name_and_deps(self, stmt): + table_def = stmt.token_next_by_instance(0, sqlparse.sql.Identifier) + schema, tbl_or_view = table_def.get_parent_name(), table_def.get_real_name() + if schema is None or tbl_or_view is None: + print "SCHEMA: ", schema + print "TBL/VIEWL ", tbl_or_view + print "DEF: ", table_def + raise RuntimeError('schema or view not defined?') + + definition = table_def.token_next_by_instance(0, sqlparse.sql.Parenthesis) + + node = (schema, tbl_or_view) + + i = 0 + token = definition.token_next_by_instance(0, sqlparse.sql.Identifier) + while token is not None: + new_node = (token.get_parent_name(), token.get_real_name()) + # not great -- our parser doesn't differentiate between SELECTed fields and tables + if None not in new_node: + self.graph.add_node(new_node) + self.graph.add_edge(node, new_node) + + i = definition.token_index(token) + 1 + token = definition.token_next_by_instance(i, sqlparse.sql.Identifier) + + return node + + def as_dependency_list(self): + order = nx.topological_sort(self.graph, reverse=True) + for node in order: + if node in self.node_sql_map: # TODO : check against db??? or what? + yield (node, self.node_sql_map[node]) + else: + print "Skipping {}".format(node) + + def register(self, node, sql): + if node in self.node_sql_map: + raise RuntimeError("multiple declarations of node: {}".format(node)) + self.node_sql_map[node] = sql + + def link(self, sql): + sql = sql.strip() + for statement in sqlparse.parse(sql): + if statement.get_type() == 'CREATE': + print "DEBUG: ERROR: Use CREATE OR REPLACE instead of CREATE!" + elif statement.get_type() == 'CREATE OR REPLACE': + node = self.extract_name_and_deps(statement) + self.register(node, sql) + diff --git a/dbt/task/run.py b/dbt/task/run.py index acd79f498b4..47cf1ee66cb 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -3,6 +3,7 @@ import os import fnmatch +from compiler.linker import Linker class RedshiftTarget: def __init__(self, cfg): @@ -32,6 +33,8 @@ def __init__(self, args, project): self.args = args self.project = project + self.linker = Linker() + def __compiled_files(self): compiled_files = [] sql_path = self.project['target-path'] @@ -59,15 +62,22 @@ def __create_schema(self): with handle.cursor() as cursor: cursor.execute('create schema if not exists "{}"'.format(target_cfg['schema'])) + def __load_models(self): + target = self.__get_target() + for f in self.__compiled_files(): + with open(os.path.join(self.project['target-path'], f), 'r') as fh: + self.linker.link(fh.read()) + def __execute_models(self): target = self.__get_target() with target.get_handle() as handle: with handle.cursor() as cursor: - for f in self.__compiled_files(): - with open(os.path.join(self.project['target-path'], f), 'r') as fh: - cursor.execute(fh.read()) + for (node, sql) in self.linker.as_dependency_list(): + cursor.execute(sql) print " {}".format(cursor.statusmessage) def run(self): self.__create_schema() + self.__load_models() self.__execute_models() + diff --git a/requirements.txt b/requirements.txt index bb604da014c..210bea51108 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,5 @@ argparse Jinja2>=2.8 PyYAML>=3.11 psycopg2==2.6.1 +sqlparse==0.1.19 +networkx==1.11 diff --git a/setup.py b/setup.py index 1d0d30a00ab..deb59fbc7e3 100644 --- a/setup.py +++ b/setup.py @@ -17,5 +17,7 @@ 'Jinja2>=2.8', 'PyYAML>=3.11', 'psycopg2==2.6.1', + 'sqlparse==0.1.19', + 'networkx==1.11', ], ) From 04b0d5e987c08b9467940bde4c226ad16e913d7d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Fri, 25 Mar 2016 16:36:09 -0400 Subject: [PATCH 02/22] wrap select stmts in create or replace (table|view) --- dbt/project.py | 3 +++ dbt/task/compile.py | 16 +++++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/dbt/project.py b/dbt/project.py index b2f6eade325..b29dd184913 100644 --- a/dbt/project.py +++ b/dbt/project.py @@ -46,6 +46,9 @@ def __contains__(self, key): def __setitem__(self, key, value): return self.cfg.__setitem__(key, value) + def get(self, key, default=None): + return self.cfg.get(key, default) + def run_environment(self): target_name = self.cfg['run-target'] return self.cfg['outputs'][target_name] diff --git a/dbt/task/compile.py b/dbt/task/compile.py index 7d87896bede..a349ce7b223 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -51,12 +51,26 @@ def __write(self, path, payload): with open(target_path, 'w') as f: f.write(payload) + def __wrap_in_create(self, path, query): + filename = os.path.basename(path) + identifier, ext = os.path.splitext(filename) + + # default to view if not provided in config! + table_or_view = self.project.get('table_or_view', 'view') + + ctx = self.project.context() + schema = ctx['env']['schema'] + + template = "create or replace {table_or_view} {schema}.{identifier} as ( {query} );" + return template.format(table_or_view=table_or_view, schema=schema, identifier=identifier, query=query) + def __compile(self, src_index): for src_path, files in src_index.iteritems(): jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=src_path)) for f in files: template = jinja.get_template(f) - self.__write(f, template.render(self.project.context())) + rendered = template.render(self.project.context()) + self.__write(f, self.__wrap_in_create(f, rendered)) def run(self): src_index = self.__src_index() From c0d5295cb243a2cb474f0e60f43e9a9d3de0e495 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 28 Mar 2016 10:36:05 -0400 Subject: [PATCH 03/22] better parsing and drop schema before create --- dbt/task/compile.py | 8 ++++++- dbt/task/compiler/linker.py | 43 +++++++++++++++++++++++-------------- dbt/task/run.py | 11 +++++++++- 3 files changed, 44 insertions(+), 18 deletions(-) diff --git a/dbt/task/compile.py b/dbt/task/compile.py index 65401216ccc..00c468525d6 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -60,7 +60,13 @@ def __wrap_in_create(self, path, query): ctx = self.project.context() schema = ctx['env']['schema'] - template = "create or replace {table_or_view} {schema}.{identifier} as ( {query} );" + if table_or_view == 'table': + template = "create {table_or_view} {schema}.{identifier} as ( {query} );" + elif table_or_view == 'view': + template = "create view {table_or_view} {schema}.{identifier} as ( {query} );" + else: + raise RuntimeError("bad value for `table_or_view`: {}".format(table_or_view)) + return template.format(table_or_view=table_or_view, schema=schema, identifier=identifier, query=query) def __compile(self, src_index): diff --git a/dbt/task/compiler/linker.py b/dbt/task/compiler/linker.py index f2ad10a1a97..e55a488e422 100644 --- a/dbt/task/compiler/linker.py +++ b/dbt/task/compiler/linker.py @@ -29,29 +29,40 @@ def extract_name_and_deps(self, stmt): definition = table_def.token_next_by_instance(0, sqlparse.sql.Parenthesis) - node = (schema, tbl_or_view) + definition_node = (schema, tbl_or_view) - i = 0 - token = definition.token_next_by_instance(0, sqlparse.sql.Identifier) - while token is not None: - new_node = (token.get_parent_name(), token.get_real_name()) - # not great -- our parser doesn't differentiate between SELECTed fields and tables - if None not in new_node: - self.graph.add_node(new_node) - self.graph.add_edge(node, new_node) + local_defs = set() - i = definition.token_index(token) + 1 - token = definition.token_next_by_instance(i, sqlparse.sql.Identifier) + def extract_deps(stmt): + token = stmt.token_first() + while token is not None: + if type(token) != sqlparse.sql.IdentifierList and token.is_group(): + local_defs.add(token.get_name()) + extract_deps(token) - return node + if type(token) == sqlparse.sql.Identifier and token.get_parent_name() not in local_defs: + new_node = (token.get_parent_name(), token.get_real_name()) + if None not in new_node: + self.graph.add_node(new_node) + self.graph.add_edge(definition_node, new_node) + + index = stmt.token_index(token) + token = stmt.token_next(index) + + extract_deps(definition) + return definition_node def as_dependency_list(self): order = nx.topological_sort(self.graph, reverse=True) for node in order: + #print "{}.{}".format(node[0], node[1]) if node in self.node_sql_map: # TODO : check against db??? or what? + #for (schema, tbl_or_view) in self.graph[node]: + # print " {}.{}".format(schema, tbl_or_view) yield (node, self.node_sql_map[node]) else: - print "Skipping {}".format(node) + #print "Skipping {}".format(node) + pass def register(self, node, sql): if node in self.node_sql_map: @@ -61,9 +72,9 @@ def register(self, node, sql): def link(self, sql): sql = sql.strip() for statement in sqlparse.parse(sql): - if statement.get_type() == 'CREATE': - print "DEBUG: ERROR: Use CREATE OR REPLACE instead of CREATE!" - elif statement.get_type() == 'CREATE OR REPLACE': + if statement.get_type().startswith('CREATE'): node = self.extract_name_and_deps(statement) self.register(node, sql) + else: + print "Ignoring {}".format(sql[0:100].replace('\n', ' ')) diff --git a/dbt/task/run.py b/dbt/task/run.py index 47cf1ee66cb..4de952208be 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -2,6 +2,7 @@ import psycopg2 import os import fnmatch +import re from compiler.linker import Linker @@ -60,7 +61,8 @@ def __create_schema(self): target = self.__get_target() with target.get_handle() as handle: with handle.cursor() as cursor: - cursor.execute('create schema if not exists "{}"'.format(target_cfg['schema'])) + cursor.execute('drop schema if exists "{}" cascade'.format(target_cfg['schema'])) + cursor.execute('create schema "{}"'.format(target_cfg['schema'])) def __load_models(self): target = self.__get_target() @@ -68,13 +70,20 @@ def __load_models(self): with open(os.path.join(self.project['target-path'], f), 'r') as fh: self.linker.link(fh.read()) + def __drop_if_needed(self, node): + schema, tbl = node + def __execute_models(self): target = self.__get_target() with target.get_handle() as handle: with handle.cursor() as cursor: for (node, sql) in self.linker.as_dependency_list(): + self.__drop_if_needed(node) + print "creating {}".format(".".join(node)) + #print " {}...".format(re.sub( '\s+', ' ', sql[0:100] ).strip()) cursor.execute(sql) print " {}".format(cursor.statusmessage) + handle.commit() def run(self): self.__create_schema() From 7649f6eeb2772b8c68526419e3d7c657d93f63c6 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 29 Mar 2016 15:13:59 -0400 Subject: [PATCH 04/22] handle CTEs and window functions --- dbt/task/compiler/linker.py | 21 ++++++++++++++++----- dbt/task/run.py | 2 ++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/dbt/task/compiler/linker.py b/dbt/task/compiler/linker.py index e55a488e422..d308ebd364c 100644 --- a/dbt/task/compiler/linker.py +++ b/dbt/task/compiler/linker.py @@ -32,24 +32,35 @@ def extract_name_and_deps(self, stmt): definition_node = (schema, tbl_or_view) local_defs = set() + new_nodes = set() def extract_deps(stmt): token = stmt.token_first() while token is not None: - if type(token) != sqlparse.sql.IdentifierList and token.is_group(): + excluded_types = [sqlparse.sql.Function] # don't dive into window functions + if type(token) not in excluded_types and token.is_group(): + # this is a thing that has a name -- note that! local_defs.add(token.get_name()) + # recurse into the group extract_deps(token) - if type(token) == sqlparse.sql.Identifier and token.get_parent_name() not in local_defs: + if type(token) == sqlparse.sql.Identifier: new_node = (token.get_parent_name(), token.get_real_name()) - if None not in new_node: - self.graph.add_node(new_node) - self.graph.add_edge(definition_node, new_node) + if None not in new_node: # hack for now -- make sure it's qualified w/ schema + # don't add edges yet! + new_nodes.add(new_node) index = stmt.token_index(token) token = stmt.token_next(index) extract_deps(definition) + + # only add nodes which don't reference locally defined constructs + for new_node in new_nodes: + if new_node[0] not in local_defs: + self.graph.add_node(new_node) + self.graph.add_edge(definition_node, new_node) + return definition_node def as_dependency_list(self): diff --git a/dbt/task/run.py b/dbt/task/run.py index 4de952208be..3b40b247c63 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -78,6 +78,8 @@ def __execute_models(self): with target.get_handle() as handle: with handle.cursor() as cursor: for (node, sql) in self.linker.as_dependency_list(): + print node, self.linker.graph[node] + continue self.__drop_if_needed(node) print "creating {}".format(".".join(node)) #print " {}...".format(re.sub( '\s+', ' ', sql[0:100] ).strip()) From ce530a7adf315164dbe826a59773119f909b9117 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 29 Mar 2016 15:14:39 -0400 Subject: [PATCH 05/22] remove debug code --- dbt/task/run.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dbt/task/run.py b/dbt/task/run.py index 3b40b247c63..4de952208be 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -78,8 +78,6 @@ def __execute_models(self): with target.get_handle() as handle: with handle.cursor() as cursor: for (node, sql) in self.linker.as_dependency_list(): - print node, self.linker.graph[node] - continue self.__drop_if_needed(node) print "creating {}".format(".".join(node)) #print " {}...".format(re.sub( '\s+', ' ', sql[0:100] ).strip()) From fa9e4c6f9b09e90ce1f891215d2fd0b9b089c24d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 29 Mar 2016 15:50:24 -0400 Subject: [PATCH 06/22] fix duplicate 'view' keyword in compile stmt --- dbt/task/compile.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dbt/task/compile.py b/dbt/task/compile.py index 00c468525d6..ea7e17d676d 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -60,10 +60,8 @@ def __wrap_in_create(self, path, query): ctx = self.project.context() schema = ctx['env']['schema'] - if table_or_view == 'table': + if table_or_view in ['table', 'view']: template = "create {table_or_view} {schema}.{identifier} as ( {query} );" - elif table_or_view == 'view': - template = "create view {table_or_view} {schema}.{identifier} as ( {query} );" else: raise RuntimeError("bad value for `table_or_view`: {}".format(table_or_view)) From d8d9e250d709039cc5eaac29a7121727f886fde1 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 29 Mar 2016 16:20:20 -0400 Subject: [PATCH 07/22] code cleanup --- dbt/task/compiler/linker.py | 36 +++++++++++++++++++++++++++--------- dbt/task/run.py | 8 ++------ 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/dbt/task/compiler/linker.py b/dbt/task/compiler/linker.py index d308ebd364c..4bbc6dcf487 100644 --- a/dbt/task/compiler/linker.py +++ b/dbt/task/compiler/linker.py @@ -1,6 +1,24 @@ import sqlparse import networkx as nx +class Relation(object): + def __init__(self, schema, name): + self.schema = schema + self.name = name + + def valid(self): + return None not in (self.schema, self.name) + + @property + def val(self): + return "{}.{}".format(self.schema, self.name) + + def __repr__(self): + return self.val() + + def __str__(self): + return self.val() + class Linker(object): def __init__(self, graph=None): if graph is None: @@ -29,7 +47,7 @@ def extract_name_and_deps(self, stmt): definition = table_def.token_next_by_instance(0, sqlparse.sql.Parenthesis) - definition_node = (schema, tbl_or_view) + definition_node = Relation(schema, tbl_or_view) local_defs = set() new_nodes = set() @@ -45,10 +63,10 @@ def extract_deps(stmt): extract_deps(token) if type(token) == sqlparse.sql.Identifier: - new_node = (token.get_parent_name(), token.get_real_name()) - if None not in new_node: # hack for now -- make sure it's qualified w/ schema - # don't add edges yet! - new_nodes.add(new_node) + new_node = Relation(token.get_parent_name(), token.get_real_name()) + + if new_node.valid(): + new_nodes.add(new_node) # don't add edges yet! index = stmt.token_index(token) token = stmt.token_next(index) @@ -57,11 +75,11 @@ def extract_deps(stmt): # only add nodes which don't reference locally defined constructs for new_node in new_nodes: - if new_node[0] not in local_defs: - self.graph.add_node(new_node) - self.graph.add_edge(definition_node, new_node) + if new_node.schema not in local_defs: + self.graph.add_node(new_node.val) + self.graph.add_edge(definition_node.val, new_node.val) - return definition_node + return definition_node.val def as_dependency_list(self): order = nx.topological_sort(self.graph, reverse=True) diff --git a/dbt/task/run.py b/dbt/task/run.py index 4de952208be..860dc737212 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -70,16 +70,12 @@ def __load_models(self): with open(os.path.join(self.project['target-path'], f), 'r') as fh: self.linker.link(fh.read()) - def __drop_if_needed(self, node): - schema, tbl = node - def __execute_models(self): target = self.__get_target() with target.get_handle() as handle: with handle.cursor() as cursor: - for (node, sql) in self.linker.as_dependency_list(): - self.__drop_if_needed(node) - print "creating {}".format(".".join(node)) + for (relation, sql) in self.linker.as_dependency_list(): + print "creating {}".format(relation) #print " {}...".format(re.sub( '\s+', ' ', sql[0:100] ).strip()) cursor.execute(sql) print " {}".format(cursor.statusmessage) From 64bb3dbfed92580ebb3ff66ac487d6d3912e4787 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 29 Mar 2016 16:42:46 -0400 Subject: [PATCH 08/22] default to models --- dbt/project.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/project.py b/dbt/project.py index b29dd184913..5eae5617e4b 100644 --- a/dbt/project.py +++ b/dbt/project.py @@ -4,7 +4,7 @@ import copy default_project_cfg = { - 'source-paths': ['model'], + 'source-paths': ['models'], 'test-paths': ['test'], 'target-path': 'target', 'clean-targets': ['target'], From cf588eda2e4dbaebe6cae4e3ed9b8cfeb022dd68 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 29 Mar 2016 19:00:08 -0400 Subject: [PATCH 09/22] each model gets its own config --- dbt/task/compile.py | 78 ++++++++++++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 25 deletions(-) diff --git a/dbt/task/compile.py b/dbt/task/compile.py index ea7e17d676d..9a40af63638 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -2,40 +2,50 @@ import os import fnmatch import jinja2 +import yaml +from collections import defaultdict + +default_model_config = { + "materialized": False, + "enabled": True, +} class CompileTask: def __init__(self, args, project): self.args = args self.project = project - def __is_specified_model(self, path): - if 'models' not in self.project: - return True + self.model_configs = {} + + def __load_model_config(self, config_path): + # directory containing the config file + model_path = os.path.dirname(config_path) + + if model_path not in self.model_configs and os.path.exists(config_path): + with open(config_path, 'r') as config_fh: + model_config = yaml.safe_load(config_fh) + + config = default_model_config.copy() + config.update(model_config) - path_parts = path.split("/") - if len(path_parts) < 2: - return False - else: - model = path_parts[1] - for allowed_model in self.project['models']: - if fnmatch.fnmatch(model, allowed_model): - return True - return False + self.model_configs[model_path] = config def __src_index(self): """returns: {'model': ['pardot/model.sql', 'segment/model.sql']} """ - indexed_files = {} + indexed_files = defaultdict(list) for source_path in self.project['source-paths']: for root, dirs, files in os.walk(source_path): - if not self.__is_specified_model(root): - continue for filename in files: + abs_path = os.path.join(root, filename) + rel_path = os.path.relpath(abs_path, source_path) + if fnmatch.fnmatch(filename, "*.sql"): - abs_path = os.path.join(root, filename) - rel_path = os.path.relpath(abs_path, source_path) - indexed_files.setdefault(source_path, []).append(rel_path) + indexed_files[source_path].append(rel_path) + + elif filename == 'config.yml': + self.__load_model_config(abs_path) return indexed_files @@ -50,30 +60,48 @@ def __write(self, path, payload): with open(target_path, 'w') as f: f.write(payload) - def __wrap_in_create(self, path, query): + def __wrap_in_create(self, path, query, model_config): filename = os.path.basename(path) identifier, ext = os.path.splitext(filename) # default to view if not provided in config! - table_or_view = self.project.get('table_or_view', 'view') + table_or_view = 'table' if model_config['materialized'] else 'view' ctx = self.project.context() schema = ctx['env']['schema'] - if table_or_view in ['table', 'view']: - template = "create {table_or_view} {schema}.{identifier} as ( {query} );" - else: - raise RuntimeError("bad value for `table_or_view`: {}".format(table_or_view)) + template = "create {table_or_view} {schema}.{identifier} as ( {query} );" return template.format(table_or_view=table_or_view, schema=schema, identifier=identifier, query=query) + + def __get_sql_file_config(self, src_path, f): + model_path = os.path.join(src_path, os.path.dirname(f)) + config = self.model_configs.get(model_path, default_model_config) + identifier, ext = os.path.splitext(os.path.basename(f)) + model_config = config.copy() + + if identifier in model_config: + model_config.update(config[identifier]) + + return model_config + def __compile(self, src_index): for src_path, files in src_index.iteritems(): jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=src_path)) for f in files: template = jinja.get_template(f) rendered = template.render(self.project.context()) - self.__write(f, self.__wrap_in_create(f, rendered)) + + model_config = self.__get_sql_file_config(src_path, f) + + if not model_config['enabled']: + continue + + create_stmt = self.__wrap_in_create(f, rendered, model_config) + + if create_stmt: + self.__write(f, create_stmt) def run(self): src_index = self.__src_index() From b3c19da00b69177c5f261ee44e47514dea3c32b0 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 29 Mar 2016 19:05:33 -0400 Subject: [PATCH 10/22] cleanup --- dbt/task/compiler/linker.py | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/dbt/task/compiler/linker.py b/dbt/task/compiler/linker.py index 4bbc6dcf487..c7b0972c0a4 100644 --- a/dbt/task/compiler/linker.py +++ b/dbt/task/compiler/linker.py @@ -28,21 +28,10 @@ def __init__(self, graph=None): self.node_sql_map = {} - def serialized(self): - return "\n".join(nx.generate_adjlist(self.graph)) - - @classmethod - def deserialize(self, serialized_file): - graph = nx.read_adjlist(serialized_file, create_using=nx.DiGraph()) - return Linker(graph) - def extract_name_and_deps(self, stmt): table_def = stmt.token_next_by_instance(0, sqlparse.sql.Identifier) schema, tbl_or_view = table_def.get_parent_name(), table_def.get_real_name() if schema is None or tbl_or_view is None: - print "SCHEMA: ", schema - print "TBL/VIEWL ", tbl_or_view - print "DEF: ", table_def raise RuntimeError('schema or view not defined?') definition = table_def.token_next_by_instance(0, sqlparse.sql.Parenthesis) @@ -84,13 +73,9 @@ def extract_deps(stmt): def as_dependency_list(self): order = nx.topological_sort(self.graph, reverse=True) for node in order: - #print "{}.{}".format(node[0], node[1]) - if node in self.node_sql_map: # TODO : check against db??? or what? - #for (schema, tbl_or_view) in self.graph[node]: - # print " {}.{}".format(schema, tbl_or_view) + if node in self.node_sql_map: # TODO : yield (node, self.node_sql_map[node]) else: - #print "Skipping {}".format(node) pass def register(self, node, sql): @@ -106,4 +91,3 @@ def link(self, sql): self.register(node, sql) else: print "Ignoring {}".format(sql[0:100].replace('\n', ' ')) - From a90d3a1a2b4151318bdcd00b011eec850d0a4d58 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 31 Mar 2016 14:28:56 -0400 Subject: [PATCH 11/22] rename compiler --> linker --- dbt/task/{compiler => build}/__init__.py | 0 dbt/task/{compiler => build}/linker.py | 0 dbt/task/run.py | 2 +- 3 files changed, 1 insertion(+), 1 deletion(-) rename dbt/task/{compiler => build}/__init__.py (100%) rename dbt/task/{compiler => build}/linker.py (100%) diff --git a/dbt/task/compiler/__init__.py b/dbt/task/build/__init__.py similarity index 100% rename from dbt/task/compiler/__init__.py rename to dbt/task/build/__init__.py diff --git a/dbt/task/compiler/linker.py b/dbt/task/build/linker.py similarity index 100% rename from dbt/task/compiler/linker.py rename to dbt/task/build/linker.py diff --git a/dbt/task/run.py b/dbt/task/run.py index 860dc737212..016ded701a3 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -4,7 +4,7 @@ import fnmatch import re -from compiler.linker import Linker +from build.linker import Linker class RedshiftTarget: def __init__(self, cfg): From 79f6fb5b76381fe13bb05e2c22de2e5602c26387 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 31 Mar 2016 15:29:57 -0400 Subject: [PATCH 12/22] drop existing tables/views (instead of whole schema) --- dbt/task/build/linker.py | 4 ++-- dbt/task/compile.py | 11 +++++++++-- dbt/task/run.py | 30 ++++++++++++++++++++++++++++-- 3 files changed, 39 insertions(+), 6 deletions(-) diff --git a/dbt/task/build/linker.py b/dbt/task/build/linker.py index c7b0972c0a4..21daab53faf 100644 --- a/dbt/task/build/linker.py +++ b/dbt/task/build/linker.py @@ -14,10 +14,10 @@ def val(self): return "{}.{}".format(self.schema, self.name) def __repr__(self): - return self.val() + return self.val def __str__(self): - return self.val() + return self.val class Linker(object): def __init__(self, graph=None): diff --git a/dbt/task/compile.py b/dbt/task/compile.py index 9a40af63638..4897343e3b1 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -70,9 +70,16 @@ def __wrap_in_create(self, path, query, model_config): ctx = self.project.context() schema = ctx['env']['schema'] - template = "create {table_or_view} {schema}.{identifier} as ( {query} );" + create_template = "create {table_or_view} {schema}.{identifier} as ( {query} );" - return template.format(table_or_view=table_or_view, schema=schema, identifier=identifier, query=query) + opts = { + "table_or_view": table_or_view, + "schema": schema, + "identifier": identifier, + "query": query + } + + return create_template.format(**opts) def __get_sql_file_config(self, src_path, f): diff --git a/dbt/task/run.py b/dbt/task/run.py index 016ded701a3..f7f6ebe6566 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -61,8 +61,7 @@ def __create_schema(self): target = self.__get_target() with target.get_handle() as handle: with handle.cursor() as cursor: - cursor.execute('drop schema if exists "{}" cascade'.format(target_cfg['schema'])) - cursor.execute('create schema "{}"'.format(target_cfg['schema'])) + cursor.execute('create schema if not exists "{}"'.format(target_cfg['schema'])) def __load_models(self): target = self.__get_target() @@ -70,11 +69,38 @@ def __load_models(self): with open(os.path.join(self.project['target-path'], f), 'r') as fh: self.linker.link(fh.read()) + def __query_for_existing(self, cursor, schema): + sql = """ + select '{schema}.' || tablename as name, 'table' as type from pg_tables where schemaname = '{schema}' + union all + select '{schema}.' || viewname as name, 'view' as type from pg_views where schemaname = '{schema}' """.format(schema=schema) + + cursor.execute(sql) + existing = [(name, relation_type) for (name, relation_type) in cursor.fetchall()] + + return dict(existing) + + def __drop(self, cursor, relation, relation_type): + cascade = "cascade" if relation_type == 'view' else "" + + sql = "drop {relation_type} if exists {relation} {cascade}".format(relation_type=relation_type, relation=relation, cascade=cascade) + + cursor.execute(sql) + def __execute_models(self): target = self.__get_target() + with target.get_handle() as handle: with handle.cursor() as cursor: + + existing = self.__query_for_existing(cursor, target.schema); + for (relation, sql) in self.linker.as_dependency_list(): + + if relation in existing: + self.__drop(cursor, relation, existing[relation]) + handle.commit() + print "creating {}".format(relation) #print " {}...".format(re.sub( '\s+', ' ', sql[0:100] ).strip()) cursor.execute(sql) From c04131728b9cc32a62857d90069dc40c6d4056d4 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 31 Mar 2016 15:46:43 -0400 Subject: [PATCH 13/22] remove argparse dep --- setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.py b/setup.py index deb59fbc7e3..a15f4c0afeb 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,6 @@ 'scripts/dbt', ], install_requires=[ - 'argparse>=1.2.1', 'Jinja2>=2.8', 'PyYAML>=3.11', 'psycopg2==2.6.1', From be1231a9af7cadfd07c4bac8959a0d56c5b4f3cb Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 31 Mar 2016 15:54:33 -0400 Subject: [PATCH 14/22] change print statements, function calls, imports --- dbt/task/build/linker.py | 2 +- dbt/task/compile.py | 4 ++-- dbt/task/debug.py | 4 ++-- dbt/task/run.py | 8 ++++---- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dbt/task/build/linker.py b/dbt/task/build/linker.py index 21daab53faf..c193d8d0d64 100644 --- a/dbt/task/build/linker.py +++ b/dbt/task/build/linker.py @@ -90,4 +90,4 @@ def link(self, sql): node = self.extract_name_and_deps(statement) self.register(node, sql) else: - print "Ignoring {}".format(sql[0:100].replace('\n', ' ')) + print("Ignoring {}".format(sql[0:100].replace('\n', ' '))) diff --git a/dbt/task/compile.py b/dbt/task/compile.py index 4897343e3b1..ea1c0831cc0 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -55,7 +55,7 @@ def __write(self, path, payload): if not os.path.exists(os.path.dirname(target_path)): os.makedirs(os.path.dirname(target_path)) elif os.path.exists(target_path): - print "Compiler overwrite of {}".format(target_path) + print("Compiler overwrite of {}".format(target_path)) with open(target_path, 'w') as f: f.write(payload) @@ -94,7 +94,7 @@ def __get_sql_file_config(self, src_path, f): return model_config def __compile(self, src_index): - for src_path, files in src_index.iteritems(): + for src_path, files in src_index.items(): jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=src_path)) for f in files: template = jinja.get_template(f) diff --git a/dbt/task/debug.py b/dbt/task/debug.py index b3e876d61ad..45f5b56117f 100644 --- a/dbt/task/debug.py +++ b/dbt/task/debug.py @@ -7,6 +7,6 @@ def __init__(self, args, project): self.project = project def run(self): - print "args: {}".format(self.args) - print "project: " + print("args: {}".format(self.args)) + print("project: ") pprint.pprint(self.project) diff --git a/dbt/task/run.py b/dbt/task/run.py index f7f6ebe6566..eedd438e45f 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -4,7 +4,7 @@ import fnmatch import re -from build.linker import Linker +from .build.linker import Linker class RedshiftTarget: def __init__(self, cfg): @@ -101,10 +101,10 @@ def __execute_models(self): self.__drop(cursor, relation, existing[relation]) handle.commit() - print "creating {}".format(relation) - #print " {}...".format(re.sub( '\s+', ' ', sql[0:100] ).strip()) + print("creating {}".format(relation)) + #print(" {}...".format(re.sub( '\s+', ' ', sql[0:100] ).strip())) cursor.execute(sql) - print " {}".format(cursor.statusmessage) + print(" {}".format(cursor.statusmessage)) handle.commit() def run(self): From 7faac11d9dc9676d50e029a592571d5bbcfade65 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 31 Mar 2016 15:55:57 -0400 Subject: [PATCH 15/22] always cascade table drop --- dbt/task/run.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dbt/task/run.py b/dbt/task/run.py index f7f6ebe6566..b39ec203063 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -81,10 +81,7 @@ def __query_for_existing(self, cursor, schema): return dict(existing) def __drop(self, cursor, relation, relation_type): - cascade = "cascade" if relation_type == 'view' else "" - - sql = "drop {relation_type} if exists {relation} {cascade}".format(relation_type=relation_type, relation=relation, cascade=cascade) - + sql = "drop {relation_type} if exists {relation} cascade".format(relation_type=relation_type, relation=relation) cursor.execute(sql) def __execute_models(self): From 216b2f8aa65f4cf5b4a8ba6dbff974e1258d6507 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Fri, 1 Apr 2016 13:37:38 -0400 Subject: [PATCH 16/22] move linker code into run task --- dbt/task/build/__init__.py | 0 dbt/task/build/linker.py | 93 ------------------------------------- dbt/task/run.py | 95 +++++++++++++++++++++++++++++++++++++- 3 files changed, 94 insertions(+), 94 deletions(-) delete mode 100644 dbt/task/build/__init__.py delete mode 100644 dbt/task/build/linker.py diff --git a/dbt/task/build/__init__.py b/dbt/task/build/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/dbt/task/build/linker.py b/dbt/task/build/linker.py deleted file mode 100644 index 21daab53faf..00000000000 --- a/dbt/task/build/linker.py +++ /dev/null @@ -1,93 +0,0 @@ -import sqlparse -import networkx as nx - -class Relation(object): - def __init__(self, schema, name): - self.schema = schema - self.name = name - - def valid(self): - return None not in (self.schema, self.name) - - @property - def val(self): - return "{}.{}".format(self.schema, self.name) - - def __repr__(self): - return self.val - - def __str__(self): - return self.val - -class Linker(object): - def __init__(self, graph=None): - if graph is None: - self.graph = nx.DiGraph() - else: - self.graph = graph - - self.node_sql_map = {} - - def extract_name_and_deps(self, stmt): - table_def = stmt.token_next_by_instance(0, sqlparse.sql.Identifier) - schema, tbl_or_view = table_def.get_parent_name(), table_def.get_real_name() - if schema is None or tbl_or_view is None: - raise RuntimeError('schema or view not defined?') - - definition = table_def.token_next_by_instance(0, sqlparse.sql.Parenthesis) - - definition_node = Relation(schema, tbl_or_view) - - local_defs = set() - new_nodes = set() - - def extract_deps(stmt): - token = stmt.token_first() - while token is not None: - excluded_types = [sqlparse.sql.Function] # don't dive into window functions - if type(token) not in excluded_types and token.is_group(): - # this is a thing that has a name -- note that! - local_defs.add(token.get_name()) - # recurse into the group - extract_deps(token) - - if type(token) == sqlparse.sql.Identifier: - new_node = Relation(token.get_parent_name(), token.get_real_name()) - - if new_node.valid(): - new_nodes.add(new_node) # don't add edges yet! - - index = stmt.token_index(token) - token = stmt.token_next(index) - - extract_deps(definition) - - # only add nodes which don't reference locally defined constructs - for new_node in new_nodes: - if new_node.schema not in local_defs: - self.graph.add_node(new_node.val) - self.graph.add_edge(definition_node.val, new_node.val) - - return definition_node.val - - def as_dependency_list(self): - order = nx.topological_sort(self.graph, reverse=True) - for node in order: - if node in self.node_sql_map: # TODO : - yield (node, self.node_sql_map[node]) - else: - pass - - def register(self, node, sql): - if node in self.node_sql_map: - raise RuntimeError("multiple declarations of node: {}".format(node)) - self.node_sql_map[node] = sql - - def link(self, sql): - sql = sql.strip() - for statement in sqlparse.parse(sql): - if statement.get_type().startswith('CREATE'): - node = self.extract_name_and_deps(statement) - self.register(node, sql) - else: - print "Ignoring {}".format(sql[0:100].replace('\n', ' ')) diff --git a/dbt/task/run.py b/dbt/task/run.py index b39ec203063..434d93f14b6 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -4,7 +4,8 @@ import fnmatch import re -from build.linker import Linker +import sqlparse +import networkx as nx class RedshiftTarget: def __init__(self, cfg): @@ -29,6 +30,98 @@ def get_handle(self): return psycopg2.connect(self.__get_spec()) +class Relation(object): + def __init__(self, schema, name): + self.schema = schema + self.name = name + + def valid(self): + return None not in (self.schema, self.name) + + @property + def val(self): + return "{}.{}".format(self.schema, self.name) + + def __repr__(self): + return self.val + + def __str__(self): + return self.val + +class Linker(object): + def __init__(self, graph=None): + if graph is None: + self.graph = nx.DiGraph() + else: + self.graph = graph + + self.node_sql_map = {} + + def extract_name_and_deps(self, stmt): + table_def = stmt.token_next_by_instance(0, sqlparse.sql.Identifier) + schema, tbl_or_view = table_def.get_parent_name(), table_def.get_real_name() + if schema is None or tbl_or_view is None: + raise RuntimeError('schema or view not defined?') + + definition = table_def.token_next_by_instance(0, sqlparse.sql.Parenthesis) + + definition_node = Relation(schema, tbl_or_view) + + local_defs = set() + new_nodes = set() + + def extract_deps(stmt): + token = stmt.token_first() + while token is not None: + excluded_types = [sqlparse.sql.Function] # don't dive into window functions + if type(token) not in excluded_types and token.is_group(): + # this is a thing that has a name -- note that! + local_defs.add(token.get_name()) + # recurse into the group + extract_deps(token) + + if type(token) == sqlparse.sql.Identifier: + new_node = Relation(token.get_parent_name(), token.get_real_name()) + + if new_node.valid(): + new_nodes.add(new_node) # don't add edges yet! + + index = stmt.token_index(token) + token = stmt.token_next(index) + + extract_deps(definition) + + # only add nodes which don't reference locally defined constructs + for new_node in new_nodes: + if new_node.schema not in local_defs: + self.graph.add_node(new_node.val) + self.graph.add_edge(definition_node.val, new_node.val) + + return definition_node.val + + def as_dependency_list(self): + order = nx.topological_sort(self.graph, reverse=True) + for node in order: + if node in self.node_sql_map: # TODO : + yield (node, self.node_sql_map[node]) + else: + pass + + def register(self, node, sql): + if node in self.node_sql_map: + raise RuntimeError("multiple declarations of node: {}".format(node)) + self.node_sql_map[node] = sql + + def link(self, sql): + sql = sql.strip() + for statement in sqlparse.parse(sql): + if statement.get_type().startswith('CREATE'): + node = self.extract_name_and_deps(statement) + self.register(node, sql) + else: + print "Ignoring {}".format(sql[0:100].replace('\n', ' ')) + + class RunTask: def __init__(self, args, project): self.args = args From 6e4c858e02ebefc56d8397654723c9306d3f20e3 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Fri, 1 Apr 2016 13:40:19 -0400 Subject: [PATCH 17/22] add parens to print stmt --- dbt/task/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/task/run.py b/dbt/task/run.py index 5e569f58385..ec8737c19c5 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -119,7 +119,7 @@ def link(self, sql): node = self.extract_name_and_deps(statement) self.register(node, sql) else: - print "Ignoring {}".format(sql[0:100].replace('\n', ' ')) + print("Ignoring {}".format(sql[0:100].replace('\n', ' '))) class RunTask: From 42d500af8a0302c007f61db8658a19bcae1043d2 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 3 Apr 2016 14:51:17 -0400 Subject: [PATCH 18/22] model configs come from dbt_project.yml file --- dbt/project.py | 7 +++++- dbt/task/compile.py | 56 ++++++++++++++++----------------------------- 2 files changed, 26 insertions(+), 37 deletions(-) diff --git a/dbt/project.py b/dbt/project.py index 5eae5617e4b..0e4e7b7da69 100644 --- a/dbt/project.py +++ b/dbt/project.py @@ -10,6 +10,12 @@ 'clean-targets': ['target'], 'outputs': {'default': {}}, 'run-target': 'default', + 'models': { + 'base': { + "materialized": False, + "enabled": True + } + } } default_profiles = { @@ -18,7 +24,6 @@ default_active_profiles = ['user'] - class Project: def __init__(self, cfg, profiles, active_profile_names=[]): diff --git a/dbt/task/compile.py b/dbt/task/compile.py index ea1c0831cc0..687af6becaf 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -5,31 +5,11 @@ import yaml from collections import defaultdict -default_model_config = { - "materialized": False, - "enabled": True, -} - class CompileTask: def __init__(self, args, project): self.args = args self.project = project - self.model_configs = {} - - def __load_model_config(self, config_path): - # directory containing the config file - model_path = os.path.dirname(config_path) - - if model_path not in self.model_configs and os.path.exists(config_path): - with open(config_path, 'r') as config_fh: - model_config = yaml.safe_load(config_fh) - - config = default_model_config.copy() - config.update(model_config) - - self.model_configs[model_path] = config - def __src_index(self): """returns: {'model': ['pardot/model.sql', 'segment/model.sql']} """ @@ -44,9 +24,6 @@ def __src_index(self): if fnmatch.fnmatch(filename, "*.sql"): indexed_files[source_path].append(rel_path) - elif filename == 'config.yml': - self.__load_model_config(abs_path) - return indexed_files def __write(self, path, payload): @@ -81,29 +58,36 @@ def __wrap_in_create(self, path, query, model_config): return create_template.format(**opts) + def __get_model_identifiers(self, model_filepath): + model_group = os.path.dirname(model_filepath) + model_name, _ = os.path.splitext(os.path.basename(model_filepath)) + return model_group, model_name - def __get_sql_file_config(self, src_path, f): - model_path = os.path.join(src_path, os.path.dirname(f)) - config = self.model_configs.get(model_path, default_model_config) - identifier, ext = os.path.splitext(os.path.basename(f)) - model_config = config.copy() + def __get_model_config(self, model_group, model_name): + """merges model, model group, and base configs together. Model config + takes precedence, then model_group, then base config""" - if identifier in model_config: - model_config.update(config[identifier]) + model_configs = self.project['models'] - return model_config + config = model_configs['base'].copy() + model_group_config = model_configs.get(model_group, {}) + model_config = model_group_config.get(model_name, {}) + + config.update(model_group_config) + config.update(model_config) + + return config def __compile(self, src_index): for src_path, files in src_index.items(): jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=src_path)) for f in files: - template = jinja.get_template(f) - rendered = template.render(self.project.context()) - model_config = self.__get_sql_file_config(src_path, f) + model_group, model_name = self.__get_model_identifiers(f) + model_config = self.__get_model_config(model_group, model_name) - if not model_config['enabled']: - continue + template = jinja.get_template(f) + rendered = template.render(self.project.context()) create_stmt = self.__wrap_in_create(f, rendered, model_config) From f3a7b706052caf1853ab0db5897ce444b59a7730 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 3 Apr 2016 14:55:52 -0400 Subject: [PATCH 19/22] skip disabled models in compile step --- dbt/task/compile.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbt/task/compile.py b/dbt/task/compile.py index 687af6becaf..ebc81f3340f 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -86,6 +86,9 @@ def __compile(self, src_index): model_group, model_name = self.__get_model_identifiers(f) model_config = self.__get_model_config(model_group, model_name) + if not model_config.get('enabled'): + continue + template = jinja.get_template(f) rendered = template.render(self.project.context()) From 0a6635784f577ba1527e10b33e7913775f1301c4 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 3 Apr 2016 15:03:34 -0400 Subject: [PATCH 20/22] add model config docs to sample.dbt_project.yml --- sample.dbt_project.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sample.dbt_project.yml b/sample.dbt_project.yml index 78219653860..56e306c8742 100644 --- a/sample.dbt_project.yml +++ b/sample.dbt_project.yml @@ -6,6 +6,17 @@ source-paths: ["model"] # paths with source code to compile target-path: "target" # path for compiled code clean-targets: ["target"] # directories removed by the clean task +models: + base: + enabled: true # enable all models by default + materialized: false # If true, create tables. If false, create views + + pardot: + enabled: false # disable all pardot models except where overriden + pardot_visitoractivity: # override configs for a particular model + enabled: true # enable this model + materialized: true # create a table instead of a view (overriding the base config) + # Run configuration # output environments outputs: From 4bb43a42f96d1650b40292e1c64d4d17dab152e0 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 3 Apr 2016 17:16:15 -0400 Subject: [PATCH 21/22] don't use 'base' as a magic keyword --- dbt/project.py | 9 ++++----- dbt/task/compile.py | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/dbt/project.py b/dbt/project.py index 0e4e7b7da69..bd6f561645f 100644 --- a/dbt/project.py +++ b/dbt/project.py @@ -10,11 +10,10 @@ 'clean-targets': ['target'], 'outputs': {'default': {}}, 'run-target': 'default', - 'models': { - 'base': { - "materialized": False, - "enabled": True - } + 'models': {}, + 'model-defaults': { + "enabled": True, + "materialized": False } } diff --git a/dbt/task/compile.py b/dbt/task/compile.py index ebc81f3340f..1032a3e314e 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -67,9 +67,9 @@ def __get_model_config(self, model_group, model_name): """merges model, model group, and base configs together. Model config takes precedence, then model_group, then base config""" - model_configs = self.project['models'] + config = self.project['model-defaults'].copy() - config = model_configs['base'].copy() + model_configs = self.project['models'] model_group_config = model_configs.get(model_group, {}) model_config = model_group_config.get(model_name, {}) From d7a2ba456e5c90225fece87a5230c9df532f06d7 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 3 Apr 2016 17:18:20 -0400 Subject: [PATCH 22/22] tweak example config file --- sample.dbt_project.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sample.dbt_project.yml b/sample.dbt_project.yml index 56e306c8742..eed170ec656 100644 --- a/sample.dbt_project.yml +++ b/sample.dbt_project.yml @@ -6,11 +6,11 @@ source-paths: ["model"] # paths with source code to compile target-path: "target" # path for compiled code clean-targets: ["target"] # directories removed by the clean task -models: - base: +model-defaults: enabled: true # enable all models by default materialized: false # If true, create tables. If false, create views +models: pardot: enabled: false # disable all pardot models except where overriden pardot_visitoractivity: # override configs for a particular model