From c634ff5a6d9d589684f0ce68d003c1280395d6f0 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 26 Jan 2021 12:54:50 +0300 Subject: [PATCH 1/3] upload: implement --to-remote flag --- dvc/command/update.py | 26 +++++++++++++++++++++ dvc/repo/update.py | 12 ++++++++-- dvc/stage/__init__.py | 6 +++-- dvc/stage/imports.py | 12 +++++++++- tests/func/test_update.py | 48 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 99 insertions(+), 5 deletions(-) diff --git a/dvc/command/update.py b/dvc/command/update.py index 2bd50b9a09..8d0536509b 100644 --- a/dvc/command/update.py +++ b/dvc/command/update.py @@ -16,6 +16,9 @@ def run(self): targets=self.args.targets, rev=self.args.rev, recursive=self.args.recursive, + to_remote=self.args.to_remote, + remote=self.args.remote, + jobs=self.args.jobs, ) except DvcException: logger.exception("failed update data") @@ -48,4 +51,27 @@ def add_parser(subparsers, parent_parser): default=False, help="Update all stages in the specified directory.", ) + update_parser.add_argument( + "--to-remote", + action="store_true", + default=False, + help="Update data directly on the remote", + ) + update_parser.add_argument( + "-r", + "--remote", + help="Remote storage to perform updates to", + metavar="<name>", + ) + update_parser.add_argument( + "-j", + "--jobs", + type=int, + help=( + "Number of jobs to run simultaneously. " + "The default value is 4 * cpu_count(). " + "For SSH remotes, the default is 4. 
" + ), + metavar="<number>", + ) update_parser.set_defaults(func=CmdUpdate) diff --git a/dvc/repo/update.py b/dvc/repo/update.py index e0a08333eb..c4d2e2adc9 100644 --- a/dvc/repo/update.py +++ b/dvc/repo/update.py @@ -2,7 +2,15 @@ @locked -def update(self, targets=None, rev=None, recursive=False): +def update( + self, + targets=None, + rev=None, + recursive=False, + to_remote=False, + remote=None, + jobs=None, +): from ..dvcfile import Dvcfile if not targets: @@ -16,7 +24,7 @@ def update(self, targets=None, rev=None, recursive=False): stages.update(self.stage.collect(target, recursive=recursive)) for stage in stages: - stage.update(rev) + stage.update(rev, to_remote=to_remote, remote=remote, jobs=jobs) dvcfile = Dvcfile(self, stage.path) dvcfile.dump(stage) stages.add(stage) diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 5480e4eee2..640062401c 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -411,10 +411,12 @@ def reproduce(self, interactive=False, **kwargs): return self - def update(self, rev=None): + def update(self, rev=None, to_remote=False, remote=None, jobs=None): if not (self.is_repo_import or self.is_import): raise StageUpdateError(self.relpath) - update_import(self, rev=rev) + update_import( + self, rev=rev, to_remote=to_remote, remote=remote, jobs=jobs + ) @property def can_be_skipped(self): diff --git a/dvc/stage/imports.py b/dvc/stage/imports.py index 01792002ac..88aa020ee3 100644 --- a/dvc/stage/imports.py +++ b/dvc/stage/imports.py @@ -3,10 +3,20 @@ logger = logging.getLogger(__name__) -def update_import(stage, rev=None): +def _update_import_on_remote(stage, remote, jobs): + url = stage.deps[0].path_info.url + stage.outs[0].hash_info = stage.repo.cloud.transfer( + url, jobs=jobs, remote=remote, command="update" + ) + + +def update_import(stage, rev=None, to_remote=False, remote=None, jobs=None): stage.deps[0].update(rev=rev) frozen = stage.frozen stage.frozen = False + if to_remote: + return _update_import_on_remote(stage, 
remote, jobs) + try: stage.reproduce() finally: diff --git a/tests/func/test_update.py b/tests/func/test_update.py index 6f406a7cd0..758f41b1b7 100644 --- a/tests/func/test_update.py +++ b/tests/func/test_update.py @@ -315,3 +315,51 @@ def test_update_from_subrepos(tmp_dir, dvc, erepo_dir, is_dvc): "url": repo_path, "rev_lock": erepo_dir.scm.get_rev(), } + + +@pytest.mark.parametrize( + "workspace", + [ + pytest.lazy_fixture("local_cloud"), + pytest.lazy_fixture("s3"), + pytest.lazy_fixture("gs"), + pytest.lazy_fixture("hdfs"), + ], + indirect=True, +) +def test_update_import_url_to_remote(tmp_dir, dvc, workspace, local_remote): + workspace.gen("foo", "foo") + stage = dvc.imp_url("remote://workspace/foo", to_remote=True) + + workspace.gen("foo", "bar") + stage = dvc.update(stage.path, to_remote=True) + + dvc.pull("foo") + assert (tmp_dir / "foo").read_text() == "bar" + + +@pytest.mark.parametrize( + "workspace", + [ + pytest.lazy_fixture("local_cloud"), + pytest.lazy_fixture("s3"), + pytest.lazy_fixture("gs"), + pytest.lazy_fixture("hdfs"), + ], + indirect=True, +) +def test_update_import_url_to_remote_directory( + tmp_dir, dvc, workspace, local_remote +): + workspace.gen({"data": {"foo": "foo", "bar": {"baz": "baz"}}}) + stage = dvc.imp_url("remote://workspace/data", to_remote=True) + + workspace.gen({"data": {"foo2": "foo2", "bar": {"baz2": "baz2"}}}) + stage = dvc.update(stage.path, to_remote=True) + + dvc.pull("data") + assert (tmp_dir / "data").read_text() == { + "foo": "foo", + "foo2": "foo2", + "bar": {"baz": "baz", "baz2": "baz2"}, + } From a6e33452505191e4fa6ae5a731e4846e6bb50c30 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 27 Jan 2021 10:02:03 +0300 Subject: [PATCH 2/3] unit tests --- tests/unit/command/test_update.py | 37 ++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/tests/unit/command/test_update.py b/tests/unit/command/test_update.py index 087ab44832..8092810eae 100644 --- 
a/tests/unit/command/test_update.py +++ b/tests/unit/command/test_update.py @@ -13,5 +13,40 @@ def test_update(dvc, mocker): assert cmd.run() == 0 m.assert_called_once_with( - targets=["target1", "target2"], rev="REV", recursive=True, + targets=["target1", "target2"], + rev="REV", + recursive=True, + to_remote=False, + remote=None, + jobs=None, + ) + + +def test_update_to_remote(dvc, mocker): + cli_args = parse_args( + [ + "update", + "target1", + "target2", + "--to-remote", + "-j", + "5", + "-r", + "remote", + "--recursive", + ] + ) + assert cli_args.func == CmdUpdate + cmd = cli_args.func(cli_args) + m = mocker.patch("dvc.repo.Repo.update") + + assert cmd.run() == 0 + + m.assert_called_once_with( + targets=["target1", "target2"], + rev=None, + recursive=True, + to_remote=True, + remote="remote", + jobs=5, ) From bae5b3e643c11565bbe0f7ff3623e96c5b251772 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Tue, 2 Feb 2021 16:51:03 +0300 Subject: [PATCH 3/3] preserve frozen= for stage --- dvc/stage/imports.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dvc/stage/imports.py b/dvc/stage/imports.py index 88aa020ee3..c3f91628e0 100644 --- a/dvc/stage/imports.py +++ b/dvc/stage/imports.py @@ -14,11 +14,11 @@ def update_import(stage, rev=None, to_remote=False, remote=None, jobs=None): stage.deps[0].update(rev=rev) frozen = stage.frozen stage.frozen = False - if to_remote: - return _update_import_on_remote(stage, remote, jobs) - try: - stage.reproduce() + if to_remote: + _update_import_on_remote(stage, remote, jobs) + else: + stage.reproduce() finally: stage.frozen = frozen