From 4131c06e12d83fdea99115ddde1722923a47baf0 Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Mon, 9 Apr 2018 15:33:35 -0400 Subject: [PATCH] fix statements in on run start hooks (#693) * fix statements in on run start hooks * do nothing on empty hook * pr feedback --- dbt/node_runners.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/dbt/node_runners.py b/dbt/node_runners.py index 8b6f7b0420b..0e142bcdc93 100644 --- a/dbt/node_runners.py +++ b/dbt/node_runners.py @@ -14,7 +14,6 @@ import dbt.templates import dbt.writer -import os import time @@ -292,41 +291,48 @@ def run_hooks(cls, project, adapter, flat_graph, hook_type): nodes = flat_graph.get('nodes', {}).values() hooks = get_nodes_by_tags(nodes, {hook_type}, NodeType.Operation) - # This will clear out an open transaction if there is one. - # on-run-* hooks should run outside of a transaction. This happens b/c - # psycopg2 automatically begins a transaction when a connection is - # created. TODO : Move transaction logic out of here, and implement - # a for-loop over these sql statements in jinja-land. Also, consider - # configuring psycopg2 (and other adapters?) to ensure that a - # transaction is only created if dbt initiates it. - conn_name = adapter.clear_transaction(profile) - compiled_hooks = [] for hook in hooks: + model_name = hook.get('name') + + # This will clear out an open transaction if there is one. + # on-run-* hooks should run outside of a transaction. This happens + # b/c psycopg2 automatically begins a transaction when a connection + # is created. TODO : Move transaction logic out of here, and + # implement a for-loop over these sql statements in jinja-land. + # Also, consider configuring psycopg2 (and other adapters?) to + # ensure that a transaction is only created if dbt initiates it. + adapter.clear_transaction(profile, model_name) compiled = cls._compile_node(adapter, project, hook, flat_graph) - model_name = compiled.get('name') statement = compiled['wrapped_sql'] hook_index = hook.get('index', len(hooks)) hook_dict = dbt.hooks.get_hook_dict(statement, index=hook_index) compiled_hooks.append(hook_dict) + adapter.release_connection(profile, model_name) ordered_hooks = sorted(compiled_hooks, key=lambda h: h.get('index', 0)) for hook in ordered_hooks: + model_name = hook.get('name') + if dbt.flags.STRICT_MODE: dbt.contracts.graph.parsed.validate_hook(hook) sql = hook.get('sql', '') - adapter.execute_one(profile, sql, model_name=conn_name, - auto_begin=False) - adapter.release_connection(profile, conn_name) + + if len(sql.strip()) > 0: + adapter.execute_one(profile, sql, model_name=model_name, + auto_begin=False) + + adapter.release_connection(profile, model_name) @classmethod def safe_run_hooks(cls, project, adapter, flat_graph, hook_type): try: cls.run_hooks(project, adapter, flat_graph, hook_type) - except dbt.exceptions.RuntimeException as e: + + except dbt.exceptions.RuntimeException: logger.info("Database error while running {}".format(hook_type)) raise