-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
show.py
110 lines (90 loc) · 3.71 KB
/
show.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import io
import threading
import time
from dbt.contracts.graph.nodes import SeedNode
from dbt.contracts.results import RunResult, RunStatus
from dbt.events.base_types import EventLevel
from dbt.events.functions import fire_event
from dbt.events.types import ShowNode, Note
from dbt.exceptions import DbtRuntimeError
from dbt.task.compile import CompileTask, CompileRunner
from dbt.task.seed import SeedRunner
class ShowRunner(CompileRunner):
    """Runner that executes a node's compiled code and returns the fetched rows."""

    def __init__(self, config, adapter, node, node_index, num_nodes):
        """Forward all arguments to the parent runner and set the ephemeral flag."""
        super().__init__(config, adapter, node, node_index, num_nodes)
        # NOTE(review): presumably tells the parent machinery to also run
        # ephemeral models; the consumer of this flag is not visible here.
        self.run_ephemeral_models = True

    def execute(self, compiled_node, manifest):
        """Run ``compiled_node.compiled_code`` through the adapter and wrap the result.

        Args:
            compiled_node: node whose ``compiled_code`` is executed. If its
                ``unrendered_config`` has a ``"sql_header"`` entry, that header
                is prepended to ``compiled_code`` on the node itself.
            manifest: not used by this method.

        Returns:
            A ``RunResult`` with ``RunStatus.Success``, the wall-clock duration
            of the call, the adapter response as a dict, and the fetched table.
        """
        started_at = time.time()

        # Allow passing in -1 (or any negative number) to get all rows
        requested_limit = self.config.args.limit
        if requested_limit < 0:
            row_limit = None
        else:
            row_limit = requested_limit

        node_config = compiled_node.unrendered_config
        if "sql_header" in node_config:
            # The header is written back onto the node, not just used locally.
            compiled_node.compiled_code = node_config["sql_header"] + compiled_node.compiled_code

        response, table = self.adapter.execute(
            compiled_node.compiled_code,
            fetch=True,
            limit=row_limit,
        )
        elapsed = time.time() - started_at

        return RunResult(
            node=compiled_node,
            status=RunStatus.Success,
            timing=[],
            thread_id=threading.current_thread().name,
            execution_time=elapsed,
            message=None,
            adapter_response=response.to_dict(),
            agate_table=table,
            failures=None,
        )
class ShowTask(CompileTask):
    """Task that emits a preview of query results for selected nodes or an inline query."""

    def _runtime_initialize(self):
        """Validate the arguments, then run the parent initialization.

        Raises:
            DbtRuntimeError: if neither ``select`` nor ``inline`` is set on ``self.args``.
        """
        has_target = self.args.select or getattr(self.args, "inline", None)
        if not has_target:
            raise DbtRuntimeError("Either --select or --inline must be passed to show")
        super()._runtime_initialize()

    def get_runner_type(self, node):
        """Return ``SeedRunner`` for seed nodes and ``ShowRunner`` for everything else."""
        return SeedRunner if isinstance(node, SeedNode) else ShowRunner

    def task_end_messages(self, results):
        """Fire one ``ShowNode`` event per result that matches the request.

        In inline mode only results whose node is named ``"inline_query"`` are
        shown. Otherwise a result is shown when its node name is contained in
        the first selection argument; every other result is reported with a
        debug-level ``Note`` event.
        """
        is_inline = bool(getattr(self.args, "inline", None))

        if is_inline:
            matched_results = [entry for entry in results if entry.node.name == "inline_query"]
        else:
            matched_results = []
            for entry in results:
                # NOTE(review): `in` is applied to selection_arg[0]; if that is a
                # string this is a substring match rather than equality - confirm.
                if entry.node.name in self.selection_arg[0]:
                    matched_results.append(entry)
                    continue
                fire_event(
                    Note(msg=f"Excluded node '{entry.node.name}' from results"),
                    EventLevel.DEBUG,
                )

        for entry in matched_results:
            table = entry.agate_table

            # Hack to get Agate table output as string
            buffer = io.StringIO()
            output_format = self.args.output
            if output_format == "json":
                table.to_json(path=buffer)
            else:
                table.print_table(output=buffer, max_rows=None)

            node = entry.node
            # Versioned nodes get a ".v<version>" suffix on their display name.
            version = getattr(node, "version", None)
            display_name = f"{node.name}.v{version}" if version else node.name

            fire_event(
                ShowNode(
                    node_name=display_name,
                    preview=buffer.getvalue(),
                    is_inline=is_inline,
                    output_format=output_format,
                    unique_id=node.unique_id,
                )
            )

    def _handle_result(self, result):
        """Run the parent handler, then also record ephemeral-model results.

        The extra append only happens when ``self`` is exactly a ``ShowTask``
        (subclasses are excluded by the identity check) and ``select`` or
        ``inline`` was given.
        """
        super()._handle_result(result)

        if not result.node.is_ephemeral_model:
            return
        # Identity check on purpose: subclasses must not take this path.
        if type(self) is not ShowTask:
            return
        if self.args.select or getattr(self.args, "inline", None):
            # NOTE(review): presumably the parent handler does not record
            # ephemeral models in node_results - confirm against the base class.
            self.node_results.append(result)