Skip to content

Commit

Permalink
fix statements in on run start hooks (#693)
Browse files Browse the repository at this point in the history
* fix statements in on run start hooks

* do nothing on empty hook

* pr feedback
  • Loading branch information
cmcarthur authored Apr 9, 2018
1 parent 848ff6a commit 4131c06
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import dbt.templates
import dbt.writer

import os
import time


Expand Down Expand Up @@ -292,41 +291,48 @@ def run_hooks(cls, project, adapter, flat_graph, hook_type):
nodes = flat_graph.get('nodes', {}).values()
hooks = get_nodes_by_tags(nodes, {hook_type}, NodeType.Operation)

# This will clear out an open transaction if there is one.
# on-run-* hooks should run outside of a transaction. This happens b/c
# psycopg2 automatically begins a transaction when a connection is
# created. TODO : Move transaction logic out of here, and implement
# a for-loop over these sql statements in jinja-land. Also, consider
# configuring psycopg2 (and other adapters?) to ensure that a
# transaction is only created if dbt initiates it.
conn_name = adapter.clear_transaction(profile)

compiled_hooks = []
for hook in hooks:
model_name = hook.get('name')

# This will clear out an open transaction if there is one.
# on-run-* hooks should run outside of a transaction. This happens
# b/c psycopg2 automatically begins a transaction when a connection
# is created. TODO : Move transaction logic out of here, and
# implement a for-loop over these sql statements in jinja-land.
# Also, consider configuring psycopg2 (and other adapters?) to
# ensure that a transaction is only created if dbt initiates it.
adapter.clear_transaction(profile, model_name)
compiled = cls._compile_node(adapter, project, hook, flat_graph)
model_name = compiled.get('name')
statement = compiled['wrapped_sql']

hook_index = hook.get('index', len(hooks))
hook_dict = dbt.hooks.get_hook_dict(statement, index=hook_index)
compiled_hooks.append(hook_dict)
adapter.release_connection(profile, model_name)

ordered_hooks = sorted(compiled_hooks, key=lambda h: h.get('index', 0))

for hook in ordered_hooks:
model_name = hook.get('name')

if dbt.flags.STRICT_MODE:
dbt.contracts.graph.parsed.validate_hook(hook)

sql = hook.get('sql', '')
adapter.execute_one(profile, sql, model_name=conn_name,
auto_begin=False)
adapter.release_connection(profile, conn_name)

if len(sql.strip()) > 0:
adapter.execute_one(profile, sql, model_name=model_name,
auto_begin=False)

adapter.release_connection(profile, model_name)

@classmethod
def safe_run_hooks(cls, project, adapter, flat_graph, hook_type):
try:
cls.run_hooks(project, adapter, flat_graph, hook_type)
except dbt.exceptions.RuntimeException as e:

except dbt.exceptions.RuntimeException:
logger.info("Database error while running {}".format(hook_type))
raise

Expand Down

0 comments on commit 4131c06

Please sign in to comment.