Skip to content
/ dvc Public
forked from iterative/dvc
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] index worktree abstractions #754

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dvc/objects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ def load(odb, hash_info):
if hash_info.isdir:
return Tree.load(odb, hash_info)
return odb.get(hash_info)


def project(odb, hash_info, fs, path_info, strategy=None):
from dvc.checkout import checkout

obj = load(odb, hash_info)
return checkout(path_info, fs, hash_info, obj, odb, relink=True)
12 changes: 12 additions & 0 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ def scheme(self):
def is_in_repo(self):
return False

@property
def is_decorated(self) -> bool:
return self.is_metric or self.is_plot

@property
def is_metric(self) -> bool:
return bool(self.metric) or bool(self.live)

@property
def is_plot(self) -> bool:
return bool(self.plot)

@property
def use_scm_ignore(self):
if not self.is_in_repo:
Expand Down
198 changes: 198 additions & 0 deletions dvc/repo/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
import os
from functools import cached_property
from typing import Iterator

from funcy import cat

from dvc.dependency import ParamsDependency
from dvc.exceptions import OutputNotFoundError
from dvc.fs import LocalFileSystem
from dvc.fs.base import BaseFileSystem
from dvc.output import BaseOutput
from dvc.path_info import PathInfo
from dvc.repo import Repo
from dvc.utils.fs import path_isin


class Index:
def __init__(self, repo: "Repo", fs: "BaseFileSystem") -> None:
# not a bug, index is very much tied to the fs, not to the repo
self.fs = fs
self.repo: "Repo" = repo

@property
def outputs(self) -> Iterator["BaseOutput"]:
for stage in self:
yield from stage.outs

@property
def decorated_outputs(self) -> Iterator["BaseOutput"]:
for output in self.outputs:
if output.is_decorated:
yield output

@property
def metrics(self) -> Iterator["BaseOutput"]:
for output in self.outputs:
if output.is_metric:
yield output

@property
def plots(self) -> Iterator["BaseOutput"]:
for output in self.outputs:
if output.is_plot:
yield output

@property
def dependencies(self) -> Iterator["BaseOutput"]:
for stage in self:
yield from stage.dependencies

@property
def params(self) -> Iterator["ParamsDependency"]:
for dep in self.dependencies:
if isinstance(dep, ParamsDependency):
yield dep

@cached_property
def stages(self):
"""
Walks down the root directory looking for Dvcfiles,
skipping the directories that are related with
any SCM (e.g. `.git`), DVC itself (`.dvc`), or directories
tracked by DVC (e.g. `dvc add data` would skip `data/`)

NOTE: For large repos, this could be an expensive
operation. Consider using some memoization.
"""
error_handler = self.repo.stage_collection_error_handler
return self.repo.stage.collect_repo(onerror=error_handler)

@cached_property
def outs_trie(self):
from .trie import build_outs_trie

return build_outs_trie(self.stages)

@cached_property
def graph(self):
from .graph import build_graph

return build_graph(self.stages, self.outs_trie)

@cached_property
def outs_graph(self):
from .graph import build_outs_graph

return build_outs_graph(self.graph, self.outs_trie)

@cached_property
def pipelines(self):
from .graph import get_pipelines

return get_pipelines(self.graph)

def used_cache(
self,
targets=None,
all_branches=False,
with_deps=False,
all_tags=False,
all_commits=False,
all_experiments=False,
remote=None,
force=False,
jobs=None,
recursive=False,
used_run_cache=None,
revs=None,
):
"""Get the stages related to the given target and collect
the `info` of its outputs.

This is useful to know what files from the cache are _in use_
(namely, a file described as an output on a stage).

The scope is, by default, the working directory, but you can use
`all_branches`/`all_tags`/`all_commits`/`all_experiments` to expand
the scope.

Returns:
A dictionary with Schemes (representing output's location) mapped
to items containing the output's `dumpd` names and the output's
children (if the given output is a directory).
"""
from dvc.objects.db import NamedCache

cache = NamedCache()

targets = targets or [None]

pairs = cat(
self.repo.stage.collect_granular(
target, recursive=recursive, with_deps=with_deps
)
for target in targets
)

rev = (
self.scm.get_rev()
if isinstance(self.fs, LocalFileSystem)
else self.fs.rev
)
suffix = f"({rev})" if rev else ""
for stage, filter_info in pairs:
used_cache = stage.get_used_cache(
remote=remote, force=force, jobs=jobs, filter_info=filter_info,
)
cache.update(used_cache, suffix=suffix)

return cache

def find_outs_by_path(self, path, outs=None, recursive=False, strict=True):
# using `outs_graph` to ensure graph checks are run
outs = outs or self.outs_graph

abs_path = os.path.abspath(path)
path_info = PathInfo(abs_path)
match = path_info.__eq__ if strict else path_info.isin_or_eq

def func(out):
if out.scheme == "local" and match(out.path_info):
return True

if recursive and out.path_info.isin(path_info):
return True

return False

matched = list(filter(func, outs))
if not matched:
raise OutputNotFoundError(path, self)

return matched

def _reset(self):
# we don't need to reset these for the indexes that are not
# currently checked out.
self.__dict__.pop("outs_trie", None)
self.__dict__.pop("outs_graph", None)
self.__dict__.pop("graph", None)
self.__dict__.pop("stages", None)
self.__dict__.pop("pipelines", None)

def filter_stages(self, path):
for stage in self:
if path_isin(stage.path_in_repo, path):
yield stage

def slice(self, target_path):
new = Index(self.repo, self.fs)
new.stages = self.filter_stages(target_path)
return new

def add_artifact(self, path_info, obj, **kwargs):
pass

def __iter__(self):
yield from self.stages
80 changes: 80 additions & 0 deletions dvc/repo/worktree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from dvc.fs.base import BaseFileSystem
from dvc.objects.db.base import ObjectDB

from .index import Index


class WorkTree:
"""
Worktree - it's not fs, but it connects the worktree with the cache/odb.

So, it supports moving something from:

(using index ->)
─────────────────────────────>
cache index worktree
<─────────────────────────────
(updating index <-)
"""

def __init__(self, odb: "ObjectDB", fs: "BaseFileSystem", index: "Index"):
self.odb = odb
self.fs = fs
self.index = index

def commit(self, target: str):
from dvc.objects import project, save
from dvc.objects.stage import stage

path_info = self.fs.path_info / target
if not self.fs.exists(path_info):
raise Exception("does not exist")

obj = stage(self.odb, path_info, self.fs, self.odb.fs.PARAM_CHECKSUM)
save(self.odb, obj)
project(self.odb, obj, self.fs, path_info, strategy="copy")

stage = self.index.add_artifact(path_info, obj)
return stage

add = commit

def checkout(self, target):
index = self.index.slice(target)
return self.checkout_from_index(index)

def checkout_from_index(self, index: Index = None):
from dvc.objects import project

index = index or self.index
hash_infos = {o.path_info: o.hash_info for o in index.outputs}
for path_info, hash_info in hash_infos.items():
project(self.odb, self.odb.get(hash_info), self.fs, path_info)

def remove(self, target: str, remove_outs: bool = False):
index = self.index.slice(target)
return self.remove_from_index(index, remove_outs=remove_outs)

def remove_from_index(
self,
index: Index = None,
remove_outs: bool = False,
purge: bool = False,
):
index = index or self.index
for stage in index:
stage.remove(
remove_outs=remove_outs, force=remove_outs, purge=purge
)

return list(index)

def destroy(self):
self.remove_from_index(remove_outs=False, purge=True)
self.odb.destroy()

def move(self):
pass

def status(self):
pass