Skip to content

Commit

Permalink
Merge pull request #183 from analyst-collective/feature/archive
Browse files Browse the repository at this point in the history
Feature/archive
  • Loading branch information
drewbanin authored Oct 20, 2016
2 parents d298d89 + e91c181 commit 7411407
Show file tree
Hide file tree
Showing 14 changed files with 488 additions and 18 deletions.
67 changes: 67 additions & 0 deletions dbt/archival.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@

from __future__ import print_function
import dbt.targets
import dbt.schema
import dbt.templates
import jinja2


class Archival(object):
    """Compiles the SQL for a single archive model.

    Given a project and an ArchiveModel config, this creates the target
    schema/table (with the SCD bookkeeping columns appended) and renders
    the archive-insert SQL template against the source table's columns.
    """

    def __init__(self, project, archive_model):
        self.archive_model = archive_model
        self.project = project

        self.target = dbt.targets.get_target(self.project.run_environment())
        self.schema = dbt.schema.Schema(self.project, self.target)

        # Base render context for runtime_compile(). BUGFIX: this was never
        # initialized, so runtime_compile() would raise AttributeError.
        self.context = {}

    def compile(self):
        """Create the archive target table if needed and return the rendered SQL.

        Raises RuntimeError if the configured source table does not exist
        (i.e. it has no columns).
        """
        source_schema = self.archive_model.source_schema
        target_schema = self.archive_model.target_schema
        source_table = self.archive_model.source_table
        target_table = self.archive_model.target_table
        unique_key = self.archive_model.unique_key
        updated_at = self.archive_model.updated_at

        self.schema.create_schema(target_schema)

        source_columns = self.schema.get_columns_in_table(source_schema, source_table)

        if len(source_columns) == 0:
            raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema, source_table))

        # create archive table if not exists! TODO: Sort & Dist keys! Hmmmm

        # SCD bookkeeping columns appended to the source schema:
        # valid_from/valid_to bound each row version; scd_id identifies it;
        # dbt_updated_at records when dbt last touched the row.
        extra_cols = [
            ("valid_from", "timestamp"),
            ("valid_to", "timestamp"),
            ("scd_id", "text"),
            ("dbt_updated_at", "timestamp")
        ]

        dest_columns = source_columns + extra_cols
        self.schema.create_table(target_schema, target_table, dest_columns, sort=updated_at, dist=unique_key)

        env = jinja2.Environment()

        ctx = {
            "columns": source_columns,
            "updated_at": updated_at,
            "unique_key": unique_key,
            "source_schema": source_schema,
            "source_table": source_table,
            "target_schema": target_schema,
            "target_table": target_table
        }

        base_query = dbt.templates.SCDArchiveTemplate
        template = env.from_string(base_query, globals=ctx)
        rendered = template.render(ctx)

        return rendered

    def runtime_compile(self, compiled_model):
        """Compile a model at runtime with this Archival's context layered in.

        BUGFIX: the original body referenced an undefined name `model`
        instead of the `compiled_model` parameter, and read the
        uninitialized `self.context` — it raised on every call.
        """
        context = self.context.copy()
        context.update(compiled_model.context())
        compiled_model.compile(context)

40 changes: 35 additions & 5 deletions dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This, Var, compiler_error
from dbt.linker import Linker
import dbt.targets
import dbt.templates
import time
import sqlparse

CompilableEntities = ["models", "tests", "archives", "analyses"]

class Compiler(object):
def __init__(self, project, create_template_class):
self.project = project
Expand Down Expand Up @@ -38,6 +41,8 @@ def model_sources(self, this_project, own_project=None):
return Source(this_project, own_project=own_project).get_models(paths, self.create_template)
elif self.create_template.label == 'test':
return Source(this_project, own_project=own_project).get_test_models(paths, self.create_template)
elif self.create_template.label == 'archive':
return []
else:
raise RuntimeError("unexpected create template type: '{}'".format(self.create_template.label))

Expand All @@ -47,6 +52,10 @@ def get_macros(self, this_project, own_project=None):
paths = own_project.get('macro-paths', [])
return Source(this_project, own_project=own_project).get_macros(paths)

def get_archives(self, project):
    """Load the archive model definitions configured for `project`."""
    insert_template = dbt.templates.ArchiveInsertTemplate()
    source = Source(project, own_project=project)
    return source.get_archives(insert_template)

def project_schemas(self):
source_paths = self.project.get('source-paths', [])
return Source(self.project).get_schemas(source_paths)
Expand Down Expand Up @@ -192,8 +201,8 @@ def compile_model(self, linker, model, models):

return rendered

def write_graph_file(self, linker):
filename = 'graph-{}.yml'.format(self.create_template.label)
def write_graph_file(self, linker, label):
    """Serialize the linker's dependency graph to target-path/graph-<label>.yml."""
    graph_filename = 'graph-{}.yml'.format(label)
    out_path = os.path.join(self.project['target-path'], graph_filename)
    linker.write_graph(out_path)

Expand Down Expand Up @@ -329,6 +338,19 @@ def do_gen(ctx):
return macros
return do_gen

def compile_archives(self):
    """Compile every configured archive: write its SQL to disk, record it
    in a fresh Linker graph, and emit the 'archive' graph file.

    Returns the list of archive models that were compiled.
    """
    archive_linker = Linker()
    archives = self.get_archives(self.project)

    for archive in archives:
        compiled_sql = archive.compile()
        archive_linker.update_node_data(tuple(archive.fqn), archive.serialize())
        self.__write(archive.build_path(), compiled_sql)

    self.write_graph_file(archive_linker, 'archive')
    return archives

def compile(self, dry=False):
linker = Linker()

Expand All @@ -350,11 +372,19 @@ def compile(self, dry=False):

self.validate_models_unique(compiled_models)
self.validate_models_unique(written_schema_tests)
self.write_graph_file(linker)
self.write_graph_file(linker, self.create_template.label)

if self.create_template.label != 'test':
if self.create_template.label not in ['test', 'archive']:
written_analyses = self.compile_analyses(linker, compiled_models)
else:
written_analyses = []

return len(written_models), len(written_schema_tests), len(written_analyses)

compiled_archives = self.compile_archives()

return {
"models": len(written_models),
"tests" : len(written_schema_tests),
"archives": len(compiled_archives),
"analyses" : len(written_analyses)
}
22 changes: 22 additions & 0 deletions dbt/compiled_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class CompiledModel(object):
def __init__(self, fqn, data):
self.fqn = fqn
self.data = data
self.nice_name = ".".join(fqn)

# these are set just before the models are executed
self.tmp_drop_type = None
Expand All @@ -23,6 +24,9 @@ def hashed_name(self):
fqn_string = ".".join(self.fqn)
return hashlib.md5(fqn_string.encode('utf-8')).hexdigest()

def context(self):
    """Return this node's raw data dict; callers merge it into the render
    context before compiling (see RunManager.run_from_graph)."""
    return self.data

def hashed_contents(self):
    """Return the MD5 hex digest of this model's contents (UTF-8 encoded)."""
    return hashlib.md5(self.contents.encode('utf-8')).hexdigest()

Expand Down Expand Up @@ -110,13 +114,31 @@ def prepare(self, existing, target):
def __repr__(self):
return "<CompiledModel {}.{}: {}>".format(self.data['project_name'], self.name, self.data['build_path'])

class CompiledArchive(CompiledModel):
    """A compiled archive node: always executed, never renamed into place."""

    def __init__(self, fqn, data):
        super(CompiledArchive, self).__init__(fqn, data)

    def should_rename(self):
        # Archives write directly into the target table; there is no
        # tmp-table-then-rename step as with regular models.
        return False

    def should_execute(self):
        return True

    def prepare(self, existing, target):
        # No existing-relation bookkeeping needed; just remember the target.
        self.target = target

    def __repr__(self):
        project_name = self.data['project_name']
        build_path = self.data['build_path']
        return "<CompiledArchive {}.{}: {}>".format(project_name, self.name, build_path)

def make_compiled_model(fqn, data):
    """Factory: build the CompiledModel subclass matching data['dbt_run_type'].

    Raises RuntimeError for an unrecognized run type.
    """
    run_type = data['dbt_run_type']

    if run_type == 'test':
        return CompiledTest(fqn, data)
    if run_type == 'archive':
        return CompiledArchive(fqn, data)
    if run_type in ('run', 'dry-run'):
        return CompiledModel(fqn, data)

    raise RuntimeError("invalid run_type given: {}".format(run_type))

Expand Down
4 changes: 4 additions & 0 deletions dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import dbt.task.init as init_task
import dbt.task.seed as seed_task
import dbt.task.test as test_task
import dbt.task.archive as archive_task
import dbt.tracking


Expand Down Expand Up @@ -71,6 +72,9 @@ def handle(args):
sub = subs.add_parser('deps', parents=[base_subparser])
sub.set_defaults(cls=deps_task.DepsTask, which='deps')

sub = subs.add_parser('archive', parents=[base_subparser])
sub.set_defaults(cls=archive_task.ArchiveTask, which='archive')

sub = subs.add_parser('run', parents=[base_subparser])
sub.add_argument('--dry', action='store_true', help="'dry run' models")
sub.add_argument('--models', required=False, nargs='+', help="Specify the models to run. All models depending on these models will also be run")
Expand Down
66 changes: 66 additions & 0 deletions dbt/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dbt.utils import split_path
import dbt.schema_tester
import dbt.project
import dbt.archival
from dbt.utils import This, deep_merge, DBTConfigKeys, compiler_error

class SourceConfig(object):
Expand Down Expand Up @@ -578,3 +579,68 @@ def __repr__(self):
return "<Macro {}.{}: {}>".format(self.project['name'], self.name, self.filepath)


class ArchiveModel(DBTSource):
    """A dbt source representing one archive configuration.

    Holds the source/target schema+table pair plus the unique key and
    updated_at column used to version rows, and knows how to compile
    itself into wrapped archive-insert SQL.
    """
    dbt_run_type = 'archive'

    # Every key that must be present (and non-None) in the archive config.
    REQUIRED_FIELDS = (
        'source_schema',
        'target_schema',
        'source_table',
        'target_table',
        'unique_key',
        'updated_at',
    )

    def __init__(self, project, create_template, archive_data):
        self.create_template = create_template

        self.validate(archive_data)

        for field in ArchiveModel.REQUIRED_FIELDS:
            setattr(self, field, archive_data[field])

        target_dir = self.create_template.label
        rel_filepath = os.path.join(self.target_schema, self.target_table)

        super(ArchiveModel, self).__init__(project, target_dir, rel_filepath, project)

    def validate(self, data):
        """Raise RuntimeError if any required config field is missing or None."""
        for key in ArchiveModel.REQUIRED_FIELDS:
            if data.get(key, None) is None:
                raise RuntimeError("Invalid archive config: missing required field '{}'".format(key))

    def serialize(self):
        """Return the base source serialization extended with archive fields."""
        data = DBTSource.serialize(self).copy()

        data.update({
            "source_schema": self.source_schema,
            "target_schema": self.target_schema,
            "source_table": self.source_table,
            "target_table": self.target_table,
            "unique_key": self.unique_key,
            "updated_at": self.updated_at
        })

        return data

    def compile(self):
        """Render the archive SQL and wrap it with the insert template."""
        query = dbt.archival.Archival(self.project, self).compile()
        return self.create_template.wrap(self.target_schema, self.target_table, query, self.unique_key)

    def build_path(self):
        """Relative output path: <template-label>/<fqn dirs>/<name>.sql"""
        filename = "{}.sql".format(self.name)
        parts = [self.create_template.label] + self.fqn[:-1] + [filename]
        return os.path.join(*parts)

    def __repr__(self):
        return "<ArchiveModel {} --> {} unique:{} updated_at:{}>".format(self.source_table, self.target_table, self.unique_key, self.updated_at)
50 changes: 47 additions & 3 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,42 @@ def execute(self, schema, target, model):

return row[0]

class ArchiveRunner(BaseRunner):
    """Runner that executes compiled archive nodes and formats their
    progress messages."""
    run_type = 'archive'

    def pre_run_msg(self, model):
        # Trailing space matches the shared message formatting of other runners.
        return "START archive table {schema}.{model_name} ".format(
            schema=model.target.schema, model_name=model.name)

    def post_run_msg(self, result):
        model = result.model
        info = "ERROR archiving" if result.errored else "OK created"
        return "{info} table {schema}.{model_name} ".format(
            info=info, schema=model.target.schema, model_name=model.name)

    def pre_run_all_msg(self, models):
        return "Archiving {} tables".format(len(models))

    def post_run_all_msg(self, results):
        return "Finished archiving {} tables".format(len(results))

    def status(self, result):
        return result.status

    def execute(self, schema, target, model):
        """Run the compiled archive SQL; returns the execution status."""
        return schema.execute_and_handle_permissions(model.compiled_contents, model.name)

class RunManager(object):
def __init__(self, project, target_path, graph_type, threads):
self.logger = logging.getLogger(__name__)
Expand All @@ -218,7 +254,9 @@ def __init__(self, project, target_path, graph_type, threads):

self.context = {
"run_started_at": datetime.now(),
"invocation_id": dbt.tracking.invocation_id,
"invocation_id" : dbt.tracking.invocation_id,
"get_columns_in_table" : self.schema.get_columns_in_table,
"get_missing_columns" : self.schema.get_missing_columns,
}


Expand Down Expand Up @@ -301,7 +339,7 @@ def execute_models(self, runner, model_dependency_list, on_failure):

num_models = len(flat_models)
if num_models == 0:
print("WARNING: No models to run in '{}'. Try checking your model configs and running `dbt compile`".format(self.target_path))
print("WARNING: Nothing to do. Try checking your model configs and running `dbt compile`".format(self.target_path))
return []

num_threads = self.target.threads
Expand Down Expand Up @@ -391,7 +429,9 @@ def run_from_graph(self, runner, limit_to):

for m in relevant_compiled_models:
if m.should_execute():
m.compile(self.context)
context = self.context.copy()
context.update(m.context())
m.compile(context)

schema_name = self.target.schema

Expand Down Expand Up @@ -443,3 +483,7 @@ def dry_run(self, limit_to=None):
runner = DryRunner()
return self.safe_run_from_graph(runner, limit_to)

def run_archive(self):
    """Execute all compiled archives with an ArchiveRunner (no node filter)."""
    return self.safe_run_from_graph(ArchiveRunner(), None)

Loading

0 comments on commit 7411407

Please sign in to comment.