-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathnode_runners.py
116 lines (95 loc) · 3.71 KB
/
node_runners.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from abc import abstractmethod
from datetime import datetime
from typing import Generic, TypeVar
import dbt.exceptions
from dbt_rpc.contracts.rpc import (
RemoteCompileResult,
RemoteCompileResultMixin,
RemoteRunResult,
ResultTable,
)
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.task.compile import CompileRunner
from dbt_rpc.rpc.error import dbt_error, RPCException, server_error
# Type variable naming the result class a runner produces. ``bound`` limits it
# to RemoteCompileResultMixin or a subclass; GenericRPCRunner is parameterized
# on it and uses it as the return annotation of its abstract methods.
RPCSQLResult = TypeVar("RPCSQLResult", bound=RemoteCompileResultMixin)
class GenericRPCRunner(CompileRunner, Generic[RPCSQLResult]):
    """Common base for runners that handle a single node for an RPC request.

    Subclasses bind ``RPCSQLResult`` to a concrete result class and implement
    ``execute`` and ``from_run_result`` to build instances of it.
    """

    def __init__(self, config, adapter, node, node_index, num_nodes):
        """Pass all arguments through, unchanged, to ``CompileRunner``."""
        CompileRunner.__init__(self, config, adapter, node, node_index, num_nodes)

    def handle_exception(self, e, ctx):
        """Log ``e`` at debug level and return the matching RPC error.

        * dbt exceptions are wrapped with ``dbt_error``; a dbt runtime
          exception first gets ``ctx.node`` attached through ``add_node``.
        * An ``RPCException`` is returned as-is.
        * Anything else is wrapped with ``server_error``.
        """
        message = "Got an exception: {}".format(e)
        logger.debug(message, exc_info=True)

        if isinstance(e, dbt.exceptions.Exception):
            if isinstance(e, dbt.exceptions.RuntimeException):
                e.add_node(ctx.node)
            return dbt_error(e)

        if isinstance(e, RPCException):
            return e

        return server_error(e)

    def before_execute(self):
        """Hook with no behaviour in this class."""

    def after_execute(self, result):
        """Hook with no behaviour in this class; ``result`` is ignored."""

    def compile(self, manifest):
        """Compile this runner's node against ``manifest``.

        A node whose ``config.enabled`` is falsy is rejected through
        ``dbt.exceptions.raise_compiler_error``. Otherwise the adapter's
        compiler is asked to compile the node with an empty dict as its third
        argument and ``write=False``.
        """
        if self.node.config.enabled:
            node_compiler = self.adapter.get_compiler()
            return node_compiler.compile_node(self.node, manifest, {}, write=False)

        raise dbt.exceptions.raise_compiler_error(
            "Trying to compile a node that is disabled"
        )

    @abstractmethod
    def execute(self, compiled_node, manifest) -> RPCSQLResult:
        """Produce the runner's result for ``compiled_node`` (subclass hook)."""

    @abstractmethod
    def from_run_result(self, result, start_time, timing_info) -> RPCSQLResult:
        """Rebuild the runner's result from ``result`` (subclass hook)."""

    def error_result(self, node, error, start_time, timing_info):
        """Re-raise ``error``; the other arguments are not used."""
        raise error

    def ephemeral_result(self, node, start_time, timing_info):
        """Always raise ``NotImplementedException``; no argument is used."""
        reason = "cannot execute ephemeral nodes remotely!"
        raise dbt.exceptions.NotImplementedException(reason)
class RPCCompileRunner(GenericRPCRunner[RemoteCompileResult]):
    """Runner whose result is a ``RemoteCompileResult`` (raw + compiled SQL)."""

    def execute(self, compiled_node, manifest) -> RemoteCompileResult:
        """Wrap ``compiled_node`` in a ``RemoteCompileResult``.

        ``manifest`` is not used. ``timing`` and ``logs`` start out empty and
        ``generated_at`` is stamped with the current naive UTC time.
        """
        fields = dict(
            raw_sql=compiled_node.raw_sql,
            compiled_sql=compiled_node.compiled_sql,
            node=compiled_node,
            timing=[],  # filled in later
            logs=[],
            generated_at=datetime.utcnow(),
        )
        return RemoteCompileResult(**fields)

    def from_run_result(self, result, start_time, timing_info) -> RemoteCompileResult:
        """Build a fresh ``RemoteCompileResult`` from an existing ``result``.

        SQL text and node are copied from ``result``, ``timing`` is set to
        ``timing_info``, ``logs`` is empty and ``generated_at`` is the current
        naive UTC time. ``start_time`` is not used.
        """
        fields = dict(
            raw_sql=result.raw_sql,
            compiled_sql=result.compiled_sql,
            node=result.node,
            timing=timing_info,
            logs=[],
            generated_at=datetime.utcnow(),
        )
        return RemoteCompileResult(**fields)
class RPCExecuteRunner(GenericRPCRunner[RemoteRunResult]):
    """Runner that executes a node's compiled SQL and returns the fetched rows."""

    def execute(self, compiled_node, manifest) -> RemoteRunResult:
        """Run ``compiled_node.compiled_sql`` through the adapter with fetch on.

        The second element of the adapter's return value supplies the column
        names and the rows, which are copied into plain lists inside a
        ``ResultTable``. ``manifest`` is not used. ``timing`` and ``logs``
        start out empty and ``generated_at`` is the current naive UTC time.
        """
        _response, fetched = self.adapter.execute(
            compiled_node.compiled_sql, fetch=True
        )

        # Column names are copied before the rows, as a plain list each.
        header = list(fetched.column_names)
        body = list(map(list, fetched))
        result_table = ResultTable(column_names=header, rows=body)

        fields = dict(
            raw_sql=compiled_node.raw_sql,
            compiled_sql=compiled_node.compiled_sql,
            node=compiled_node,
            table=result_table,
            timing=[],
            logs=[],
            generated_at=datetime.utcnow(),
        )
        return RemoteRunResult(**fields)

    def from_run_result(self, result, start_time, timing_info) -> RemoteRunResult:
        """Build a fresh ``RemoteRunResult`` from an existing ``result``.

        SQL text, node and table are copied from ``result``, ``timing`` is set
        to ``timing_info``, ``logs`` is empty and ``generated_at`` is the
        current naive UTC time. ``start_time`` is not used.
        """
        fields = dict(
            raw_sql=result.raw_sql,
            compiled_sql=result.compiled_sql,
            node=result.node,
            table=result.table,
            timing=timing_info,
            logs=[],
            generated_at=datetime.utcnow(),
        )
        return RemoteRunResult(**fields)