Skip to content

Commit 93d1b6d

Browse files
authored
Merge pull request #3217 from fabiosantoscode/feature/1744-pipeline-eliminate-extra-connection
pipeline: show: outs: eliminate extra edges in DAG
2 parents ed04a9f + 758205a commit 93d1b6d

File tree

2 files changed

+52
-27
lines changed

2 files changed

+52
-27
lines changed

dvc/command/pipeline.py

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,45 +31,50 @@ def _show(self, target, commands, outs, locked):
3131
else:
3232
logger.info(stage.path_in_repo)
3333

34-
def __build_graph(self, target, commands, outs):
34+
def _build_graph(self, target, commands, outs):
3535
import networkx
3636
from dvc.stage import Stage
3737
from dvc.repo.graph import get_pipeline
3838

39-
stage = Stage.load(self.repo, target)
40-
G = get_pipeline(self.repo.pipelines, stage)
39+
target_stage = Stage.load(self.repo, target)
40+
G = get_pipeline(self.repo.pipelines, target_stage)
4141

42-
nodes = []
43-
for stage in G:
42+
nodes = set()
43+
for stage in networkx.dfs_preorder_nodes(G, target_stage):
4444
if commands:
4545
if stage.cmd is None:
4646
continue
47-
nodes.append(stage.cmd)
47+
nodes.add(stage.cmd)
4848
elif outs:
4949
for out in stage.outs:
50-
nodes.append(str(out))
50+
nodes.add(str(out))
51+
for dep in stage.deps:
52+
nodes.add(str(dep))
5153
else:
52-
nodes.append(stage.relpath)
54+
nodes.add(stage.relpath)
5355

5456
edges = []
55-
for from_stage, to_stage in G.edges():
56-
if commands:
57-
if to_stage.cmd is None:
58-
continue
59-
edges.append((from_stage.cmd, to_stage.cmd))
60-
elif outs:
61-
for from_out in from_stage.outs:
62-
for to_out in to_stage.outs:
63-
edges.append((str(from_out), str(to_out)))
64-
else:
65-
edges.append((from_stage.relpath, to_stage.relpath))
6657

67-
return nodes, edges, networkx.is_tree(G)
58+
if outs:
59+
for stage in networkx.dfs_preorder_nodes(G, target_stage):
60+
for dep in stage.deps:
61+
for out in stage.outs:
62+
edges.append((str(out), str(dep)))
63+
else:
64+
for from_stage, to_stage in networkx.dfs_edges(G, target_stage):
65+
if commands:
66+
if to_stage.cmd is None:
67+
continue
68+
edges.append((from_stage.cmd, to_stage.cmd))
69+
else:
70+
edges.append((from_stage.relpath, to_stage.relpath))
71+
72+
return list(nodes), edges, networkx.is_tree(G)
6873

6974
def _show_ascii(self, target, commands, outs):
7075
from dvc.dagascii import draw
7176

72-
nodes, edges, _ = self.__build_graph(target, commands, outs)
77+
nodes, edges, _ = self._build_graph(target, commands, outs)
7378

7479
if not nodes:
7580
return
@@ -79,7 +84,7 @@ def _show_ascii(self, target, commands, outs):
7984
def _show_dependencies_tree(self, target, commands, outs):
8085
from treelib import Tree
8186

82-
nodes, edges, is_tree = self.__build_graph(target, commands, outs)
87+
nodes, edges, is_tree = self._build_graph(target, commands, outs)
8388
if not nodes:
8489
return
8590
if not is_tree:
@@ -100,12 +105,12 @@ def _show_dependencies_tree(self, target, commands, outs):
100105
observe_list.pop(0)
101106
tree.show()
102107

103-
def __write_dot(self, target, commands, outs):
108+
def _write_dot(self, target, commands, outs):
104109
import io
105110
import networkx
106111
from networkx.drawing.nx_pydot import write_dot
107112

108-
_, edges, _ = self.__build_graph(target, commands, outs)
113+
_, edges, _ = self._build_graph(target, commands, outs)
109114
edges = [edge[::-1] for edge in edges]
110115

111116
simple_g = networkx.DiGraph()
@@ -126,9 +131,7 @@ def run(self):
126131
target, self.args.commands, self.args.outs
127132
)
128133
elif self.args.dot:
129-
self.__write_dot(
130-
target, self.args.commands, self.args.outs
131-
)
134+
self._write_dot(target, self.args.commands, self.args.outs)
132135
elif self.args.tree:
133136
self._show_dependencies_tree(
134137
target, self.args.commands, self.args.outs

tests/func/test_pipeline.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22

33
from dvc.main import main
4+
from dvc.command.pipeline import CmdPipelineShow
45
from tests.basic_env import TestDvc
56
from tests.func.test_repro import TestRepro
67
from tests.func.test_repro import TestReproChangedDeepData
@@ -98,6 +99,27 @@ def test_dot_commands(self):
9899
self.assertEqual(ret, 0)
99100

100101

102+
def test_disconnected_stage(tmp_dir, dvc):
103+
tmp_dir.dvc_gen({"base": "base"})
104+
105+
dvc.add("base")
106+
dvc.run(deps=["base"], outs=["derived1"], cmd="echo derived1 > derived1")
107+
dvc.run(deps=["base"], outs=["derived2"], cmd="echo derived2 > derived2")
108+
final_stage = dvc.run(
109+
deps=["derived1"], outs=["final"], cmd="echo final > final"
110+
)
111+
112+
command = CmdPipelineShow([])
113+
# Need to test __build_graph directly
114+
nodes, edges, is_tree = command._build_graph(
115+
final_stage.path, commands=False, outs=True
116+
)
117+
118+
assert set(nodes) == {"final", "derived1", "base"}
119+
assert edges == [("final", "derived1"), ("derived1", "base")]
120+
assert is_tree is True
121+
122+
101123
def test_print_locked_stages(tmp_dir, dvc, caplog):
102124
tmp_dir.dvc_gen({"foo": "foo content", "bar": "bar content"})
103125
dvc.lock_stage("foo.dvc")

0 commit comments

Comments
 (0)