-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathgraph.py
172 lines (142 loc) · 6.59 KB
/
graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from functools import partial
from itertools import product
from typing import Iterable, Iterator, NewType, Optional, Set
import networkx as nx # type: ignore
from dbt_common.exceptions import DbtInternalError
UniqueId = NewType("UniqueId", str)


class Graph:
    """A wrapper around the networkx graph that understands SelectionCriteria
    and how they interact with the graph.
    """

    def __init__(self, graph) -> None:
        self.graph: nx.DiGraph = graph

    def nodes(self) -> Set[UniqueId]:
        """Return the set of all node ids in the wrapped graph."""
        return set(self.graph.nodes())

    def edges(self):
        """Return the wrapped graph's edges, unfiltered."""
        return self.graph.edges()

    def __iter__(self) -> Iterator[UniqueId]:
        """Iterate over the node ids of the wrapped graph."""
        return iter(self.graph.nodes())

    def ancestors(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[UniqueId]:
        """Returns all nodes having a path to `node` in `graph`.

        Edges whose ``edge_type`` is "parent_test" are not traversed. When
        ``max_depth`` is given, the reverse breadth-first walk stops after that
        many hops. ``node`` itself is not included in the result.

        :raises DbtInternalError: if ``node`` is not in the graph.
        """
        if not self.graph.has_node(node):
            raise DbtInternalError(f"Node {node} not found in the graph!")
        filtered_graph = self.exclude_edge_type("parent_test")
        # With reverse=True, each yielded edge's second element is the newly
        # discovered predecessor, i.e. an ancestor of ``node``.
        return {
            parent
            for _, parent in nx.bfs_edges(
                filtered_graph, node, reverse=True, depth_limit=max_depth
            )
        }

    def descendants(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[UniqueId]:
        """Returns all nodes reachable from `node` in `graph`.

        Edges whose ``edge_type`` is "parent_test" are not traversed. When
        ``max_depth`` is given, the breadth-first walk stops after that many
        hops. ``node`` itself is not included in the result.

        :raises DbtInternalError: if ``node`` is not in the graph.
        """
        if not self.graph.has_node(node):
            raise DbtInternalError(f"Node {node} not found in the graph!")
        filtered_graph = self.exclude_edge_type("parent_test")
        return {child for _, child in nx.bfs_edges(filtered_graph, node, depth_limit=max_depth)}

    def exclude_edge_type(self, edge_type_to_exclude):
        """Return a networkx ``subgraph_view`` of the wrapped graph that hides
        every edge whose ``edge_type`` attribute equals ``edge_type_to_exclude``.
        """
        return nx.subgraph_view(
            self.graph,
            filter_edge=partial(self.filter_edges_by_type, edge_type=edge_type_to_exclude),
        )

    def filter_edges_by_type(self, first_node, second_node, edge_type):
        """Return True if the edge ``first_node -> second_node`` should be kept,
        i.e. its ``edge_type`` attribute is not ``edge_type``.

        Edges with no ``edge_type`` attribute are kept. Assumes the edge exists
        (its data is read via ``get_edge_data``).
        """
        return self.graph.get_edge_data(first_node, second_node).get("edge_type") != edge_type

    def select_childrens_parents(self, selected: Set[UniqueId]) -> Set[UniqueId]:
        """Return ``selected``, all of its children, and all parents of that
        combined set (the "@" selector shape)."""
        ancestors_for = self.select_children(selected) | selected
        return self.select_parents(ancestors_for) | ancestors_for

    def select_children(
        self, selected: Set[UniqueId], max_depth: Optional[int] = None
    ) -> Set[UniqueId]:
        """Returns all nodes which are descendants of the 'selected' set.

        Nodes in the 'selected' set are counted as children only if
        they are descendants of other nodes in the 'selected' set.
        The walk expands at most ``max_depth`` layers (unbounded when None)
        and does not follow "parent_test" edges.
        """
        children: Set[UniqueId] = set()
        depth = 0
        while selected and (max_depth is None or depth < max_depth):
            next_layer: Set[UniqueId] = set()
            for node in selected:
                next_layer.update(
                    target
                    for source, target in self.graph.out_edges(node)
                    # Skip already-collected nodes so cycles terminate.
                    if target not in children
                    and self.filter_edges_by_type(source, target, "parent_test")
                )
            children.update(next_layer)
            selected = next_layer
            depth += 1
        return children

    def select_parents(
        self, selected: Set[UniqueId], max_depth: Optional[int] = None
    ) -> Set[UniqueId]:
        """Returns all nodes which are ancestors of the 'selected' set.

        Nodes in the 'selected' set are counted as parents only if
        they are ancestors of other nodes in the 'selected' set.
        The walk expands at most ``max_depth`` layers (unbounded when None)
        and does not follow "parent_test" edges.
        """
        parents: Set[UniqueId] = set()
        depth = 0
        while selected and (max_depth is None or depth < max_depth):
            next_layer: Set[UniqueId] = set()
            for node in selected:
                next_layer.update(
                    source
                    for source, target in self.graph.in_edges(node)
                    # Skip already-collected nodes so cycles terminate.
                    if source not in parents
                    and self.filter_edges_by_type(source, target, "parent_test")
                )
            parents.update(next_layer)
            selected = next_layer
            depth += 1
        return parents

    def select_successors(self, selected: Set[UniqueId]) -> Set[UniqueId]:
        """Return the direct successors (one hop, any edge type) of every node
        in ``selected``."""
        successors: Set[UniqueId] = set()
        for node in selected:
            successors.update(self.graph.successors(node))
        return successors

    def get_subset_graph(self, selected: Iterable[UniqueId]) -> "Graph":
        """Create and return a new graph that is a shallow copy of the graph,
        but with only the nodes in ``selected``. Transitive edges across
        removed nodes are preserved as explicit new edges.

        Self-loops and edges that already exist are not added when bridging.

        :raises ValueError: if a node in ``selected`` is not in the graph.
        """
        include_nodes: Set[UniqueId] = set(selected)

        # Included nodes are never removed below, so validating against the
        # original graph is equivalent to validating the result. Doing it
        # first avoids copying and pruning before failing.
        for node in include_nodes:
            if node not in self.graph:
                raise ValueError(
                    "Couldn't find model '{}' -- does it exist or is it disabled?".format(node)
                )

        new_graph: nx.DiGraph = self.graph.copy()

        # Excluded nodes with in-degree or out-degree zero carry no transitive
        # dependency, so they can be dropped without bridging. Removing some
        # can expose more, hence repeat until nothing changes.
        while True:
            nodes_to_remove = [
                node
                for node in new_graph
                if node not in include_nodes
                and new_graph.in_degree(node) * new_graph.out_degree(node) == 0
            ]
            if not nodes_to_remove:
                break
            new_graph.remove_nodes_from(nodes_to_remove)

        # Bridge the remaining excluded nodes, starting with those needing the
        # fewest candidate bridge edges (in_degree * out_degree at sort time).
        remaining_nodes = sorted(
            new_graph.nodes(),
            key=lambda node: new_graph.in_degree(node) * new_graph.out_degree(node),
        )
        for node in remaining_nodes:
            if node in include_nodes:
                continue
            source_nodes = [source for source, _ in new_graph.in_edges(node)]
            target_nodes = [target for _, target in new_graph.out_edges(node)]
            # Skip self-loops and edges already present in the new graph.
            non_cyclic_new_edges = [
                (source, target)
                for source, target in product(source_nodes, target_nodes)
                if source != target and not new_graph.has_edge(source, target)
            ]
            new_graph.add_edges_from(non_cyclic_new_edges)
            new_graph.remove_node(node)

        return Graph(new_graph)

    def subgraph(self, nodes: Iterable[UniqueId]) -> "Graph":
        """Return a Graph wrapping ``self.graph.subgraph(nodes)``: only the
        given nodes and the edges among them, with no transitive bridging."""
        return Graph(self.graph.subgraph(nodes))

    def get_dependent_nodes(self, node: UniqueId):
        """Return every node reachable from ``node`` in the unfiltered graph.

        Unlike ``descendants``, this follows all edge types (including
        "parent_test") and does not pre-check that ``node`` exists; a missing
        node surfaces as whatever ``nx.descendants`` raises.
        """
        return nx.descendants(self.graph, node)