Skip to content

Commit

Permalink
Refactor for node selection
Browse files Browse the repository at this point in the history
 - moved what remains of the linker into compilation, migrate things to the graph
 - compile uncompiled ephemeral models when prepending ctes
  - no more including them all in the selection
 - per-node task selectors
 - added the idea of a selection spec, graph selection uses that
   - right now we parse include + exclude strings


automatic commit by git-black, original commits:
  89443ea
  • Loading branch information
Jacob Beck authored and iknox-fa committed Feb 8, 2022
1 parent e92b80b commit c6d8110
Show file tree
Hide file tree
Showing 15 changed files with 57 additions and 83 deletions.
2 changes: 1 addition & 1 deletion core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
def _compiled_type_for(model: ParsedNode):
if type(model) not in COMPILED_TYPES:
raise InternalException(
f'Asked to compile {type(model)} node, but it has no compiled form'
f"Asked to compile {type(model)} node, but it has no compiled form"
)
return COMPILED_TYPES[type(model)]

Expand Down
6 changes: 2 additions & 4 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,7 @@ def __post_deserialize__(cls, obj):
obj._lock = flags.MP_CONTEXT.Lock()
return obj

def sync_update_node(
self, new_node: NonSourceCompiledNode
) -> NonSourceCompiledNode:
def sync_update_node(self, new_node: NonSourceCompiledNode) -> NonSourceCompiledNode:
"""update the node with a lock. The only time we should want to lock is
when compiling an ephemeral ancestor of a node at runtime, because
multiple threads could be just-in-time compiling the same ephemeral
Expand All @@ -623,7 +621,7 @@ def sync_update_node(
"""
with self._lock:
existing = self.nodes[new_node.unique_id]
if getattr(existing, 'compiled', False):
if getattr(existing, "compiled", False):
# already compiled -> must be a NonSourceCompiledNode
return cast(NonSourceCompiledNode, existing)
_update_into(self.nodes, new_node)
Expand Down
18 changes: 9 additions & 9 deletions core/dbt/graph/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
IndirectSelection
)

INTERSECTION_DELIMITER = ','
INTERSECTION_DELIMITER = ","

DEFAULT_INCLUDES: List[str] = ['fqn:*', 'source:*', 'exposure:*', 'metric:*']
DEFAULT_EXCLUDES: List[str] = []
Expand All @@ -30,9 +30,7 @@ def parse_union(
indirect_selection: IndirectSelection = IndirectSelection.Eager
) -> SelectionUnion:
# turn ['a b', 'c'] -> ['a', 'b', 'c']
raw_specs = itertools.chain.from_iterable(
r.split(' ') for r in components
)
raw_specs = itertools.chain.from_iterable(r.split(" ") for r in components)
union_components: List[SelectionSpec] = []

# ['a', 'b', 'c,d'] -> union('a', 'b', intersection('c', 'd'))
Expand All @@ -41,11 +39,13 @@ def parse_union(
SelectionCriteria.from_single_spec(part, indirect_selection=indirect_selection)
for part in raw_spec.split(INTERSECTION_DELIMITER)
]
union_components.append(SelectionIntersection(
components=intersection_components,
expect_exists=expect_exists,
raw=raw_spec,
))
union_components.append(
SelectionIntersection(
components=intersection_components,
expect_exists=expect_exists,
raw=raw_spec,
)
)
return SelectionUnion(
components=union_components,
expect_exists=False,
Expand Down
21 changes: 9 additions & 12 deletions core/dbt/graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Graph:
"""A wrapper around the networkx graph that understands SelectionCriteria
and how they interact with the graph.
"""

def __init__(self, graph):
self.graph = graph

Expand All @@ -30,7 +31,7 @@ def ancestors(
) -> Set[UniqueId]:
"""Returns all nodes having a path to `node` in `graph`"""
if not self.graph.has_node(node):
raise InternalException(f'Node {node} not found in the graph!')
raise InternalException(f"Node {node} not found in the graph!")
# This used to use nx.utils.reversed(self.graph), but that is deprecated,
# so changing to use self.graph.reverse(copy=False) as recommended
G = self.graph.reverse(copy=False) if self.graph.is_directed() else self.graph
Expand All @@ -45,16 +46,13 @@ def descendants(
) -> Set[UniqueId]:
"""Returns all nodes reachable from `node` in `graph`"""
if not self.graph.has_node(node):
raise InternalException(f'Node {node} not found in the graph!')
des = nx.single_source_shortest_path_length(G=self.graph,
source=node,
cutoff=max_depth)\
.keys()
raise InternalException(f"Node {node} not found in the graph!")
des = nx.single_source_shortest_path_length(
G=self.graph, source=node, cutoff=max_depth
).keys()
return des - {node}

def select_childrens_parents(
self, selected: Set[UniqueId]
) -> Set[UniqueId]:
def select_childrens_parents(self, selected: Set[UniqueId]) -> Set[UniqueId]:
ancestors_for = self.select_children(selected) | selected
return self.select_parents(ancestors_for) | ancestors_for

Expand Down Expand Up @@ -105,13 +103,12 @@ def get_subset_graph(self, selected: Iterable[UniqueId]) -> "Graph":
for node in include_nodes:
if node not in new_graph:
raise ValueError(
"Couldn't find model '{}' -- does it exist or is "
"it disabled?".format(node)
"Couldn't find model '{}' -- does it exist or is " "it disabled?".format(node)
)

return Graph(new_graph)

def subgraph(self, nodes: Iterable[UniqueId]) -> 'Graph':
def subgraph(self, nodes: Iterable[UniqueId]) -> "Graph":
return Graph(self.graph.subgraph(nodes))

def get_dependent_nodes(self, node: UniqueId):
Expand Down
16 changes: 7 additions & 9 deletions core/dbt/graph/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def can_select_indirectly(node):


class NodeSelector(MethodManager):
"""The node selector is aware of the graph and manifest,
"""The node selector is aware of the graph and manifest,"""
"""
def __init__(
self,
Expand Down Expand Up @@ -200,9 +200,7 @@ def filter_selection(self, selected: Set[UniqueId]) -> Set[UniqueId]:
"""Return the subset of selected nodes that is a match for this
selector.
"""
return {
unique_id for unique_id in selected if self._is_match(unique_id)
}
return {unique_id for unique_id in selected if self._is_match(unique_id)}
def expand_selection(
self, selected: Set[UniqueId],
Expand Down Expand Up @@ -263,12 +261,12 @@ def incorporate_indirect_nodes(
def get_selected(self, spec: SelectionSpec) -> Set[UniqueId]:
"""get_selected runs through the node selection process:

- node selection. Based on the include/exclude sets, the set
of matched unique IDs is returned
- node selection. Based on the include/exclude sets, the set
of matched unique IDs is returned
- includes direct + indirect selection (for tests)
- filtering:
- selectors can filter the nodes after all of them have been
selected
- filtering:
- selectors can filter the nodes after all of them have been
selected
"""
selected_nodes, indirect_only = self.select_nodes(spec)
filtered_nodes = self.filter_selection(selected_nodes)
Expand Down
14 changes: 7 additions & 7 deletions core/dbt/graph/selector_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
from dbt.node_types import NodeType


SELECTOR_GLOB = '*'
SELECTOR_DELIMITER = ':'
SELECTOR_GLOB = "*"
SELECTOR_DELIMITER = ":"


class MethodName(StrEnum):
Expand Down Expand Up @@ -162,7 +162,7 @@ def search(
included_nodes: Set[UniqueId],
selector: str,
) -> Iterator[UniqueId]:
raise NotImplementedError('subclasses should implement this')
raise NotImplementedError("subclasses should implement this")


class QualifiedNameSelectorMethod(SelectorMethod):
Expand Down Expand Up @@ -211,7 +211,7 @@ def search(
self, included_nodes: Set[UniqueId], selector: str
) -> Iterator[UniqueId]:
"""yields nodes from included that are in the specified source."""
parts = selector.split('.')
parts = selector.split(".")
target_package = SELECTOR_GLOB
if len(parts) == 1:
target_source, target_table = parts[0], None
Expand All @@ -222,9 +222,9 @@ def search(
else: # len(parts) > 3 or len(parts) == 0
msg = (
'Invalid source selector value "{}". Sources must be of the '
'form `${{source_name}}`, '
'`${{source_name}}.${{target_name}}`, or '
'`${{package_name}}.${{source_name}}.${{target_name}}'
"form `${{source_name}}`, "
"`${{source_name}}.${{target_name}}`, or "
"`${{package_name}}.${{source_name}}.${{target_name}}"
).format(selector)
raise RuntimeException(msg)

Expand Down
33 changes: 14 additions & 19 deletions core/dbt/graph/selector_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@


RAW_SELECTOR_PATTERN = re.compile(
r'\A'
r"\A"
r'(?P<childrens_parents>(\@))?'
r'(?P<parents>((?P<parents_depth>(\d*))\+))?'
r"(?P<parents>((?P<parents_depth>(\d*))\+))?"
r'((?P<method>([\w.]+)):)?(?P<value>(.*?))'
r'(?P<children>(\+(?P<children_depth>(\d*))))?'
r'\Z'
r"(?P<children>(\+(?P<children_depth>(\d*))))?"
r"\Z"
)
SELECTOR_METHOD_SEPARATOR = '.'

Expand Down Expand Up @@ -48,16 +48,14 @@ def _match_to_int(match: Dict[str, str], key: str) -> Optional[int]:
try:
return int(raw)
except ValueError as exc:
raise RuntimeException(
f'Invalid node spec - could not handle parent depth {raw}'
) from exc
raise RuntimeException(f"Invalid node spec - could not handle parent depth {raw}") from exc


SelectionSpec = Union[
'SelectionCriteria',
'SelectionIntersection',
'SelectionDifference',
'SelectionUnion',
"SelectionCriteria",
"SelectionIntersection",
"SelectionDifference",
"SelectionUnion",
]


Expand All @@ -77,8 +75,7 @@ class SelectionCriteria:
def __post_init__(self):
if self.children and self.childrens_parents:
raise RuntimeException(
f'Invalid node spec {self.raw} - "@" prefix and "+" suffix '
'are incompatible'
f'Invalid node spec {self.raw} - "@" prefix and "+" suffix ' "are incompatible"
)

@classmethod
Expand Down Expand Up @@ -114,9 +111,9 @@ def selection_criteria_from_dict(
indirect_selection: IndirectSelection = IndirectSelection.Eager
dct: Dict[str, Any],
if 'value' not in dct:
raise RuntimeException(
f'Invalid node spec "{raw}" - no search value!'
)
) -> "SelectionCriteria":
if "value" not in dct:
raise RuntimeException(f'Invalid node spec "{raw}" - no search value!')
method_name, method_arguments = cls.parse_method(dct)

parents_depth = _match_to_int(dct, 'parents_depth')
Expand Down Expand Up @@ -197,9 +194,7 @@ def combine_selections(
self,
selections: List[Set[UniqueId]],
) -> Set[UniqueId]:
raise NotImplementedError(
'_combine_selections not implemented!'
)
raise NotImplementedError("_combine_selections not implemented!")

def combined(self, selections: List[Set[UniqueId]]) -> Set[UniqueId]:
if not selections:
Expand Down
4 changes: 1 addition & 3 deletions core/dbt/task/compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ def raise_on_first_error(self):

def get_node_selector(self) -> ResourceTypeSelector:
if self.manifest is None or self.graph is None:
raise InternalException(
'manifest and graph must be set to get perform node selection'
)
raise InternalException("manifest and graph must be set to get perform node selection")
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
Expand Down
4 changes: 1 addition & 3 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,7 @@ def raise_on_first_error(self):

def get_node_selector(self):
if self.manifest is None or self.graph is None:
raise InternalException(
'manifest and graph must be set to get perform node selection'
)
raise InternalException("manifest and graph must be set to get perform node selection")
return FreshnessSelector(
graph=self.graph,
manifest=self.manifest,
Expand Down
4 changes: 1 addition & 3 deletions core/dbt/task/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,7 @@ def selection_arg(self):

def get_node_selector(self):
if self.manifest is None or self.graph is None:
raise InternalException(
'manifest and graph must be set to get perform node selection'
)
raise InternalException("manifest and graph must be set to get perform node selection")
if self.resource_types == [NodeType.Test]:
return TestSelector(
graph=self.graph,
Expand Down
4 changes: 1 addition & 3 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,7 @@ def after_hooks(self, adapter, results, elapsed):

def get_node_selector(self) -> ResourceTypeSelector:
if self.manifest is None or self.graph is None:
raise InternalException(
'manifest and graph must be set to get perform node selection'
)
raise InternalException("manifest and graph must be set to get perform node selection")
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def execute_nodes(self):

def _mark_dependent_errors(self, node_id, result, cause):
if self.graph is None:
raise InternalException('graph is None in _mark_dependent_errors')
raise InternalException("graph is None in _mark_dependent_errors")
for dep_node_id in self.graph.get_dependent_nodes(node_id):
self._skipped_children[dep_node_id] = cause

Expand Down
4 changes: 1 addition & 3 deletions core/dbt/task/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ def raise_on_first_error(self):

def get_node_selector(self):
if self.manifest is None or self.graph is None:
raise InternalException(
'manifest and graph must be set to get perform node selection'
)
raise InternalException("manifest and graph must be set to get perform node selection")
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
Expand Down
4 changes: 1 addition & 3 deletions core/dbt/task/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ def raise_on_first_error(self):

def get_node_selector(self):
if self.manifest is None or self.graph is None:
raise InternalException(
'manifest and graph must be set to get perform node selection'
)
raise InternalException("manifest and graph must be set to get perform node selection")
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
Expand Down
4 changes: 1 addition & 3 deletions core/dbt/task/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,7 @@ def raise_on_first_error(self):

def get_node_selector(self) -> TestSelector:
if self.manifest is None or self.graph is None:
raise InternalException(
'manifest and graph must be set to get perform node selection'
)
raise InternalException("manifest and graph must be set to get perform node selection")
return TestSelector(
graph=self.graph,
manifest=self.manifest,
Expand Down

0 comments on commit c6d8110

Please sign in to comment.