Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

properly track hook running #6059

Merged
merged 3 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20221107-095314.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Allow hooks to fail without halting execution flow
time: 2022-11-07T09:53:14.340257-06:00
custom:
Author: ChenyuLInx
Issue: "5625"
PR: "6059"
4 changes: 3 additions & 1 deletion core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ def from_execution_results(
generated_at: datetime,
args: Dict,
):
processed_results = [process_run_result(result) for result in results]
processed_results = [
process_run_result(result) for result in results if isinstance(result, RunResult)
]
meta = RunResultsMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
generated_at=generated_at,
Expand Down
21 changes: 14 additions & 7 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from dbt.contracts.graph.compiled import CompileResultNode
from dbt.contracts.graph.model_config import Hook
from dbt.contracts.graph.parsed import ParsedHookNode
from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus
from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus, BaseResult
from dbt.exceptions import (
CompilationException,
InternalException,
Expand Down Expand Up @@ -394,12 +394,22 @@ def safe_run_hooks(
) -> None:
try:
self.run_hooks(adapter, hook_type, extra_context)
except RuntimeException:
except RuntimeException as exc:
fire_event(DatabaseErrorRunningHook(hook_type=hook_type.value))
raise
self.node_results.append(
BaseResult(
status=RunStatus.Error,
thread_id="main",
timing=[],
message=f"{hook_type.value} failed, error:\n {exc.msg}",
adapter_response=exc.msg,
execution_time=0,
failures=1,
)
)

def print_results_line(self, results, execution_time):
nodes = [r.node for r in results] + self.ran_hooks
nodes = [r.node for r in results if hasattr(r, "node")] + self.ran_hooks
stat_line = get_counts(nodes)

execution = ""
Expand Down Expand Up @@ -444,9 +454,6 @@ def after_run(self, adapter, results):
with adapter.connection_named("master"):
self.safe_run_hooks(adapter, RunHookType.End, extras)

def after_hooks(self, adapter, results, elapsed):
self.print_results_line(results, elapsed)

def get_node_selector(self) -> ResourceTypeSelector:
if self.manifest is None or self.graph is None:
raise InternalException("manifest and graph must be set to get perform node selection")
Expand Down
17 changes: 7 additions & 10 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,34 +413,31 @@ def populate_adapter_cache(self, adapter, required_schemas: Set[BaseRelation] =
{"adapter_cache_construction_elapsed": cache_populate_time}
)

def before_hooks(self, adapter):
pass

def before_run(self, adapter, selected_uids: AbstractSet[str]):
with adapter.connection_named("master"):
self.populate_adapter_cache(adapter)

def after_run(self, adapter, results):
pass

def after_hooks(self, adapter, results, elapsed):
def print_results_line(self, node_results, elapsed):
pass

def execute_with_hooks(self, selected_uids: AbstractSet[str]):
adapter = get_adapter(self.config)
started = time.time()
try:
self.before_hooks(adapter)
started = time.time()
self.before_run(adapter, selected_uids)
res = self.execute_nodes()
self.after_run(adapter, res)
elapsed = time.time() - started
self.after_hooks(adapter, res, elapsed)

finally:
adapter.cleanup_connections()
elapsed = time.time() - started
self.print_results_line(self.node_results, elapsed)
result = self.get_result(
results=self.node_results, elapsed_time=elapsed, generated_at=datetime.utcnow()
)

result = self.get_result(results=res, elapsed_time=elapsed, generated_at=datetime.utcnow())
return result

def write_result(self, result):
Expand Down