Skip to content

Commit

Permalink
bugfix unchanged stages
Browse files Browse the repository at this point in the history
  • Loading branch information
PythonFZ committed Dec 20, 2024
1 parent 0781eea commit 20832eb
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
2 changes: 1 addition & 1 deletion paraffin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def submit(
)
if show_mermaid:
log.debug("Visualizing graph")
typer.echo(levels_to_mermaid(disconnected_levels))
typer.echo(levels_to_mermaid(disconnected_levels, changed_stages=changed_stages))

typer.echo(f"Submitted all (n = {len(graph)}) tasks.")
typer.echo(
Expand Down
23 changes: 20 additions & 3 deletions paraffin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import git
import networkx as nx
import yaml
import json

from paraffin.abc import HirachicalStages, StageContainer

Expand Down Expand Up @@ -82,7 +83,20 @@ def get_changed_stages(subgraph) -> list:
fs = dvc.api.DVCFileSystem(url=None, rev=None)
repo = fs.repo
names = [x.name for x in subgraph.nodes]
return list(repo.status(targets=names))
changed = list(repo.status(targets=names))
graph = fs.repo.index.graph.reverse(copy=True)
# Find all downstream stages and add them to the changed list.
# If any upstream stage has changed, ALL of its downstream stages must
# be run as well. `dvc status` does not report them, because at the
# time of checking their immediate upstream stage still looks unchanged.

for name in changed:
stage = next(x for x in graph.nodes if hasattr(x, "name") and x.name == name)
for node in nx.descendants(graph, stage):
changed.append(node.name)
# TODO: split into definitely changed and maybe changed stages
return changed


def get_custom_queue():
Expand Down Expand Up @@ -160,7 +174,7 @@ def dag_to_levels(
return levels


def levels_to_mermaid(all_levels: list[HirachicalStages]) -> str:
def levels_to_mermaid(all_levels: list[HirachicalStages], changed_stages: list[str]) -> str:
# Initialize Mermaid syntax
mermaid_syntax = "flowchart TD\n"

Expand All @@ -169,7 +183,10 @@ def levels_to_mermaid(all_levels: list[HirachicalStages]) -> str:
for level, nodes in levels.items():
mermaid_syntax += f"\tsubgraph Level{idx}:{level + 1}\n"
for node in nodes:
mermaid_syntax += f"\t\t{node.name}\n"
if node.name in changed_stages:
mermaid_syntax += f"\t\t{node.name}\n"
else:
mermaid_syntax += f"\t\t{node.name}(✓)\n"
mermaid_syntax += "\tend\n"

# Add connections between levels
Expand Down
6 changes: 5 additions & 1 deletion tests/test_run_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ def check_finished(names: list[str] | None = None, exclusive: bool = False) -> b
for name in names or []:
cmd.append(name)
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout.decode().strip() == "Data and pipelines are up to date."
finished = result.stdout.decode().strip() == "Data and pipelines are up to date."
if not finished:
print(result.stdout.decode())
return finished


def test_check_finished(proj01):
Expand Down Expand Up @@ -149,6 +152,7 @@ def test_run_datafile(proj02, caplog):
data_file.write_text("4,5,6")

result = runner.invoke(app, ["submit", "--glob", "a*"])
print(result.stdout)
# assert "Running 2 stages" in caplog.text
# caplog.clear()
assert result.exit_code == 0
Expand Down

0 comments on commit 20832eb

Please sign in to comment.