Skip to content

Commit

Permalink
properly track hook running (#6059)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenyuLInx authored Nov 7, 2022
1 parent 6c76137 commit 930bd35
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 18 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20221107-095314.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Allow hooks to fail without halting execution flow
time: 2022-11-07T09:53:14.340257-06:00
custom:
Author: ChenyuLInx
Issue: "5625"
PR: "6059"
4 changes: 3 additions & 1 deletion core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ def from_execution_results(
generated_at: datetime,
args: Dict,
):
processed_results = [process_run_result(result) for result in results]
processed_results = [
process_run_result(result) for result in results if isinstance(result, RunResult)
]
meta = RunResultsMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
generated_at=generated_at,
Expand Down
21 changes: 14 additions & 7 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.model_config import Hook
from dbt.contracts.graph.parsed import ParsedHookNode
from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus
from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus, BaseResult
from dbt.exceptions import (
CompilationException,
InternalException,
Expand Down Expand Up @@ -394,12 +394,22 @@ def safe_run_hooks(
) -> None:
try:
self.run_hooks(adapter, hook_type, extra_context)
except RuntimeException:
except RuntimeException as exc:
fire_event(DatabaseErrorRunningHook(hook_type=hook_type.value))
raise
self.node_results.append(
BaseResult(
status=RunStatus.Error,
thread_id="main",
timing=[],
message=f"{hook_type.value} failed, error:\n {exc.msg}",
adapter_response=exc.msg,
execution_time=0,
failures=1,
)
)

def print_results_line(self, results, execution_time):
nodes = [r.node for r in results] + self.ran_hooks
nodes = [r.node for r in results if hasattr(r, "node")] + self.ran_hooks
stat_line = get_counts(nodes)

execution = ""
Expand Down Expand Up @@ -444,9 +454,6 @@ def after_run(self, adapter, results):
with adapter.connection_named("master"):
self.safe_run_hooks(adapter, RunHookType.End, extras)

def after_hooks(self, adapter, results, elapsed):
self.print_results_line(results, elapsed)

def get_node_selector(self) -> ResourceTypeSelector:
if self.manifest is None or self.graph is None:
raise InternalException("manifest and graph must be set to get perform node selection")
Expand Down
17 changes: 7 additions & 10 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,34 +413,31 @@ def populate_adapter_cache(self, adapter, required_schemas: Set[BaseRelation] =
{"adapter_cache_construction_elapsed": cache_populate_time}
)

def before_hooks(self, adapter):
pass

def before_run(self, adapter, selected_uids: AbstractSet[str]):
with adapter.connection_named("master"):
self.populate_adapter_cache(adapter)

def after_run(self, adapter, results):
pass

def after_hooks(self, adapter, results, elapsed):
def print_results_line(self, node_results, elapsed):
pass

def execute_with_hooks(self, selected_uids: AbstractSet[str]):
adapter = get_adapter(self.config)
started = time.time()
try:
self.before_hooks(adapter)
started = time.time()
self.before_run(adapter, selected_uids)
res = self.execute_nodes()
self.after_run(adapter, res)
elapsed = time.time() - started
self.after_hooks(adapter, res, elapsed)

finally:
adapter.cleanup_connections()
elapsed = time.time() - started
self.print_results_line(self.node_results, elapsed)
result = self.get_result(
results=self.node_results, elapsed_time=elapsed, generated_at=datetime.utcnow()
)

result = self.get_result(results=res, elapsed_time=elapsed, generated_at=datetime.utcnow())
return result

def write_result(self, result):
Expand Down

0 comments on commit 930bd35

Please sign in to comment.