Skip to content

Commit

Permalink
repro: Check for hash mismatch between deleted dependencies and upstr…
Browse files Browse the repository at this point in the history
…eam outputs.

Only if `--allow-missing` is passed.

- Create `tests/func/repro` and extract `pull` and `allow_missing` to separate test files.

Closes #9530
  • Loading branch information
daavoo authored and efiop committed Jun 13, 2023
1 parent 5ec4fa4 commit d0aa1ce
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 8 deletions.
3 changes: 2 additions & 1 deletion dvc/repo/reproduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def _reproduce_stages( # noqa: C901
steps = _get_steps(graph, stages, downstream, single_item)

force_downstream = kwargs.pop("force_downstream", False)
result = []
result: List["Stage"] = []
unchanged: List["Stage"] = []
# `ret` is used to add a cosmetic newline.
ret: List["Stage"] = []
Expand All @@ -173,6 +173,7 @@ def _reproduce_stages( # noqa: C901
logger.info("")

try:
kwargs["upstream"] = result + unchanged
ret = _reproduce_stage(stage, **kwargs)

if len(ret) == 0:
Expand Down
29 changes: 22 additions & 7 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,22 +294,33 @@ def env(self) -> Env:

return env

def changed_deps(self, allow_missing: bool = False) -> bool:
def changed_deps(
self, allow_missing: bool = False, upstream: Optional[List] = None
) -> bool:
if self.frozen:
return False

if self.is_callback or self.always_changed:
return True

return self._changed_deps(allow_missing=allow_missing)
return self._changed_deps(allow_missing=allow_missing, upstream=upstream)

@rwlocked(read=["deps"])
def _changed_deps(self, allow_missing: bool = False) -> bool:
def _changed_deps(
self, allow_missing: bool = False, upstream: Optional[List] = None
) -> bool:
for dep in self.deps:
status = dep.status()
if status:
if allow_missing and status[str(dep)] == "deleted":
continue
if upstream and any(
dep.fs_path == out.fs_path and dep.hash_info != out.hash_info
for stage in upstream
for out in stage.outs
):
status[str(dep)] = "modified"
else:
continue
logger.debug(
"Dependency '%s' of %s changed because it is '%s'.",
dep,
Expand Down Expand Up @@ -343,12 +354,14 @@ def changed_stage(self) -> bool:
return changed

@rwlocked(read=["deps", "outs"])
def changed(self, allow_missing: bool = False) -> bool:
def changed(
self, allow_missing: bool = False, upstream: Optional[List] = None
) -> bool:
is_changed = (
# Short-circuit order: stage md5 is fast,
# deps are expected to change
self.changed_stage()
or self.changed_deps(allow_missing=allow_missing)
or self.changed_deps(allow_missing=allow_missing, upstream=upstream)
or self.changed_outs(allow_missing=allow_missing)
)
if is_changed:
Expand Down Expand Up @@ -403,7 +416,9 @@ def transfer(
def reproduce(self, interactive=False, **kwargs) -> Optional["Stage"]:
if not (
kwargs.get("force", False)
or self.changed(kwargs.get("allow_missing", False))
or self.changed(
kwargs.get("allow_missing", False), kwargs.pop("upstream", None)
)
):
if not isinstance(self, PipelineStage) and self.is_data_source:
logger.info("'%s' didn't change, skipping", self.addressing)
Expand Down
Empty file added tests/func/repro/__init__.py
Empty file.
File renamed without changes.
57 changes: 57 additions & 0 deletions tests/func/repro/test_repro_allow_missing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from dvc.utils.fs import remove


def test_repro_allow_missing(tmp_dir, dvc):
    """Reproduce with ``allow_missing=True`` after deleting an intermediate output.

    Builds a two-stage pipeline (``create-foo`` -> ``copy-foo``), runs it once,
    then removes ``foo`` from the workspace, its cache entry, and the stage
    cache directory. A second reproduce with ``allow_missing=True`` is
    expected to return nothing.
    """
    tmp_dir.gen("fixed", "fixed")
    dvc.stage.add(
        name="create-foo",
        cmd="echo foo > foo",
        deps=["fixed"],
        outs=["foo"],
    )
    dvc.stage.add(
        name="copy-foo",
        cmd="cp foo bar",
        deps=["foo"],
        outs=["bar"],
    )
    # The unpacking doubles as a check that exactly two stages ran.
    producer, _consumer = dvc.reproduce()

    # Wipe every local copy of `foo`: workspace file, cache entry, run cache.
    remove("foo")
    foo_cache_path = producer.outs[0].cache_path
    remove(foo_cache_path)
    remove(dvc.stage_cache.cache_dir)

    reproduced = dvc.reproduce(allow_missing=True)
    # both stages are skipped
    assert not reproduced


def test_repro_allow_missing_and_pull(tmp_dir, dvc, mocker, local_remote):
    """Reproduce with ``pull=True`` and ``allow_missing=True`` together.

    Only ``create-foo`` is reproduced and pushed up front. After ``foo`` is
    removed from the workspace, the cache, and the stage cache directory, a
    full reproduce with both flags is expected to return exactly one stage.
    """
    tmp_dir.gen("fixed", "fixed")
    dvc.stage.add(
        name="create-foo",
        cmd="echo foo > foo",
        deps=["fixed"],
        outs=["foo"],
    )
    dvc.stage.add(
        name="copy-foo",
        cmd="cp foo bar",
        deps=["foo"],
        outs=["bar"],
    )
    # Single-element unpacking: only the targeted stage should be returned.
    [producer] = dvc.reproduce("create-foo")

    dvc.push()

    # Wipe every local copy of `foo`: workspace file, cache entry, run cache.
    remove("foo")
    foo_cache_path = producer.outs[0].cache_path
    remove(foo_cache_path)
    remove(dvc.stage_cache.cache_dir)

    reproduced = dvc.reproduce(pull=True, allow_missing=True)
    # create-foo is skipped ; copy-foo pulls missing dep
    assert len(reproduced) == 1


def test_repro_allow_missing_upstream_stage_modified(
    tmp_dir, dvc, mocker, local_remote
):
    """Regression test for https://github.com/iterative/dvc/issues/9530

    The pipeline is reproduced and pushed with ``param: 1``. The param is then
    changed to ``2`` and only ``create-foo`` is reproduced and pushed. After
    ``foo`` is removed locally (workspace and cache), a full reproduce with
    ``pull=True, allow_missing=True`` is expected to return exactly one stage.
    """
    tmp_dir.gen("params.yaml", "param: 1")
    dvc.stage.add(
        name="create-foo",
        cmd="echo ${param} > foo",
        params=["param"],
        outs=["foo"],
    )
    dvc.stage.add(
        name="copy-foo",
        cmd="cp foo bar",
        deps=["foo"],
        outs=["bar"],
    )
    dvc.reproduce()

    dvc.push()

    # Change the upstream stage's input and re-run only that stage.
    tmp_dir.gen("params.yaml", "param: 2")
    [producer] = dvc.reproduce("create-foo")
    dvc.push()

    # Drop the local copies of the freshly produced `foo`.
    remove("foo")
    foo_cache_path = producer.outs[0].cache_path
    remove(foo_cache_path)

    reproduced = dvc.reproduce(pull=True, allow_missing=True)
    # create-foo is skipped ; copy-foo pulls modified dep
    assert len(reproduced) == 1
58 changes: 58 additions & 0 deletions tests/func/repro/test_repro_pull.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import os

from dvc.stage.cache import RunCacheNotSupported
from dvc.utils.fs import remove


def test_repro_pulls_mising_data_source(tmp_dir, dvc, mocker, local_remote):
    """Reproduce with ``pull=True`` when a DVC-tracked dependency is gone locally.

    ``foo`` is generated via ``dvc_gen`` and pushed. It is then removed from
    both the workspace and the cache before reproducing ``copy-foo``, which
    depends on it. The reproduce call is expected to return a truthy result.
    """
    [foo_stage] = tmp_dir.dvc_gen("foo", "foo")

    dvc.push()

    dvc.stage.add(
        name="copy-foo",
        cmd="cp foo bar",
        deps=["foo"],
        outs=["bar"],
    )

    # Remove the dependency from the workspace and from the local cache.
    remove("foo")
    foo_cache_path = foo_stage.outs[0].cache_path
    remove(foo_cache_path)

    reproduced = dvc.reproduce(pull=True)
    assert reproduced


def test_repro_pulls_mising_import(tmp_dir, dvc, mocker, erepo_dir, local_remote):
    """Reproduce with ``pull=True`` when an imported dependency is gone locally.

    ``foo`` is created in the external repo fixture, imported with
    ``dvc.imp``, and pushed. It is then removed from both the workspace and
    the cache before reproducing ``copy-foo``, which depends on it. The
    reproduce call is expected to return a truthy result.
    """
    with erepo_dir.chdir():
        erepo_dir.dvc_gen("foo", "foo", commit="first")

    source_url = os.fspath(erepo_dir)
    import_stage = dvc.imp(source_url, "foo")

    dvc.push()

    dvc.stage.add(
        name="copy-foo",
        cmd="cp foo bar",
        deps=["foo"],
        outs=["bar"],
    )

    # Remove the imported file from the workspace and from the local cache.
    remove("foo")
    foo_cache_path = import_stage.outs[0].cache_path
    remove(foo_cache_path)

    reproduced = dvc.reproduce(pull=True)
    assert reproduced


def test_repro_pulls_continue_without_run_cache(tmp_dir, dvc, mocker, local_remote):
    """Reproduce with ``pull=True`` still succeeds if pulling the run cache raises.

    ``dvc.stage_cache.pull`` is patched to raise ``RunCacheNotSupported``.
    With ``foo`` removed from the workspace and the cache, the reproduce call
    is still expected to return a truthy result.
    """
    [foo_stage] = tmp_dir.dvc_gen("foo", "foo")

    dvc.push()

    # Make any attempt to pull the run cache fail.
    mocker.patch.object(
        dvc.stage_cache,
        "pull",
        side_effect=RunCacheNotSupported("foo"),
    )
    dvc.stage.add(
        name="copy-foo",
        cmd="cp foo bar",
        deps=["foo"],
        outs=["bar"],
    )

    # Remove the dependency from the workspace and from the local cache.
    remove("foo")
    foo_cache_path = foo_stage.outs[0].cache_path
    remove(foo_cache_path)

    reproduced = dvc.reproduce(pull=True)
    assert reproduced


def test_repro_skip_pull_if_no_run_cache_is_passed(tmp_dir, dvc, mocker, local_remote):
    """With ``run_cache=False``, reproduce must not call ``stage_cache.pull``.

    A spy is attached to ``dvc.stage_cache.pull``. After ``foo`` is removed
    from the workspace and the cache, ``reproduce(pull=True, run_cache=False)``
    is expected to return a truthy result without the spy being called.
    """
    [foo_stage] = tmp_dir.dvc_gen("foo", "foo")

    dvc.push()

    run_cache_pull_spy = mocker.spy(dvc.stage_cache, "pull")
    dvc.stage.add(
        name="copy-foo",
        cmd="cp foo bar",
        deps=["foo"],
        outs=["bar"],
    )

    # Remove the dependency from the workspace and from the local cache.
    remove("foo")
    foo_cache_path = foo_stage.outs[0].cache_path
    remove(foo_cache_path)

    reproduced = dvc.reproduce(pull=True, run_cache=False)
    assert reproduced
    assert not run_cache_pull_spy.called

0 comments on commit d0aa1ce

Please sign in to comment.