From ed294b04da10a35e24b047ee8ee6abb574436565 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Wed, 12 Oct 2022 17:32:19 -0700 Subject: [PATCH 1/2] properly track hook running --- core/dbt/contracts/results.py | 4 +++- core/dbt/task/run.py | 21 ++++++++++++++------- core/dbt/task/runnable.py | 18 +++++++----------- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index 0288b732b75..bb3b02d1048 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -208,7 +208,9 @@ def from_execution_results( generated_at: datetime, args: Dict, ): - processed_results = [process_run_result(result) for result in results] + processed_results = [ + process_run_result(result) for result in results if isinstance(result, RunResult) + ] meta = RunResultsMetadata( dbt_schema_version=str(cls.dbt_schema_version), generated_at=generated_at, diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index c2ea9169ec5..10df58a2a6b 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -20,7 +20,7 @@ from dbt.contracts.graph.compiled import CompileResultNode from dbt.contracts.graph.model_config import Hook from dbt.contracts.graph.parsed import ParsedHookNode -from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus +from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus, BaseResult from dbt.exceptions import ( CompilationException, InternalException, @@ -400,12 +400,22 @@ def safe_run_hooks( ) -> None: try: self.run_hooks(adapter, hook_type, extra_context) - except RuntimeException: + except RuntimeException as exc: fire_event(DatabaseErrorRunning(hook_type=hook_type.value)) - raise + self.node_results.append( + BaseResult( + status=RunStatus.Error, + thread_id="main", + timing=[], + message=f"{hook_type.value} failed, error:\n {exc.msg}", + adapter_response=exc.msg, + execution_time=0, + failures=1, + ) + ) def print_results_line(self, results, execution_time): - nodes = [r.node for r in results] + self.ran_hooks + nodes = [r.node for r in results if hasattr(r, "node")] + self.ran_hooks stat_line = get_counts(nodes) execution = "" @@ -450,9 +460,6 @@ def after_run(self, adapter, results): with adapter.connection_named("master"): self.safe_run_hooks(adapter, RunHookType.End, extras) - def after_hooks(self, adapter, results, elapsed): - self.print_results_line(results, elapsed) - def get_node_selector(self) -> ResourceTypeSelector: if self.manifest is None or self.graph is None: raise InternalException("manifest and graph must be set to get perform node selection") diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index fdbf1bf1d89..19ba0d221de 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -413,9 +413,6 @@ def populate_adapter_cache(self, adapter, required_schemas: Set[BaseRelation] = {"adapter_cache_construction_elapsed": cache_populate_time} ) - def before_hooks(self, adapter): - pass - def before_run(self, adapter, selected_uids: AbstractSet[str]): with adapter.connection_named("master"): self.populate_adapter_cache(adapter) @@ -423,24 +420,24 @@ def before_run(self, adapter, selected_uids: AbstractSet[str]): def after_run(self, adapter, results): pass - def after_hooks(self, adapter, results, elapsed): + def print_results_line(self, node_results, elapsed): pass def execute_with_hooks(self, selected_uids: AbstractSet[str]): adapter = get_adapter(self.config) + started = time.time() try: - self.before_hooks(adapter) - started = time.time() self.before_run(adapter, selected_uids) res = self.execute_nodes() self.after_run(adapter, res) - elapsed = time.time() - started - self.after_hooks(adapter, res, elapsed) - finally: adapter.cleanup_connections() + elapsed = time.time() - started + self.print_results_line(self.node_results, elapsed) + result = self.get_result( + results=self.node_results, elapsed_time=elapsed, generated_at=datetime.utcnow() + ) - result = self.get_result(results=res, elapsed_time=elapsed, generated_at=datetime.utcnow()) return result def write_result(self, result): @@ -470,7 +467,6 @@ def run(self): fire_event(EmptyLine()) selected_uids = frozenset(n.unique_id for n in self._flattened_nodes) result = self.execute_with_hooks(selected_uids) - if flags.WRITE_JSON: self.write_manifest() self.write_result(result) From b98ea19014c808d63c92eef951dd7b4c82e78e80 Mon Sep 17 00:00:00 2001 From: Stu Kilgore Date: Mon, 7 Nov 2022 09:53:16 -0600 Subject: [PATCH 2/2] Add changie entry --- .changes/unreleased/Fixes-20221107-095314.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changes/unreleased/Fixes-20221107-095314.yaml diff --git a/.changes/unreleased/Fixes-20221107-095314.yaml b/.changes/unreleased/Fixes-20221107-095314.yaml new file mode 100644 index 00000000000..f3763b7d039 --- /dev/null +++ b/.changes/unreleased/Fixes-20221107-095314.yaml @@ -0,0 +1,7 @@ +kind: Fixes +body: Allow hooks to fail without halting execution flow +time: 2022-11-07T09:53:14.340257-06:00 +custom: + Author: ChenyuLInx + Issue: "5625" + PR: "6059"