-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
build.py
220 lines (188 loc) · 9 KB
/
build.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import threading
from typing import Dict, List, Optional, Set, Type
from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.cli.flags import Flags
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogNodeNoOpResult
from dbt.exceptions import DbtInternalError
from dbt.graph import Graph, GraphQueue, ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt_common.events.functions import fire_event
from .run import ModelRunner as run_model_runner
from .run import RunTask
from .seed import SeedRunner as seed_runner
from .snapshot import SnapshotRunner as snapshot_model_runner
from .test import TestRunner as test_runner
class SavedQueryRunner(BaseRunner):
# Stub. No-op Runner for Saved Queries, which require MetricFlow for execution.
@property
def description(self) -> str:
return f"saved query {self.node.name}"
def before_execute(self) -> None:
pass
def compile(self, manifest: Manifest):
return self.node
def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info,
)
)
def execute(self, compiled_node, manifest):
# no-op
return RunResult(
node=compiled_node,
status=RunStatus.Success,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message="NO-OP",
adapter_response={},
failures=0,
agate_table=None,
)
class BuildTask(RunTask):
"""The Build task processes all assets of a given process and attempts to
'build' them in an opinionated fashion. Every resource type outlined in
RUNNER_MAP will be processed by the mapped runner class.
I.E. a resource of type Model is handled by the ModelRunner which is
imported as run_model_runner."""
MARK_DEPENDENT_ERRORS_STATUSES = [NodeStatus.Error, NodeStatus.Fail, NodeStatus.Skipped]
RUNNER_MAP = {
NodeType.Model: run_model_runner,
NodeType.Snapshot: snapshot_model_runner,
NodeType.Seed: seed_runner,
NodeType.Test: test_runner,
NodeType.Unit: test_runner,
NodeType.SavedQuery: SavedQueryRunner,
}
ALL_RESOURCE_VALUES = frozenset({x for x in RUNNER_MAP.keys()})
def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> None:
super().__init__(args, config, manifest)
self.selected_unit_tests: Set = set()
self.model_to_unit_test_map: Dict[str, List] = {}
def resource_types(self, no_unit_tests: bool = False) -> List[NodeType]:
resource_types = resource_types_from_args(
self.args, set(self.ALL_RESOURCE_VALUES), set(self.ALL_RESOURCE_VALUES)
)
# First we get selected_nodes including unit tests, then without,
# and do a set difference.
if no_unit_tests is True and NodeType.Unit in resource_types:
resource_types.remove(NodeType.Unit)
return list(resource_types)
# overrides get_graph_queue in runnable.py
def get_graph_queue(self) -> GraphQueue:
# Following uses self.selection_arg and self.exclusion_arg
spec = self.get_selection_spec()
# selector including unit tests
full_selector = self.get_node_selector(no_unit_tests=False)
# selected node unique_ids with unit_tests
full_selected_nodes = full_selector.get_selected(spec)
# This selector removes the unit_tests from the selector
selector_wo_unit_tests = self.get_node_selector(no_unit_tests=True)
# selected node unique_ids without unit_tests
selected_nodes_wo_unit_tests = selector_wo_unit_tests.get_selected(spec)
# Get the difference in the sets of nodes with and without unit tests and
# save it
selected_unit_tests = full_selected_nodes - selected_nodes_wo_unit_tests
self.selected_unit_tests = selected_unit_tests
self.build_model_to_unit_test_map(selected_unit_tests)
# get_graph_queue in the selector will remove NodeTypes not specified
# in the node_selector (filter_selection).
return selector_wo_unit_tests.get_graph_queue(spec)
# overrides handle_job_queue in runnable.py
def handle_job_queue(self, pool, callback):
if self.run_count == 0:
self.num_nodes = self.num_nodes + len(self.selected_unit_tests)
node = self.job_queue.get()
if (
node.resource_type == NodeType.Model
and self.model_to_unit_test_map
and node.unique_id in self.model_to_unit_test_map
):
self.handle_model_with_unit_tests_node(node, pool, callback)
else:
self.handle_job_queue_node(node, pool, callback)
def handle_model_with_unit_tests_node(self, node, pool, callback):
self._raise_set_error()
args = [node]
if self.config.args.single_threaded:
callback(self.call_model_and_unit_tests_runner(*args))
else:
pool.apply_async(self.call_model_and_unit_tests_runner, args=args, callback=callback)
def call_model_and_unit_tests_runner(self, node) -> RunResult:
assert self.manifest
for unit_test_unique_id in self.model_to_unit_test_map[node.unique_id]:
unit_test_node = self.manifest.unit_tests[unit_test_unique_id]
unit_test_runner = self.get_runner(unit_test_node)
# If the model is marked skip, also skip the unit tests
if node.unique_id in self._skipped_children:
# cause is only for ephemeral nodes
unit_test_runner.do_skip(cause=None)
result = self.call_runner(unit_test_runner)
self._handle_result(result)
if result.status in self.MARK_DEPENDENT_ERRORS_STATUSES:
# The _skipped_children dictionary can contain a run_result for ephemeral nodes,
# but that should never be the case here.
self._skipped_children[node.unique_id] = None
runner = self.get_runner(node)
if runner.node.unique_id in self._skipped_children:
cause = self._skipped_children.pop(runner.node.unique_id)
runner.do_skip(cause=cause)
return self.call_runner(runner)
# handle non-model-plus-unit-tests nodes
def handle_job_queue_node(self, node, pool, callback):
self._raise_set_error()
runner = self.get_runner(node)
# we finally know what we're running! Make sure we haven't decided
# to skip it due to upstream failures
if runner.node.unique_id in self._skipped_children:
cause = self._skipped_children.pop(runner.node.unique_id)
runner.do_skip(cause=cause)
args = [runner]
if self.config.args.single_threaded:
callback(self.call_runner(*args))
else:
pool.apply_async(self.call_runner, args=args, callback=callback)
# Make a map of model unique_ids to selected unit test unique_ids,
# for processing before the model.
def build_model_to_unit_test_map(self, selected_unit_tests):
dct = {}
for unit_test_unique_id in selected_unit_tests:
unit_test = self.manifest.unit_tests[unit_test_unique_id]
model_unique_id = unit_test.depends_on.nodes[0]
if model_unique_id not in dct:
dct[model_unique_id] = []
dct[model_unique_id].append(unit_test.unique_id)
self.model_to_unit_test_map = dct
# We return two different kinds of selectors, one with unit tests and one without
def get_node_selector(self, no_unit_tests=False) -> ResourceTypeSelector:
if self.manifest is None or self.graph is None:
raise DbtInternalError("manifest and graph must be set to get node selection")
resource_types = self.resource_types(no_unit_tests)
if resource_types == [NodeType.Test]:
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=resource_types,
)
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=resource_types,
)
def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
return self.RUNNER_MAP.get(node.resource_type)
# Special build compile_manifest method to pass add_test_edges to the compiler
def compile_manifest(self) -> None:
if self.manifest is None:
raise DbtInternalError("compile_manifest called before manifest was loaded")
self.graph: Graph = self.compiler.compile(self.manifest, add_test_edges=True)