From 3061d597ab95f97ec4c503d16034a24f3e92bd90 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 10 Oct 2016 20:53:57 -0400 Subject: [PATCH 1/8] wip --- dbt/main.py | 4 +++ dbt/schema.py | 13 +++++++++ dbt/targets.py | 15 ++++++++++ dbt/task/archive.py | 71 +++++++++++++++++++++++++++++++++++++++++++++ dbt/templates.py | 68 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 171 insertions(+) create mode 100644 dbt/task/archive.py diff --git a/dbt/main.py b/dbt/main.py index c374a29e6d5..877b4b6c184 100644 --- a/dbt/main.py +++ b/dbt/main.py @@ -16,6 +16,7 @@ import dbt.task.init as init_task import dbt.task.seed as seed_task import dbt.task.test as test_task +import dbt.task.archive as archive_task import dbt.tracking @@ -71,6 +72,9 @@ def handle(args): sub = subs.add_parser('deps', parents=[base_subparser]) sub.set_defaults(cls=deps_task.DepsTask, which='deps') + sub = subs.add_parser('archive', parents=[base_subparser]) + sub.set_defaults(cls=archive_task.ArchiveTask, which='archive') + sub = subs.add_parser('run', parents=[base_subparser]) sub.add_argument('--dry', action='store_true', help="'dry run' models") sub.add_argument('--models', required=False, nargs='+', help="Specify the models to run. All models depending on these models will also be run") diff --git a/dbt/schema.py b/dbt/schema.py index ba18e30bcae..53086d8dd60 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -104,6 +104,13 @@ def drop(self, schema, relation_type, relation): self.execute_and_handle_permissions(sql, relation) self.logger.info("dropped %s %s.%s", relation_type, schema, relation) + def get_columns_in_table(self, schema_name, table_name): + sql = self.target.sql_columns_in_table(schema_name, table_name) + self.logger.debug("getting columns in table %s.%s", schema_name, table_name) + results = self.execute_and_fetch(sql) + columns = {column: data_type for (column, data_type) in results} + self.logger.debug("Found columns: %s", columns) + return columns def rename(self, schema, from_name, to_name): rename_query = 'alter table "{schema}"."{from_name}" rename to "{to_name}"'.format(schema=schema, from_name=from_name, to_name=to_name) @@ -111,6 +118,12 @@ def rename(self, schema, from_name, to_name): self.execute_and_handle_permissions(rename_query, from_name) self.logger.info("renamed model %s.%s --> %s.%s", schema, from_name, schema, to_name) + def create_table(self, schema, table, columns_dict, sort, dist): + fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns_dict.items()] + fields_csv = ",\n ".join(fields) + sql = 'create table if not exists "{schema}"."{table}" (\n {fields}\n);'.format(schema=schema, table=table, fields=fields_csv) + self.logger.info('creating table "%s"."%s"'.format(schema, table)) + self.execute_and_handle_permissions(sql, table) def create_schema_if_not_exists(self, schema_name): schemas = self.get_schemas() diff --git a/dbt/targets.py b/dbt/targets.py index f2522dc8c14..7703ec13bc3 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -114,6 +114,14 @@ class RedshiftTarget(BaseSQLTarget): def __init__(self, cfg): super(RedshiftTarget, self).__init__(cfg) + + def sql_columns_in_table(self, schema_name, table_name): + return """ + select "column" as column_name, "type" as "data_type" + from pg_table_def + where schemaname = '{schema_name}' and tablename = '{table_name}' + """.format(schema_name=schema_name, table_name=table_name).strip() + @property def context(self): return { @@ -124,6 +132,13 @@ class PostgresTarget(BaseSQLTarget): def __init__(self, cfg): super(PostgresTarget, self).__init__(cfg) + def sql_columns_in_table(self, schema_name, table_name): + return """ + select column_name, data_type + from information_schema.columns + where table_schema = '{schema_name}' and table_name = '{table_name}' + """.format(schema_name=schema_name, table_name=table_name).strip() + @property def context(self): return { diff --git a/dbt/task/archive.py b/dbt/task/archive.py new file mode 100644 index 00000000000..26b69ee8442 --- /dev/null +++ b/dbt/task/archive.py @@ -0,0 +1,71 @@ + + +from __future__ import print_function +import dbt.targets +import dbt.schema +import dbt.templates +import jinja2 + +class ArchivableTable(object): + def __init__(self, source_table, dest_table, unique_key, updated_at): + self.source_table = source_table + self.dest_table = dest_table + self.unique_key = unique_key + self.updated_at = updated_at + + def __repr__(self): + return " {} unique:{} updated_at:{}>".format(self.source_table, self.dest_table, self.unique_key, self.updated_at) + +class SourceSchema(object): + def __init__(self, source_schema, target_schema, tables): + self.source_schema = source_schema + self.target_schema = target_schema + self.tables = [self.parse_table(t) for t in tables] + + def parse_table(self, table_definition): + return ArchivableTable(**table_definition) + +class ArchiveTask: + def __init__(self, args, project): + self.args = args + self.project = project + + self.target = dbt.targets.get_target(self.project.run_environment()) + self.schema = dbt.schema.Schema(self.project, self.target) + + def run(self): + if 'archive' not in self.project: + raise RuntimeError("dbt_project.yml file is missing an 'archive' config!") + + # TODO : obviously handle input / validate better here + raw_source_schemas = self.project['archive'] + source_schemas = [SourceSchema(**item) for item in raw_source_schemas] + + for source_schema in source_schemas: + + # create archive schema if not exists! + self.schema.create_schema(source_schema.target_schema) + + for table in source_schema.tables: + columns = self.schema.get_columns_in_table(source_schema.source_schema, table.source_table) + + if len(columns) == 0: + raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema.source_schema, table.source_table)) + + # create archive table if not exists! TODO: Sort & Dist keys! Hmmmm + self.schema.create_table(source_schema.target_schema, table.dest_table, columns, sort=table.updated_at, dist=table.unique_key) + + env = jinja2.Environment() + + ctx = { + "columns": columns, + "table" : table, + "archive": source_schema + } + + base_query = dbt.templates.SCDArchiveTemplate + template = env.from_string(base_query, globals=ctx) + rendered = template.render(ctx) + + status = self.schema.execute(rendered) + print("STATUS: ", status) diff --git a/dbt/templates.py b/dbt/templates.py index b43251406e5..d487171a95c 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -165,3 +165,71 @@ def wrap(self, opts): raise RuntimeError("Invalid materialization parameter ({})".format(opts['materialization'])) return "{}\n\n{}".format(opts['prologue'], sql) + + + +SCDArchiveTemplate = """ + with current_data as ( + + select *, + {{ table.updated_at }} as dbt_updated_at, + {{ table.unique_key }} as dbt_pk + from "{{ archive.source_schema }}"."{{ table.source_table }}" + + ), + + archived_data as ( + + select *, + {{ table.updated_at }} as dbt_updated_at, + {{ table.unique_key }} as dbt_pk + from "{{ archive.target_schema }}"."{{ table.dest_table }}" + + ), + + combined as ( + + select + {% for (col, type) in columns.items() %} + "{{ col }}", + {% endfor %} + dbt_updated_at, + dbt_pk + from current_data + + union all + + select + {% for (col, type) in columns.items() %} + "{{ col }}", + {% endfor %} + dbt_updated_at, + dbt_pk + from archived_data + + ), + + merged as ( + + select + distinct + combined.*, + least(combined.dbt_updated_at, current_data.dbt_updated_at) as valid_from, + case when combined.dbt_updated_at = current_data.dbt_updated_at then null + else current_data.dbt_updated_at + end as valid_to + from current_data + left outer join combined + on combined.dbt_pk = current_data.dbt_pk + and current_data.dbt_updated_at >= combined.dbt_updated_at + + ), + with_id as ( + select *, + row_number() over (partition by dbt_pk order by dbt_updated_at asc) as dbt_archive_id + from merged + ) + + select md5(dbt_pk || '|' || dbt_archive_id) as scd_id, * + from with_id +""" From 00fa9f0cd8717b801e9789838ac8efdd4ed1656d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 10 Oct 2016 23:43:28 -0400 Subject: [PATCH 2/8] working archival --- dbt/schema.py | 7 ++++--- dbt/task/archive.py | 18 +++++++++++++--- dbt/templates.py | 50 ++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/dbt/schema.py b/dbt/schema.py index 53086d8dd60..93ff42890ca 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -108,7 +108,7 @@ def get_columns_in_table(self, schema_name, table_name): sql = self.target.sql_columns_in_table(schema_name, table_name) self.logger.debug("getting columns in table %s.%s", schema_name, table_name) results = self.execute_and_fetch(sql) - columns = {column: data_type for (column, data_type) in results} + columns = [(column, data_type) for (column, data_type) in results] self.logger.debug("Found columns: %s", columns) return columns @@ -118,9 +118,10 @@ def rename(self, schema, from_name, to_name): self.execute_and_handle_permissions(rename_query, from_name) self.logger.info("renamed model %s.%s --> %s.%s", schema, from_name, schema, to_name) - def create_table(self, schema, table, columns_dict, sort, dist): - fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns_dict.items()] + def create_table(self, schema, table, columns, sort, dist): + fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns] fields_csv = ",\n ".join(fields) + # TODO : Sort and Dist keys?? sql = 'create table if not exists "{schema}"."{table}" (\n {fields}\n);'.format(schema=schema, table=table, fields=fields_csv) self.logger.info('creating table "%s"."%s"'.format(schema, table)) self.execute_and_handle_permissions(sql, table) diff --git a/dbt/task/archive.py b/dbt/task/archive.py index 26b69ee8442..0bf84041088 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -53,7 +53,16 @@ def run(self): raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema.source_schema, table.source_table)) # create archive table if not exists! TODO: Sort & Dist keys! Hmmmm - self.schema.create_table(source_schema.target_schema, table.dest_table, columns, sort=table.updated_at, dist=table.unique_key) + + extra_cols = [ + ("valid_from", "timestamp"), + ("valid_to", "timestamp"), + ("scd_id","text"), + ("dbt_updated_at","timestamp") + ] + + dest_columns = columns + extra_cols + self.schema.create_table(source_schema.target_schema, table.dest_table, dest_columns, sort=table.updated_at, dist=table.unique_key) env = jinja2.Environment() @@ -67,5 +76,8 @@ def run(self): template = env.from_string(base_query, globals=ctx) rendered = template.render(ctx) - status = self.schema.execute(rendered) - print("STATUS: ", status) + template = dbt.templates.ArchiveInsertTemplate() + transaction = template.wrap(source_schema.target_schema, table.dest_table, rendered, table.unique_key) + + print(transaction) + #self.schema.execute_and_handle_permissions(transaction, table.dest_table) diff --git a/dbt/templates.py b/dbt/templates.py index d487171a95c..8d8798b374b 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -180,7 +180,10 @@ def wrap(self, opts): archived_data as ( - select *, + select + {% for (col, type) in columns %} + "{{ col }}", + {% endfor %} {{ table.updated_at }} as dbt_updated_at, {{ table.unique_key }} as dbt_pk from "{{ archive.target_schema }}"."{{ table.dest_table }}" @@ -190,7 +193,7 @@ def wrap(self, opts): combined as ( select - {% for (col, type) in columns.items() %} + {% for (col, type) in columns %} "{{ col }}", {% endfor %} dbt_updated_at, @@ -200,7 +203,7 @@ def wrap(self, opts): union all select - {% for (col, type) in columns.items() %} + {% for (col, type) in columns %} "{{ col }}", {% endfor %} dbt_updated_at, @@ -226,10 +229,47 @@ def wrap(self, opts): ), with_id as ( select *, - row_number() over (partition by dbt_pk order by dbt_updated_at asc) as dbt_archive_id + row_number() over (partition by dbt_pk order by dbt_updated_at asc) as dbt_archive_id, + count(*) over (partition by dbt_pk) as num_changes from merged ) - select md5(dbt_pk || '|' || dbt_archive_id) as scd_id, * + -- + -- TODO : The order of scd_id and dbt_updated_at depends + -- on the order of col injections in archive.py + select + {% for (col, type) in columns %} + "{{ col }}", + {% endfor %} + valid_from, + valid_to, + md5(dbt_pk || '|' || dbt_archive_id) as scd_id, + dbt_updated_at + from with_id + where num_changes > 1 """ + + +class ArchiveInsertTemplate(object): + archival_template = """ +create temporary table "{identifier}__dbt_archival_tmp" as ( + with dbt_archive_sbq as ( + {query} + ) + select * from dbt_archive_sbq +); + +delete from "{schema}"."{identifier}" where ({unique_key}) in ( + select ({unique_key}) from "{identifier}__dbt_archival_tmp" +); + +insert into "{schema}"."{identifier}" ( + select * from "{identifier}__dbt_archival_tmp" +); +""" + + def wrap(self, schema, table, query, unique_key): + sql = self.archival_template.format(schema=schema, identifier=table, query=query, unique_key=unique_key) + return sql + From 7f0855101e305aacc79d12093aebdeda57d636d2 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 11 Oct 2016 12:10:18 -0400 Subject: [PATCH 3/8] call sql exec func --- dbt/task/archive.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbt/task/archive.py b/dbt/task/archive.py index 0bf84041088..dc505668beb 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -79,5 +79,4 @@ def run(self): template = dbt.templates.ArchiveInsertTemplate() transaction = template.wrap(source_schema.target_schema, table.dest_table, rendered, table.unique_key) - print(transaction) - #self.schema.execute_and_handle_permissions(transaction, table.dest_table) + self.schema.execute_and_handle_permissions(transaction, table.dest_table) From 7bff5c6e3e2045fdba9f6d5e234669b06e971c7d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 11 Oct 2016 14:50:38 -0400 Subject: [PATCH 4/8] compile archive sql --- dbt/archival.py | 62 ++++++++++++++++++++++++++++++++++++ dbt/compilation.py | 27 +++++++++++++--- dbt/compiled_model.py | 18 +++++++++++ dbt/model.py | 33 +++++++++++++++++++ dbt/source.py | 23 +++++++++++++- dbt/task/archive.py | 74 +------------------------------------------ dbt/task/compile.py | 4 +-- dbt/templates.py | 15 +++++---- 8 files changed, 170 insertions(+), 86 deletions(-) create mode 100644 dbt/archival.py diff --git a/dbt/archival.py b/dbt/archival.py new file mode 100644 index 00000000000..7a4e31db8e0 --- /dev/null +++ b/dbt/archival.py @@ -0,0 +1,62 @@ + +from __future__ import print_function +import dbt.targets +import dbt.schema +import dbt.templates +import jinja2 + +class Archival(object): + + def __init__(self, project, archive_model): + self.archive_model = archive_model + self.project = project + + self.target = dbt.targets.get_target(self.project.run_environment()) + self.schema = dbt.schema.Schema(self.project, self.target) + + def compile(self): + source_schema = self.archive_model.source_schema + target_schema = self.archive_model.target_schema + source_table = self.archive_model.source_table + dest_table = self.archive_model.dest_table + unique_key = self.archive_model.unique_key + updated_at = self.archive_model.updated_at + + self.schema.create_schema(target_schema) + + source_columns = self.schema.get_columns_in_table(source_schema, source_table) + + if len(source_columns) == 0: + raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema, source_table)) + + # create archive table if not exists! TODO: Sort & Dist keys! Hmmmm + + extra_cols = [ + ("valid_from", "timestamp"), + ("valid_to", "timestamp"), + ("scd_id","text"), + ("dbt_updated_at","timestamp") + ] + + dest_columns = source_columns + extra_cols + self.schema.create_table(target_schema, dest_table, dest_columns, sort=updated_at, dist=unique_key) + + env = jinja2.Environment() + + ctx = { + "columns" : source_columns, + "updated_at" : updated_at, + "unique_key" : unique_key, + "source_schema" : source_schema, + "source_table" : source_table, + "target_schema" : target_schema, + "dest_table" : dest_table, + } + + base_query = dbt.templates.SCDArchiveTemplate + template = env.from_string(base_query, globals=ctx) + rendered = template.render(ctx) + + return rendered + + diff --git a/dbt/compilation.py b/dbt/compilation.py index 01bd0dd3c6e..91502710208 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -8,6 +8,7 @@ from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This, Var, compiler_error from dbt.linker import Linker import dbt.targets +import dbt.templates import time import sqlparse @@ -47,6 +48,10 @@ def get_macros(self, this_project, own_project=None): paths = own_project.get('macro-paths', []) return Source(this_project, own_project=own_project).get_macros(paths) + def get_archives(self, project): + archive_template = dbt.templates.ArchiveInsertTemplate() + return Source(project, own_project=project).get_archives(archive_template) + def project_schemas(self): source_paths = self.project.get('source-paths', []) return Source(self.project).get_schemas(source_paths) @@ -192,8 +197,8 @@ def compile_model(self, linker, model, models): return rendered - def write_graph_file(self, linker): - filename = 'graph-{}.yml'.format(self.create_template.label) + def write_graph_file(self, linker, label): + filename = 'graph-{}.yml'.format(label) graph_path = os.path.join(self.project['target-path'], filename) linker.write_graph(graph_path) @@ -329,7 +334,20 @@ def do_gen(ctx): return macros return do_gen + def compile_archives(self): + linker = Linker() + all_archives = self.get_archives(self.project) + + for archive in all_archives: + sql = archive.compile() + self.__write(archive.build_path(), sql) + + self.write_graph_file(linker, 'archive') + return all_archives + def compile(self, dry=False): + compiled_archives = [] if dry else self.compile_archives() + linker = Linker() all_models = self.model_sources(this_project=self.project) @@ -345,16 +363,17 @@ def compile(self, dry=False): compiled_models, written_models = self.compile_models(linker, enabled_models) + # TODO : only compile schema tests for enabled models written_schema_tests = self.compile_schema_tests(linker) self.validate_models_unique(compiled_models) self.validate_models_unique(written_schema_tests) - self.write_graph_file(linker) + self.write_graph_file(linker, self.create_template.label) if self.create_template.label != 'test': written_analyses = self.compile_analyses(linker, compiled_models) else: written_analyses = [] - return len(written_models), len(written_schema_tests), len(written_analyses) + return len(written_models), len(written_schema_tests), len(compiled_archives), len(written_analyses) diff --git a/dbt/compiled_model.py b/dbt/compiled_model.py index b824db4a30c..bacd39f2bd6 100644 --- a/dbt/compiled_model.py +++ b/dbt/compiled_model.py @@ -110,6 +110,22 @@ def prepare(self, existing, target): def __repr__(self): return "".format(self.data['project_name'], self.name, self.data['build_path']) +class CompiledArchive(CompiledModel): + def __init__(self, fqn, data): + super(CompiledArchive, self).__init__(fqn, data) + + def should_rename(self): + return False + + def should_execute(self): + return True + + def prepare(self, existing, target): + self.target = target + + def __repr__(self): + return "".format(self.data['project_name'], self.name, self.data['build_path']) + def make_compiled_model(fqn, data): run_type = data['dbt_run_type'] @@ -117,6 +133,8 @@ def make_compiled_model(fqn, data): return CompiledModel(fqn, data) elif run_type == 'test': return CompiledTest(fqn, data) + elif run_type == 'archive': + return CompiledArchive(fqn, data) else: raise RuntimeError("invalid run_type given: {}".format(run_type)) diff --git a/dbt/model.py b/dbt/model.py index 756e241d25b..06f18b14d94 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -7,6 +7,7 @@ from dbt.utils import split_path import dbt.schema_tester import dbt.project +import dbt.archival from dbt.utils import This, deep_merge, DBTConfigKeys, compiler_error class SourceConfig(object): @@ -570,3 +571,35 @@ def __repr__(self): return "".format(self.project['name'], self.name, self.filepath) +class ArchiveModel(DBTSource): + def __init__(self, project, create_template, source_schema, target_schema, source_table, dest_table, unique_key, updated_at): + + self.create_template = create_template + + self.source_schema = source_schema + self.target_schema = target_schema + self.source_table = source_table + self.dest_table = dest_table + self.unique_key = unique_key + self.updated_at = updated_at + + target_dir = self.create_template.label + rel_filepath = os.path.join(self.target_schema, self.dest_table) + + super(ArchiveModel, self).__init__(project, target_dir, rel_filepath, project) + + def compile(self): + archival = dbt.archival.Archival(self.project, self) + query = archival.compile() + + sql = self.create_template.wrap(self.target_schema, self.dest_table, query, self.unique_key) + return sql + + def build_path(self): + build_dir = self.create_template.label + filename = "{}.sql".format(self.name) + path_parts = [build_dir] + self.fqn[:-1] + [filename] + return os.path.join(*path_parts) + + def __repr__(self): + return " {} unique:{} updated_at:{}>".format(self.source_table, self.dest_table, self.unique_key, self.updated_at) diff --git a/dbt/source.py b/dbt/source.py index f6d6c365abd..1e1021909fa 100644 --- a/dbt/source.py +++ b/dbt/source.py @@ -1,7 +1,7 @@ import os.path import fnmatch -from dbt.model import Model, Analysis, TestModel, SchemaFile, Csv, Macro +from dbt.model import Model, Analysis, TestModel, SchemaFile, Csv, Macro, ArchiveModel class Source(object): def __init__(self, project, own_project=None): @@ -60,3 +60,24 @@ def get_macros(self, macro_dirs): macros = [Macro(*macro) for macro in self.find(macro_dirs, pattern)] return macros + def get_archives(self, create_template): + "Get Archive models defined in project config" + + if 'archive' not in self.project: + return [] + + raw_source_schemas = self.project['archive'].copy() + + archives = [] + for schema in raw_source_schemas: + if 'tables' not in schema: + continue + + tables = schema.pop('tables') + for table in tables: + fields = table.copy() + fields.update(schema) + archives.append(ArchiveModel(self.project, create_template, **fields)) + return archives + + diff --git a/dbt/task/archive.py b/dbt/task/archive.py index dc505668beb..0fa52782aed 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -1,82 +1,10 @@ -from __future__ import print_function -import dbt.targets -import dbt.schema -import dbt.templates -import jinja2 - -class ArchivableTable(object): - def __init__(self, source_table, dest_table, unique_key, updated_at): - self.source_table = source_table - self.dest_table = dest_table - self.unique_key = unique_key - self.updated_at = updated_at - - def __repr__(self): - return " {} unique:{} updated_at:{}>".format(self.source_table, self.dest_table, self.unique_key, self.updated_at) - -class SourceSchema(object): - def __init__(self, source_schema, target_schema, tables): - self.source_schema = source_schema - self.target_schema = target_schema - self.tables = [self.parse_table(t) for t in tables] - - def parse_table(self, table_definition): - return ArchivableTable(**table_definition) class ArchiveTask: def __init__(self, args, project): self.args = args self.project = project - self.target = dbt.targets.get_target(self.project.run_environment()) - self.schema = dbt.schema.Schema(self.project, self.target) - def run(self): - if 'archive' not in self.project: - raise RuntimeError("dbt_project.yml file is missing an 'archive' config!") - - # TODO : obviously handle input / validate better here - raw_source_schemas = self.project['archive'] - source_schemas = [SourceSchema(**item) for item in raw_source_schemas] - - for source_schema in source_schemas: - - # create archive schema if not exists! - self.schema.create_schema(source_schema.target_schema) - - for table in source_schema.tables: - columns = self.schema.get_columns_in_table(source_schema.source_schema, table.source_table) - - if len(columns) == 0: - raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema.source_schema, table.source_table)) - - # create archive table if not exists! TODO: Sort & Dist keys! Hmmmm - - extra_cols = [ - ("valid_from", "timestamp"), - ("valid_to", "timestamp"), - ("scd_id","text"), - ("dbt_updated_at","timestamp") - ] - - dest_columns = columns + extra_cols - self.schema.create_table(source_schema.target_schema, table.dest_table, dest_columns, sort=table.updated_at, dist=table.unique_key) - - env = jinja2.Environment() - - ctx = { - "columns": columns, - "table" : table, - "archive": source_schema - } - - base_query = dbt.templates.SCDArchiveTemplate - template = env.from_string(base_query, globals=ctx) - rendered = template.render(ctx) - - template = dbt.templates.ArchiveInsertTemplate() - transaction = template.wrap(source_schema.target_schema, table.dest_table, rendered, table.unique_key) - - self.schema.execute_and_handle_permissions(transaction, table.dest_table) + pass diff --git a/dbt/task/compile.py b/dbt/task/compile.py index a5dd07a37c6..c39a13e82b3 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -16,6 +16,6 @@ def run(self): compiler = Compiler(self.project, create_template) compiler.initialize() - created_models, created_tests, created_analyses = compiler.compile(dry=self.args.dry) + created_models, created_tests, created_archives, created_analyses = compiler.compile(dry=self.args.dry) - print("Compiled {} models, {} tests and {} analyses".format(created_models, created_tests, created_analyses)) + print("Compiled {} models, {} tests, {} archives and {} analyses".format(created_models, created_tests, created_archives, created_analyses)) diff --git a/dbt/templates.py b/dbt/templates.py index 8d8798b374b..aa04223a62c 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -172,9 +172,9 @@ def wrap(self, opts): with current_data as ( select *, - {{ table.updated_at }} as dbt_updated_at, - {{ table.unique_key }} as dbt_pk - from "{{ archive.source_schema }}"."{{ table.source_table }}" + {{ updated_at }} as dbt_updated_at, + {{ unique_key }} as dbt_pk + from "{{ source_schema }}"."{{ source_table }}" ), @@ -184,9 +184,9 @@ def wrap(self, opts): {% for (col, type) in columns %} "{{ col }}", {% endfor %} - {{ table.updated_at }} as dbt_updated_at, - {{ table.unique_key }} as dbt_pk - from "{{ archive.target_schema }}"."{{ table.dest_table }}" + {{ updated_at }} as dbt_updated_at, + {{ unique_key }} as dbt_pk + from "{{ target_schema }}"."{{ dest_table }}" ), @@ -252,6 +252,9 @@ def wrap(self, opts): class ArchiveInsertTemplate(object): + + label = "archive" + archival_template = """ create temporary table "{identifier}__dbt_archival_tmp" as ( with dbt_archive_sbq as ( From 8fb4b680a264197af0627161f13ed20b93d9b161 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 11 Oct 2016 21:03:12 -0400 Subject: [PATCH 5/8] raiders of the lost archive --- dbt/archival.py | 3 +- dbt/compilation.py | 12 ++++--- dbt/compiled_model.py | 4 +++ dbt/model.py | 17 ++++++++++ dbt/runner.py | 49 ++++++++++++++++++++++++++-- dbt/schema.py | 9 ++++++ dbt/task/archive.py | 16 ++++++++-- dbt/templates.py | 74 ++++++++++++++++++++++++++----------------- 8 files changed, 146 insertions(+), 38 deletions(-) diff --git a/dbt/archival.py b/dbt/archival.py index 7a4e31db8e0..7c2dff8fe17 100644 --- a/dbt/archival.py +++ b/dbt/archival.py @@ -5,6 +5,7 @@ import dbt.templates import jinja2 + class Archival(object): def __init__(self, project, archive_model): @@ -50,7 +51,7 @@ def compile(self): "source_schema" : source_schema, "source_table" : source_table, "target_schema" : target_schema, - "dest_table" : dest_table, + "dest_table" : dest_table } base_query = dbt.templates.SCDArchiveTemplate diff --git a/dbt/compilation.py b/dbt/compilation.py index 91502710208..116bb3dde78 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -39,6 +39,8 @@ def model_sources(self, this_project, own_project=None): return Source(this_project, own_project=own_project).get_models(paths, self.create_template) elif self.create_template.label == 'test': return Source(this_project, own_project=own_project).get_test_models(paths, self.create_template) + elif self.create_template.label == 'archive': + return [] else: raise RuntimeError("unexpected create template type: '{}'".format(self.create_template.label)) @@ -340,14 +342,14 @@ def compile_archives(self): for archive in all_archives: sql = archive.compile() + fqn = tuple(archive.fqn) + linker.update_node_data(fqn, archive.serialize()) self.__write(archive.build_path(), sql) self.write_graph_file(linker, 'archive') return all_archives def compile(self, dry=False): - compiled_archives = [] if dry else self.compile_archives() - linker = Linker() all_models = self.model_sources(this_project=self.project) @@ -363,7 +365,6 @@ def compile(self, dry=False): compiled_models, written_models = self.compile_models(linker, enabled_models) - # TODO : only compile schema tests for enabled models written_schema_tests = self.compile_schema_tests(linker) @@ -371,9 +372,12 @@ def compile(self, dry=False): self.validate_models_unique(written_schema_tests) self.write_graph_file(linker, self.create_template.label) - if self.create_template.label != 'test': + if self.create_template.label not in ['test', 'archive']: written_analyses = self.compile_analyses(linker, compiled_models) else: written_analyses = [] + + compiled_archives = self.compile_archives() + return len(written_models), len(written_schema_tests), len(compiled_archives), len(written_analyses) diff --git a/dbt/compiled_model.py b/dbt/compiled_model.py index bacd39f2bd6..f9bfe90e06d 100644 --- a/dbt/compiled_model.py +++ b/dbt/compiled_model.py @@ -6,6 +6,7 @@ class CompiledModel(object): def __init__(self, fqn, data): self.fqn = fqn self.data = data + self.nice_name = ".".join(fqn) # these are set just before the models are executed self.tmp_drop_type = None @@ -23,6 +24,9 @@ def hashed_name(self): fqn_string = ".".join(self.fqn) return hashlib.md5(fqn_string.encode('utf-8')).hexdigest() + def context(self): + return self.data + def hashed_contents(self): return hashlib.md5(self.contents.encode('utf-8')).hexdigest() diff --git a/dbt/model.py b/dbt/model.py index 06f18b14d94..74849250352 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -572,6 +572,8 @@ def __repr__(self): class ArchiveModel(DBTSource): + dbt_run_type = 'archive' + def __init__(self, project, create_template, source_schema, target_schema, source_table, dest_table, unique_key, updated_at): self.create_template = create_template @@ -588,6 +590,21 @@ def __init__(self, project, create_template, source_schema, target_schema, sourc super(ArchiveModel, self).__init__(project, target_dir, rel_filepath, project) + def serialize(self): + data = DBTSource.serialize(self).copy() + + serialized = { + "source_schema" : self.source_schema, + "target_schema" : self.target_schema, + "source_table" : self.source_table, + "dest_table" : self.dest_table, + "unique_key" : self.unique_key, + "updated_at" : self.updated_at + } + + data.update(serialized) + return data + def compile(self): archival = dbt.archival.Archival(self.project, self) query = archival.compile() diff --git a/dbt/runner.py b/dbt/runner.py index b644061d1dc..8fdd35a6608 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -198,6 +198,43 @@ def execute(self, schema, target, model): return row[0] +class ArchiveRunner(BaseRunner): + run_type = 'archive' + + def pre_run_msg(self, model): + print_vars = { + "schema": model.target.schema, + "model_name": model.name, + } + + output = "START archive table {schema}.{model_name} ".format(**print_vars) + return output + + def post_run_msg(self, result): + model = result.model + print_vars = { + "schema": model.target.schema, + "model_name": model.name, + "info": "ERROR archiving" if result.errored else "OK created" + } + + output = "{info} table {schema}.{model_name} ".format(**print_vars) + return output + + def pre_run_all_msg(self, models): + return "Archiving {} tables".format(len(models)) + + def post_run_all_msg(self, results): + return "Finished archiving {} tables".format(len(results)) + + def status(self, result): + return result.status + + def execute(self, schema, target, model): + print(model.compiled_contents) + status = schema.execute_and_handle_permissions(model.compiled_contents, model.name) + return status + class RunManager(object): def __init__(self, project, target_path, graph_type): self.logger = logging.getLogger(__name__) @@ -218,7 +255,9 @@ def __init__(self, project, target_path, graph_type): self.context = { "run_started_at": datetime.now(), - "invocation_id": dbt.tracking.invocation_id, + "invocation_id" : dbt.tracking.invocation_id, + "get_columns_in_table" : self.schema.get_columns_in_table, + "get_missing_columns" : self.schema.get_missing_columns, } @@ -378,7 +417,9 @@ def run_from_graph(self, runner, limit_to): for m in relevant_compiled_models: if m.should_execute(): - m.compile(self.context) + context = self.context.copy() + context.update(m.context()) + m.compile(context) schema_name = self.target.schema @@ -430,3 +471,7 @@ def dry_run(self, limit_to=None): runner = DryRunner() return self.safe_run_from_graph(runner, limit_to) + def run_archive(self): + runner = ArchiveRunner() + return self.safe_run_from_graph(runner, None) + diff --git a/dbt/schema.py b/dbt/schema.py index 93ff42890ca..c76f243ee5a 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -118,6 +118,15 @@ def rename(self, schema, from_name, to_name): self.execute_and_handle_permissions(rename_query, from_name) self.logger.info("renamed model %s.%s --> %s.%s", schema, from_name, schema, to_name) + def get_missing_columns(self, from_schema, from_table, to_schema, to_table): + "Returns dict of {column:type} for columns in from_table that are missing from to_table" + from_columns = {col:dtype for (col,dtype) in self.get_columns_in_table(from_schema, from_table)} + to_columns = {col:dtype for (col,dtype) in self.get_columns_in_table(to_schema, to_table)} + + missing_columns = set(from_columns.keys()) - set(to_columns.keys()) + + return {col:dtype for (col, dtype) in from_columns.items() if col in missing_columns} + def create_table(self, schema, table, columns, sort, dist): fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns] fields_csv = ",\n ".join(fields) diff --git a/dbt/task/archive.py b/dbt/task/archive.py index 0fa52782aed..40467b42286 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -1,10 +1,22 @@ - +from dbt.runner import RunManager +from dbt.templates import ArchiveInsertTemplate +from dbt.compilation import Compiler class ArchiveTask: def __init__(self, args, project): self.args = args self.project = project + self.create_template = ArchiveInsertTemplate + + def compile(self): + compiler = Compiler(self.project, self.create_template) + compiler.initialize() + compiler.compile_archives() def run(self): - pass + self.compile() + runner = RunManager(self.project, self.project['target-path'], self.create_template.label) + + results = runner.run_archive() + diff --git a/dbt/templates.py b/dbt/templates.py index aa04223a62c..3527f242098 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -167,6 +167,9 @@ def wrap(self, opts): return "{}\n\n{}".format(opts['prologue'], sql) +SCDGetColumnsInTable = """ +""" + SCDArchiveTemplate = """ with current_data as ( @@ -181,9 +184,11 @@ def wrap(self, opts): archived_data as ( select - {% for (col, type) in columns %} - "{{ col }}", - {% endfor %} + {% raw %} + {% for (col, type) in get_columns_in_table(source_schema, source_table) %} + "{{ col }}" {% if not loop.last %},{% endif %} + {% endfor %}, + {% endraw %} {{ updated_at }} as dbt_updated_at, {{ unique_key }} as dbt_pk from "{{ target_schema }}"."{{ dest_table }}" @@ -193,22 +198,26 @@ def wrap(self, opts): combined as ( select - {% for (col, type) in columns %} - "{{ col }}", - {% endfor %} - dbt_updated_at, - dbt_pk - from current_data + {% raw %} + {% for (col, type) in get_columns_in_table(source_schema, source_table) %} + "{{ col }}" {% if not loop.last %},{% endif %} + {% endfor %}, + {% endraw %} + dbt_updated_at, + dbt_pk + from current_data union all select - {% for (col, type) in columns %} - "{{ col }}", - {% endfor %} - dbt_updated_at, - dbt_pk - from archived_data + {% raw %} + {% for (col, type) in get_columns_in_table(source_schema, source_table) %} + "{{ col }}" {% if not loop.last %},{% endif %} + {% endfor %}, + {% endraw %} + dbt_updated_at, + dbt_pk + from archived_data ), @@ -234,17 +243,8 @@ def wrap(self, opts): from merged ) - -- - -- TODO : The order of scd_id and dbt_updated_at depends - -- on the order of col injections in archive.py - select - {% for (col, type) in columns %} - "{{ col }}", - {% endfor %} - valid_from, - valid_to, - md5(dbt_pk || '|' || dbt_archive_id) as scd_id, - dbt_updated_at + select *, + md5(dbt_pk || '|' || dbt_archive_id) as scd_id from with_id where num_changes > 1 @@ -255,7 +255,22 @@ class ArchiveInsertTemplate(object): label = "archive" + alter_template = """ +{% for (col, dtype) in get_missing_columns(source_schema, source_table, target_schema, dest_table).items() %} + alter table "{{ target_schema }}"."{{ dest_table }}" add column "{{ col }}" {{ dtype }}; +{% endfor %} +""" + + dest_cols = """ +{% for (col, type) in get_columns_in_table(target_schema, dest_table) %} + "{{ col }}" {% if not loop.last %},{% endif %} +{% endfor %} +""" + archival_template = """ + +{alter_template} + create temporary table "{identifier}__dbt_archival_tmp" as ( with dbt_archive_sbq as ( {query} @@ -268,11 +283,12 @@ class ArchiveInsertTemplate(object): ); insert into "{schema}"."{identifier}" ( - select * from "{identifier}__dbt_archival_tmp" -); + {dest_cols} +) +select {dest_cols} from "{identifier}__dbt_archival_tmp"; """ def wrap(self, schema, table, query, unique_key): - sql = self.archival_template.format(schema=schema, identifier=table, query=query, unique_key=unique_key) + sql = self.archival_template.format(schema=schema, identifier=table, query=query, unique_key=unique_key, alter_template=self.alter_template, dest_cols=self.dest_cols) return sql From 13546726b87ee374decad25410dbb7058bdd05a1 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 12 Oct 2016 17:09:15 -0400 Subject: [PATCH 6/8] fix archival query, code cleanup --- dbt/archival.py | 4 ++ dbt/compilation.py | 9 ++++- dbt/runner.py | 1 - dbt/schema.py | 27 ++++++++++++- dbt/task/archive.py | 3 +- dbt/task/compile.py | 7 ++-- dbt/task/run.py | 8 ++-- dbt/task/test.py | 7 ++-- dbt/templates.py | 99 +++++++++++++++++++++++---------------------- 9 files changed, 102 insertions(+), 63 deletions(-) diff --git a/dbt/archival.py b/dbt/archival.py index 7c2dff8fe17..40a38a51d70 100644 --- a/dbt/archival.py +++ b/dbt/archival.py @@ -60,4 +60,8 @@ def compile(self): return rendered + def runtime_compile(self, compiled_model): + context = self.context.copy() + context.update(model.context()) + model.compile(context) diff --git a/dbt/compilation.py b/dbt/compilation.py index 116bb3dde78..336cbf9dc39 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -12,6 +12,8 @@ import time import sqlparse +CompilableEntities = ["models", "tests", "archives", "analyses"] + class Compiler(object): def __init__(self, project, create_template_class): self.project = project @@ -380,4 +382,9 @@ def compile(self, dry=False): compiled_archives = self.compile_archives() - return len(written_models), len(written_schema_tests), len(compiled_archives), len(written_analyses) + return { + "models": len(written_models), + "tests" : len(written_schema_tests), + "archives": len(compiled_archives), + "analyses" : len(written_analyses) + } diff --git a/dbt/runner.py b/dbt/runner.py index 8fdd35a6608..f44e22e4bb8 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -231,7 +231,6 @@ def status(self, result): return result.status def execute(self, schema, target, model): - print(model.compiled_contents) status = schema.execute_and_handle_permissions(model.compiled_contents, model.name) return status diff --git a/dbt/schema.py b/dbt/schema.py index c76f243ee5a..1b962de8a01 100644 --- a/dbt/schema.py +++ b/dbt/schema.py @@ -23,6 +23,20 @@ def __init__(self, project, target): self.target = target self.logger = logging.getLogger(__name__) + self.schema_cache = {} + + def cache_table_columns(self, schema, table, columns): + tid = (schema, table) + + if tid not in self.schema_cache: + self.schema_cache[tid] = columns + + return tid + + def get_table_columns_if_cached(self, schema, table): + tid = (schema, table) + return self.schema_cache.get(tid, None) + def get_schemas(self): existing = [] results = self.execute_and_fetch('select nspname from pg_catalog.pg_namespace') @@ -105,10 +119,19 @@ def drop(self, schema, relation_type, relation): self.logger.info("dropped %s %s.%s", relation_type, schema, relation) def get_columns_in_table(self, schema_name, table_name): - sql = self.target.sql_columns_in_table(schema_name, table_name) self.logger.debug("getting columns in table %s.%s", schema_name, table_name) + + columns = self.get_table_columns_if_cached(schema_name, table_name) + if columns is not None: + self.logger.debug("Found columns (in cache): %s", columns) + return columns + + sql = self.target.sql_columns_in_table(schema_name, table_name) results = self.execute_and_fetch(sql) columns = [(column, data_type) for (column, data_type) in results] + + self.cache_table_columns(schema_name, table_name, columns) + self.logger.debug("Found columns: %s", columns) return columns @@ -125,7 +148,7 @@ def get_missing_columns(self, from_schema, from_table, to_schema, to_table): missing_columns = set(from_columns.keys()) - set(to_columns.keys()) - return {col:dtype for (col, dtype) in from_columns.items() if col in missing_columns} + return [(col, dtype) for (col, dtype) in from_columns.items() if col in missing_columns] def create_table(self, schema, table, columns, sort, dist): fields = ['"{field}" {data_type}'.format(field=field, data_type=data_type) for (field, data_type) in columns] diff --git a/dbt/task/archive.py b/dbt/task/archive.py index 40467b42286..067b8a091dd 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -12,7 +12,8 @@ def __init__(self, args, project): def compile(self): compiler = Compiler(self.project, self.create_template) compiler.initialize() - compiler.compile_archives() + compiled = compiler.compile_archives() + print("Compiled {} archives".format(len(compiled))) def run(self): self.compile() diff --git a/dbt/task/compile.py b/dbt/task/compile.py index c39a13e82b3..f06941c027d 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -1,5 +1,5 @@ -from dbt.compilation import Compiler +from dbt.compilation import Compiler, CompilableEntities from dbt.templates import BaseCreateTemplate, DryCreateTemplate @@ -16,6 +16,7 @@ def run(self): compiler = Compiler(self.project, create_template) compiler.initialize() - created_models, created_tests, created_archives, created_analyses = compiler.compile(dry=self.args.dry) + results = compiler.compile(dry=self.args.dry) - print("Compiled {} models, {} tests, {} archives and {} analyses".format(created_models, created_tests, created_archives, created_analyses)) + stat_line = ", ".join(["{} {}".format(results[k], k) for k in CompilableEntities]) + print("Compiled {}".format(stat_line)) diff --git a/dbt/task/run.py b/dbt/task/run.py index 8007a5dcc36..ef61d6cf9f5 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -4,7 +4,7 @@ import os from dbt.templates import DryCreateTemplate, BaseCreateTemplate from dbt.runner import RunManager -from dbt.compilation import Compiler +from dbt.compilation import Compiler, CompilableEntities THREAD_LIMIT = 9 @@ -17,8 +17,10 @@ def compile(self): create_template = DryCreateTemplate if self.args.dry else BaseCreateTemplate compiler = Compiler(self.project, create_template) compiler.initialize() - created_models, created_tests, created_analyses = compiler.compile(self.args.dry) - print("Compiled {} models, {} tests, and {} analyses".format(created_models, created_tests, created_analyses)) + results = compiler.compile(self.args.dry) + + stat_line = ", ".join(["{} {}".format(results[k], k) for k in CompilableEntities]) + print("Compiled {}".format(stat_line)) return create_template.label diff --git a/dbt/task/test.py b/dbt/task/test.py index c52250720b4..75204ded627 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -3,7 +3,7 @@ import psycopg2 import yaml -from dbt.compilation import Compiler +from dbt.compilation import Compiler, CompilableEntities from dbt.templates import DryCreateTemplate, BaseCreateTemplate from dbt.runner import RunManager from dbt.schema_tester import SchemaTester @@ -25,9 +25,10 @@ def __init__(self, args, project): def compile(self): compiler = Compiler(self.project, BaseCreateTemplate) compiler.initialize() + results = compiler.compile() - created_models, created_tests, created_analyses = compiler.compile() - print("Compiled {} models, {} tests, and {} analyses".format(created_models, created_tests, created_analyses)) + stat_line = ", ".join(["{} {}".format(results[k], k) for k in CompilableEntities]) + print("Compiled {}".format(stat_line)) return compiler diff --git a/dbt/templates.py b/dbt/templates.py index 3527f242098..75919384f8a 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -172,11 +172,14 @@ def wrap(self, opts): SCDArchiveTemplate = """ + with current_data as ( select *, {{ updated_at }} as dbt_updated_at, - {{ unique_key }} as dbt_pk + {{ unique_key }} as dbt_pk, + {{ updated_at }} as valid_from, + null::timestamp as tmp_valid_to from "{{ source_schema }}"."{{ source_table }}" ), @@ -190,64 +193,50 @@ def wrap(self, opts): {% endfor %}, {% endraw %} {{ updated_at }} as dbt_updated_at, - {{ unique_key }} as dbt_pk + {{ unique_key }} as dbt_pk, + valid_from, + valid_to as tmp_valid_to from "{{ target_schema }}"."{{ dest_table }}" ), - combined as ( + insertions as ( select - {% raw %} - {% for (col, type) in get_columns_in_table(source_schema, source_table) %} - "{{ col }}" {% if not loop.last %},{% endif %} - {% endfor %}, - {% endraw %} - dbt_updated_at, - dbt_pk - from current_data + current_data.*, + null::timestamp as valid_to + from current_data + left outer join archived_data on archived_data.dbt_pk = current_data.dbt_pk + where archived_data.dbt_pk is null or ( + archived_data.dbt_pk is not null and + current_data.dbt_updated_at > archived_data.dbt_updated_at and + archived_data.tmp_valid_to is null + ) + ), - union all + updates as ( select - {% raw %} - {% for (col, type) in get_columns_in_table(source_schema, source_table) %} - "{{ col }}" {% if not loop.last %},{% endif %} - {% endfor %}, - {% endraw %} - dbt_updated_at, - dbt_pk - from archived_data - + archived_data.*, + current_data.dbt_updated_at as valid_to + from current_data + left outer join archived_data on archived_data.dbt_pk = current_data.dbt_pk + where archived_data.dbt_pk is not null + and archived_data.dbt_updated_at < current_data.dbt_updated_at + and archived_data.tmp_valid_to is null ), merged as ( - select - distinct - combined.*, - least(combined.dbt_updated_at, current_data.dbt_updated_at) as valid_from, - case when combined.dbt_updated_at = current_data.dbt_updated_at then null - else current_data.dbt_updated_at - end as valid_to - from current_data - left outer join combined - on combined.dbt_pk = current_data.dbt_pk - and current_data.dbt_updated_at >= combined.dbt_updated_at + select *, 'update' as change_type from updates + union all + select *, 'insert' as change_type from insertions - ), - with_id as ( - select *, - row_number() over (partition by dbt_pk order by dbt_updated_at asc) as dbt_archive_id, - count(*) over (partition by dbt_pk) as num_changes - from merged ) select *, - md5(dbt_pk || '|' || dbt_archive_id) as scd_id - - from with_id - where num_changes > 1 + md5(dbt_pk || '|' || dbt_updated_at) as scd_id + from merged """ @@ -255,20 +244,30 @@ class ArchiveInsertTemplate(object): label = "archive" + + # missing_columns : columns in source_table that are missing from dest_table (used for the ALTER) + # dest_columns : columns in the dest table (post-alter!) + definitions = """ +{% set missing_columns = get_missing_columns(source_schema, source_table, target_schema, dest_table) %} +{% set dest_columns = get_columns_in_table(target_schema, dest_table) + missing_columns %} +""" + alter_template = """ -{% for (col, dtype) in get_missing_columns(source_schema, source_table, target_schema, dest_table).items() %} +{% for (col, dtype) in missing_columns %} alter table "{{ target_schema }}"."{{ dest_table }}" add column "{{ col }}" {{ dtype }}; {% endfor %} """ dest_cols = """ -{% for (col, type) in get_columns_in_table(target_schema, dest_table) %} +{% for (col, type) in dest_columns %} "{{ col }}" {% if not loop.last %},{% endif %} {% endfor %} """ archival_template = """ +{definitions} + {alter_template} create temporary table "{identifier}__dbt_archival_tmp" as ( @@ -278,17 +277,19 @@ class ArchiveInsertTemplate(object): select * from dbt_archive_sbq ); -delete from "{schema}"."{identifier}" where ({unique_key}) in ( - select ({unique_key}) from "{identifier}__dbt_archival_tmp" -); +update "{schema}"."{identifier}" as archive set valid_to = tmp.valid_to +from "{identifier}__dbt_archival_tmp" as tmp +where tmp.scd_id = archive.scd_id + and change_type = 'update'; insert into "{schema}"."{identifier}" ( {dest_cols} ) -select {dest_cols} from "{identifier}__dbt_archival_tmp"; +select {dest_cols} from "{identifier}__dbt_archival_tmp" +where change_type = 'insert'; """ def wrap(self, schema, table, query, unique_key): - sql = self.archival_template.format(schema=schema, identifier=table, query=query, unique_key=unique_key, alter_template=self.alter_template, dest_cols=self.dest_cols) + sql = self.archival_template.format(schema=schema, identifier=table, query=query, unique_key=unique_key, alter_template=self.alter_template, dest_cols=self.dest_cols, definitions=self.definitions) return sql From fbf7facc5eeccbabfa1698b70cec8d33e071b224 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 12 Oct 2016 17:14:59 -0400 Subject: [PATCH 7/8] archive arg consistency --- dbt/archival.py | 6 +++--- dbt/model.py | 12 ++++++------ dbt/templates.py | 10 +++++----- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/dbt/archival.py b/dbt/archival.py index 40a38a51d70..50638843b23 100644 --- a/dbt/archival.py +++ b/dbt/archival.py @@ -19,7 +19,7 @@ def compile(self): source_schema = self.archive_model.source_schema target_schema = self.archive_model.target_schema source_table = self.archive_model.source_table - dest_table = self.archive_model.dest_table + target_table = self.archive_model.target_table unique_key = self.archive_model.unique_key updated_at = self.archive_model.updated_at @@ -40,7 +40,7 @@ def compile(self): ] dest_columns = source_columns + extra_cols - self.schema.create_table(target_schema, dest_table, dest_columns, sort=updated_at, dist=unique_key) + self.schema.create_table(target_schema, target_table, dest_columns, sort=updated_at, dist=unique_key) env = jinja2.Environment() @@ -51,7 +51,7 @@ def compile(self): "source_schema" : source_schema, "source_table" : source_table, "target_schema" : target_schema, - "dest_table" : dest_table + "target_table" : target_table } base_query = dbt.templates.SCDArchiveTemplate diff --git a/dbt/model.py b/dbt/model.py index 74849250352..6cf972e3353 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -574,19 +574,19 @@ def __repr__(self): class ArchiveModel(DBTSource): dbt_run_type = 'archive' - def __init__(self, project, create_template, source_schema, target_schema, source_table, dest_table, unique_key, updated_at): + def __init__(self, project, create_template, source_schema, target_schema, source_table, target_table, unique_key, updated_at): self.create_template = create_template self.source_schema = source_schema self.target_schema = target_schema self.source_table = source_table - self.dest_table = dest_table + self.target_table = target_table self.unique_key = unique_key self.updated_at = updated_at target_dir = self.create_template.label - rel_filepath = os.path.join(self.target_schema, self.dest_table) + rel_filepath = os.path.join(self.target_schema, self.target_table) super(ArchiveModel, self).__init__(project, target_dir, rel_filepath, project) @@ -597,7 +597,7 @@ def serialize(self): "source_schema" : self.source_schema, "target_schema" : self.target_schema, "source_table" : self.source_table, - "dest_table" : self.dest_table, + "target_table" : self.target_table, "unique_key" : self.unique_key, "updated_at" : self.updated_at } @@ -609,7 +609,7 @@ def compile(self): archival = dbt.archival.Archival(self.project, self) query = archival.compile() - sql = self.create_template.wrap(self.target_schema, self.dest_table, query, self.unique_key) + sql = self.create_template.wrap(self.target_schema, self.target_table, query, self.unique_key) return sql def build_path(self): @@ -619,4 +619,4 @@ def build_path(self): return os.path.join(*path_parts) def __repr__(self): - return " {} unique:{} updated_at:{}>".format(self.source_table, self.dest_table, self.unique_key, self.updated_at) + return " {} unique:{} updated_at:{}>".format(self.source_table, self.target_table, self.unique_key, self.updated_at) diff --git a/dbt/templates.py b/dbt/templates.py index 75919384f8a..51bdfef5900 100644 --- a/dbt/templates.py +++ b/dbt/templates.py @@ -196,7 +196,7 @@ def wrap(self, opts): {{ unique_key }} as dbt_pk, valid_from, valid_to as tmp_valid_to - from "{{ target_schema }}"."{{ dest_table }}" + from "{{ target_schema }}"."{{ target_table }}" ), @@ -245,16 +245,16 @@ class ArchiveInsertTemplate(object): label = "archive" - # missing_columns : columns in source_table that are missing from dest_table (used for the ALTER) + # missing_columns : columns in source_table that are missing from target_table (used for the ALTER) # dest_columns : columns in the dest table (post-alter!) definitions = """ -{% set missing_columns = get_missing_columns(source_schema, source_table, target_schema, dest_table) %} -{% set dest_columns = get_columns_in_table(target_schema, dest_table) + missing_columns %} +{% set missing_columns = get_missing_columns(source_schema, source_table, target_schema, target_table) %} +{% set dest_columns = get_columns_in_table(target_schema, target_table) + missing_columns %} """ alter_template = """ {% for (col, dtype) in missing_columns %} - alter table "{{ target_schema }}"."{{ dest_table }}" add column "{{ col }}" {{ dtype }}; + alter table "{{ target_schema }}"."{{ target_table }}" add column "{{ col }}" {{ dtype }}; {% endfor %} """ From 35ff58490699ef4ccfb1c8d98644b83d9f05960c Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 12 Oct 2016 17:22:51 -0400 Subject: [PATCH 8/8] harden input validation for archival --- dbt/model.py | 30 +++++++++++++++++++++++------- dbt/runner.py | 2 +- dbt/source.py | 2 +- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/dbt/model.py b/dbt/model.py index 6cf972e3353..6ae011f2dcb 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -574,22 +574,38 @@ def __repr__(self): class ArchiveModel(DBTSource): dbt_run_type = 'archive' - def __init__(self, project, create_template, source_schema, target_schema, source_table, target_table, unique_key, updated_at): + def __init__(self, project, create_template, archive_data): self.create_template = create_template - self.source_schema = source_schema - self.target_schema = target_schema - self.source_table = source_table - self.target_table = target_table - self.unique_key = unique_key - self.updated_at = updated_at + self.validate(archive_data) + + self.source_schema = archive_data['source_schema'] + self.target_schema = archive_data['target_schema'] + self.source_table = archive_data['source_table'] + self.target_table = archive_data['target_table'] + self.unique_key = archive_data['unique_key'] + self.updated_at = archive_data['updated_at'] target_dir = self.create_template.label rel_filepath = os.path.join(self.target_schema, self.target_table) super(ArchiveModel, self).__init__(project, target_dir, rel_filepath, project) + def validate(self, data): + required = [ + 'source_schema', + 'target_schema', + 'source_table', + 'target_table', + 'unique_key', + 'updated_at', + ] + + for key in required: + if data.get(key, None) is None: + raise RuntimeError("Invalid archive config: missing required field '{}'".format(key)) + def serialize(self): data = DBTSource.serialize(self).copy() diff --git a/dbt/runner.py b/dbt/runner.py index f44e22e4bb8..ebbcd018f86 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -339,7 +339,7 @@ def execute_models(self, runner, model_dependency_list, on_failure): num_models = len(flat_models) if num_models == 0: - print("WARNING: No models to run in '{}'. Try checking your model configs and running `dbt compile`".format(self.target_path)) + print("WARNING: Nothing to do. Try checking your model configs and running `dbt compile`".format(self.target_path)) return [] num_threads = self.target.threads diff --git a/dbt/source.py b/dbt/source.py index 1e1021909fa..1d98d9239c9 100644 --- a/dbt/source.py +++ b/dbt/source.py @@ -77,7 +77,7 @@ def get_archives(self, create_template): for table in tables: fields = table.copy() fields.update(schema) - archives.append(ArchiveModel(self.project, create_template, **fields)) + archives.append(ArchiveModel(self.project, create_template, fields)) return archives