Skip to content

Commit

Permalink
push/pull: properly collect run cache
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed May 8, 2020
1 parent 2a16b02 commit c121c33
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 25 deletions.
20 changes: 2 additions & 18 deletions dvc/data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ def _init_remote(self, name):
return Remote(self.repo, name=name)

def push(
self,
cache,
jobs=None,
remote=None,
show_checksums=False,
run_cache=False,
self, cache, jobs=None, remote=None, show_checksums=False,
):
"""Push data items in a cloud-agnostic way.
Expand All @@ -67,20 +62,12 @@ def push(
"""
remote = self.get_remote(remote, "push")

if run_cache:
self.repo.stage_cache.push(remote)

return self.repo.cache.local.push(
cache, jobs=jobs, remote=remote, show_checksums=show_checksums,
)

def pull(
self,
cache,
jobs=None,
remote=None,
show_checksums=False,
run_cache=False,
self, cache, jobs=None, remote=None, show_checksums=False,
):
"""Pull data items in a cloud-agnostic way.
Expand All @@ -94,9 +81,6 @@ def pull(
"""
remote = self.get_remote(remote, "pull")

if run_cache:
self.repo.stage_cache.pull(remote)

downloaded_items_num = self.repo.cache.local.pull(
cache, jobs=jobs, remote=remote, show_checksums=show_checksums
)
Expand Down
7 changes: 7 additions & 0 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def used_cache(
force=False,
jobs=None,
recursive=False,
used_run_cache=None,
):
"""Get the stages related to the given target and collect
the `info` of its outputs.
Expand Down Expand Up @@ -291,6 +292,12 @@ def used_cache(
)
cache.update(used_cache, suffix=suffix)

if used_run_cache:
used_cache = self.stage_cache.get_used_cache(
used_run_cache, remote=remote, force=force, jobs=jobs,
)
cache.update(used_cache)

return cache

def _collect_graph(self, stages):
Expand Down
10 changes: 5 additions & 5 deletions dvc/repo/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ def _fetch(
config.NoRemoteError: thrown when downloading only local files and no
remote is configured
"""

used_run_cache = self.stage_cache.pull(remote) if run_cache else []

used = self.used_cache(
targets,
all_branches=all_branches,
Expand All @@ -44,18 +47,15 @@ def _fetch(
remote=remote,
jobs=jobs,
recursive=recursive,
used_run_cache=used_run_cache,
)

downloaded = 0
failed = 0

try:
downloaded += self.cloud.pull(
used,
jobs,
remote=remote,
show_checksums=show_checksums,
run_cache=run_cache,
used, jobs, remote=remote, show_checksums=show_checksums,
)
except NoRemoteError:
if not used.external and used["local"]:
Expand Down
5 changes: 4 additions & 1 deletion dvc/repo/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def push(
all_commits=False,
run_cache=False,
):
used_run_cache = self.stage_cache.push(remote) if run_cache else []

used = self.used_cache(
targets,
all_branches=all_branches,
Expand All @@ -24,6 +26,7 @@ def push(
remote=remote,
jobs=jobs,
recursive=recursive,
used_run_cache=used_run_cache,
)

return self.cloud.push(used, jobs, remote=remote, run_cache=run_cache)
return self.cloud.push(used, jobs, remote=remote)
31 changes: 30 additions & 1 deletion dvc/stage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ def restore(self, stage):

@staticmethod
def _transfer(func, from_remote, to_remote):
ret = []

runs = from_remote.path_info / "runs"
if not from_remote.exists(runs):
return
return []

for src in from_remote.walk_files(runs):
rel = src.relative_to(from_remote.path_info)
Expand All @@ -118,9 +120,36 @@ def _transfer(func, from_remote, to_remote):
if to_remote.exists(key) and first(to_remote.walk_files(key)):
continue
func(src, dst)
ret.append((src.parent.name, src.name))

return ret

def push(self, remote):
    """Send run-cache files from the local cache to a remote.

    ``remote`` is resolved through ``self.repo.cloud.get_remote``. The
    resolved remote's ``upload`` callable does the copying, with the local
    cache as the source side and the remote as the destination side.

    Returns whatever ``self._transfer`` returns.
    """
    destination = self.repo.cloud.get_remote(remote)
    return self._transfer(
        destination.upload, self.repo.cache.local, destination
    )

def pull(self, remote):
    """Fetch run-cache files from a remote into the local cache.

    ``remote`` is resolved through ``self.repo.cloud.get_remote``. The
    resolved remote's ``download`` callable does the copying, with the
    remote as the source side and the local cache as the destination side.

    Returns whatever ``self._transfer`` returns.
    """
    origin = self.repo.cloud.get_remote(remote)
    return self._transfer(origin.download, origin, self.repo.cache.local)

def get_used_cache(self, used_run_cache, *args, **kwargs):
    """Gather the cache used by stages rebuilt from run-cache entries.

    ``used_run_cache`` is iterated as ``(key, value)`` pairs; each pair is
    handed to ``self._load_cache``. Pairs for which nothing is loaded are
    skipped. For every loaded entry a ``PipelineStage`` is created from the
    entry's ``cmd``, ``deps`` and ``outs``, filled in with
    ``StageLoader.fill_from_lock``, and the result of the stage's own
    ``get_used_cache(*args, **kwargs)`` is merged into the returned
    ``NamedCache``.
    """
    # Imported at call time, as in the original.
    from dvc.cache import NamedCache
    from dvc.stage import create_stage, PipelineStage

    collected = NamedCache()

    for run_key, run_value in used_run_cache:
        lock_entry = self._load_cache(run_key, run_value)
        if lock_entry:
            # Dict literal keeps the original evaluation order of the
            # keyword arguments (repo, path, cmd, deps, outs).
            stage_kwargs = {
                "repo": self.repo,
                "path": "dvc.yaml",
                "cmd": lock_entry["cmd"],
                "deps": [dep["path"] for dep in lock_entry["deps"]],
                "outs": [out["path"] for out in lock_entry["outs"]],
            }
            rebuilt = create_stage(PipelineStage, **stage_kwargs)
            StageLoader.fill_from_lock(rebuilt, lock_entry)
            collected.update(rebuilt.get_used_cache(*args, **kwargs))

    return collected

0 comments on commit c121c33

Please sign in to comment.