From c98b9a1dbb44e47fc3028cd198cffcdb06cd3429 Mon Sep 17 00:00:00 2001
From: karajan1001
Date: Tue, 17 May 2022 16:45:39 +0800
Subject: [PATCH] exp queue: monitor queue status (#7590)

fix: #7590
1. Add two new commands `queue status` and `queue attach`
2. Add unit test for the CLI
3. Make `Running` exps discoverable via `get_queue_entry_by_names`
---
 dvc/commands/experiments/show.py    |  4 +--
 dvc/commands/queue/__init__.py      |  4 ++-
 dvc/commands/queue/attach.py        | 44 ++++++++++++++++++++++++++
 dvc/commands/queue/status.py        | 42 +++++++++++++++++++++++++
 dvc/repo/experiments/queue/base.py  | 49 ++++++++++++++++++++++++++++-
 dvc/repo/experiments/queue/local.py | 26 +++++++++++++++
 tests/unit/command/test_queue.py    | 42 +++++++++++++++++++++++++
 7 files changed, 207 insertions(+), 4 deletions(-)
 create mode 100644 dvc/commands/queue/attach.py
 create mode 100644 dvc/commands/queue/status.py

diff --git a/dvc/commands/experiments/show.py b/dvc/commands/experiments/show.py
index 59fe267268..81feafd1a2 100644
--- a/dvc/commands/experiments/show.py
+++ b/dvc/commands/experiments/show.py
@@ -138,7 +138,7 @@ def _collect_rows(
         row_dict["Experiment"] = exp.get("name", "")
         row_dict["rev"] = name_rev
         row_dict["typ"] = typ
-        row_dict["Created"] = _format_time(
+        row_dict["Created"] = format_time(
             exp.get("timestamp"), fill_value, iso
         )
         row_dict["parent"] = parent
@@ -218,7 +218,7 @@ def _sort(item):
     return ret
 
 
-def _format_time(datetime_obj, fill_value=FILL_VALUE, iso=False):
+def format_time(datetime_obj, fill_value=FILL_VALUE, iso=False):
     if datetime_obj is None:
         return fill_value
 
diff --git a/dvc/commands/queue/__init__.py b/dvc/commands/queue/__init__.py
index 5c3998c897..bc1c0d16fa 100644
--- a/dvc/commands/queue/__init__.py
+++ b/dvc/commands/queue/__init__.py
@@ -1,13 +1,15 @@
 import argparse
 
 from dvc.cli.utils import append_doc_link, fix_subparsers
-from dvc.commands.queue import kill, remove, start, stop
+from dvc.commands.queue import attach, kill, remove, start, status, stop
 
 SUB_COMMANDS = [
     remove,
     kill,
     start,
     stop,
+    attach,
+    status,
 ]
 
 
diff --git a/dvc/commands/queue/attach.py b/dvc/commands/queue/attach.py
new file mode 100644
index 0000000000..efb6c16d6d
--- /dev/null
+++ b/dvc/commands/queue/attach.py
@@ -0,0 +1,44 @@
+import argparse
+import logging
+
+from dvc.cli.command import CmdBase
+from dvc.cli.utils import append_doc_link
+
+logger = logging.getLogger(__name__)
+
+
+class CmdQueueAttach(CmdBase):
+    """Attach to the outputs of an exp task in the queue."""
+
+    def run(self):
+        self.repo.experiments.celery_queue.attach(
+            rev=self.args.experiment,
+            encoding=self.args.encoding,
+        )
+
+        return 0
+
+
+def add_parser(queue_subparsers, parent_parser):
+    QUEUE_ATTACH_HELP = "Attach outputs of an experiment task in queue."
+    queue_attach_parser = queue_subparsers.add_parser(
+        "attach",
+        parents=[parent_parser],
+        description=append_doc_link(QUEUE_ATTACH_HELP, "queue/attach"),
+        help=QUEUE_ATTACH_HELP,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    queue_attach_parser.add_argument(
+        "-e",
+        "--encoding",
+        help=(
+            "Text encoding for redirected output. Defaults to "
+            "`locale.getpreferredencoding()`."
+        ),
+    )
+    queue_attach_parser.add_argument(
+        "experiment",
+        help="Experiment in queue to attach.",
+        metavar="<experiment>",
+    )
+    queue_attach_parser.set_defaults(func=CmdQueueAttach)
diff --git a/dvc/commands/queue/status.py b/dvc/commands/queue/status.py
new file mode 100644
index 0000000000..8e38527520
--- /dev/null
+++ b/dvc/commands/queue/status.py
@@ -0,0 +1,42 @@
+import argparse
+import logging
+from typing import List, Mapping, Optional
+
+from dvc.cli.command import CmdBase
+from dvc.cli.utils import append_doc_link
+from dvc.compare import TabularData
+
+from ..experiments.show import format_time
+
+logger = logging.getLogger(__name__)
+
+
+class CmdQueueStatus(CmdBase):
+    """Show the status of exp tasks in queue."""
+
+    def run(self):
+        result: List[
+            Mapping[str, Optional[str]]
+        ] = self.repo.experiments.celery_queue.status()
+        all_headers = ["Rev", "Name", "Created", "Status"]
+        td = TabularData(all_headers)
+        for exp in result:
+            created = format_time(exp.get("timestamp"))
+            td.append(
+                [exp["rev"], exp.get("name") or "", created, exp["status"]]
+            )
+        td.render()
+
+        return 0
+
+
+def add_parser(queue_subparsers, parent_parser):
+    QUEUE_STATUS_HELP = "List the status of the queue tasks and workers"
+    queue_status_parser = queue_subparsers.add_parser(
+        "status",
+        parents=[parent_parser],
+        description=append_doc_link(QUEUE_STATUS_HELP, "queue/status"),
+        help=QUEUE_STATUS_HELP,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    queue_status_parser.set_defaults(func=CmdQueueStatus)
diff --git a/dvc/repo/experiments/queue/base.py b/dvc/repo/experiments/queue/base.py
index e772c9571a..b30b8b062c 100644
--- a/dvc/repo/experiments/queue/base.py
+++ b/dvc/repo/experiments/queue/base.py
@@ -147,6 +147,38 @@ def clear(self) -> List[str]:
         self._remove_revs(stash_revs)
         return removed
 
+    def status(self) -> List[Mapping[str, Optional[str]]]:
+        """Show the status of exp tasks in queue"""
+        from datetime import datetime
+
+        result: List[Mapping[str, Optional[str]]] = []
+
+        def _get_timestamp(rev):
+            commit = self.scm.resolve_commit(rev)
+            return datetime.fromtimestamp(commit.commit_time)
+
+        for queue_entry in self.iter_active():
+            result.append(
+                {
+                    "rev": queue_entry.stash_rev,
+                    "name": queue_entry.name,
+                    "timestamp": _get_timestamp(queue_entry.stash_rev),
+                    "status": "Running",
+                }
+            )
+
+        for queue_entry in self.iter_queued():
+            result.append(
+                {
+                    "rev": queue_entry.stash_rev,
+                    "name": queue_entry.name,
+                    "timestamp": _get_timestamp(queue_entry.stash_rev),
+                    "status": "Queued",
+                }
+            )
+
+        return result
+
     @abstractmethod
     def _remove_revs(self, stash_revs: Mapping[str, ExpStashEntry]):
         """Remove the specified entries from the queue by stash revision."""
@@ -195,6 +227,19 @@ def shutdown(self, kill: bool = False):
                 finish any active experiments before shutting down.
         """
 
+    def attach(
+        self,
+        rev: str,
+        encoding: Optional[str] = None,
+    ):
+        """Iterate over lines in redirected output for a process.
+
+        Args:
+            rev: Stash rev or running exp name to be attached.
+            encoding: Text encoding for redirected output. Defaults to
+                `locale.getpreferredencoding()`.
+        """
+
     def _stash_exp(
         self,
         *args,
@@ -515,13 +560,14 @@ def get_queue_entry_by_names(
         self,
         exp_names: Collection[str],
     ) -> Dict[str, Optional[QueueEntry]]:
+        from funcy import concat
         from scmrepo.exceptions import RevError as InternalRevError
 
         exp_name_set = set(exp_names)
         result: Dict[str, Optional[QueueEntry]] = {}
         rev_entries = {}
 
-        for entry in self.iter_queued():
+        for entry in concat(self.iter_queued(), self.iter_active()):
             if entry.name in exp_name_set:
                 result[entry.name] = entry
             else:
@@ -534,4 +580,5 @@ def get_queue_entry_by_names(
                     result[exp_name] = rev_entries[rev]
             except InternalRevError:
                 result[exp_name] = None
+
         return result
diff --git a/dvc/repo/experiments/queue/local.py b/dvc/repo/experiments/queue/local.py
index 3367b5412d..b23614a9fc 100644
--- a/dvc/repo/experiments/queue/local.py
+++ b/dvc/repo/experiments/queue/local.py
@@ -260,6 +260,25 @@ def shutdown(self, kill: bool = False):
         else:
             self.celery.control.shutdown()
 
+    def attach(
+        self,
+        rev: str,
+        encoding: Optional[str] = None,
+    ):
+        from dvc.ui import ui
+
+        queue_entry: Optional[QueueEntry] = self.get_queue_entry_by_names(
+            {rev}
+        ).get(rev)
+        if queue_entry is None:
+            raise UnresolvedExpNamesError([rev])
+        active_queue_entry = set(self.iter_active())
+        if queue_entry not in active_queue_entry:
+            ui.write("Cannot attach to an unstarted task")
+            return
+        for line in self.proc.follow(queue_entry.stash_rev, encoding):
+            ui.write(line)
+
 
 class WorkspaceQueue(BaseStashQueue):
     def put(self, *args, **kwargs) -> QueueEntry:
@@ -384,3 +403,10 @@ def kill(self, revs: Collection[str]) -> None:
 
     def shutdown(self, kill: bool = False):
         raise NotImplementedError
+
+    def attach(
+        self,
+        rev: str,
+        encoding: Optional[str] = None,
+    ):
+        raise NotImplementedError
diff --git a/tests/unit/command/test_queue.py b/tests/unit/command/test_queue.py
index 2180015c6f..bc36d75e1d 100644
--- a/tests/unit/command/test_queue.py
+++ b/tests/unit/command/test_queue.py
@@ -1,7 +1,9 @@
 from dvc.cli import parse_args
+from dvc.commands.queue.attach import CmdQueueAttach
 from dvc.commands.queue.kill import CmdQueueKill
 from dvc.commands.queue.remove import CmdQueueRemove
 from dvc.commands.queue.start import CmdQueueStart
+from dvc.commands.queue.status import CmdQueueStatus
 from dvc.commands.queue.stop import CmdQueueStop
 
 
@@ -95,3 +97,43 @@ def test_experiments_stop(dvc, scm, mocker):
 
     assert cmd.run() == 0
     m.assert_called_once_with(kill=True)
+
+
+def test_experiments_status(dvc, scm, mocker):
+    cli_args = parse_args(
+        [
+            "queue",
+            "status",
+        ]
+    )
+    assert cli_args.func == CmdQueueStatus
+
+    cmd = cli_args.func(cli_args)
+    m = mocker.patch(
+        "dvc.repo.experiments.queue.local.LocalCeleryQueue.status",
+    )
+
+    assert cmd.run() == 0
+    m.assert_called_once_with()
+
+
+def test_experiments_attach(dvc, scm, mocker):
+    cli_args = parse_args(
+        [
+            "queue",
+            "attach",
+            "exp1",
+            "-e",
+            "utf8",
+        ]
+    )
+    assert cli_args.func == CmdQueueAttach
+
+    cmd = cli_args.func(cli_args)
+    m = mocker.patch(
+        "dvc.repo.experiments.queue.local.LocalCeleryQueue.attach",
+        return_value={},
+    )
+
+    assert cmd.run() == 0
+    m.assert_called_once_with(rev="exp1", encoding="utf8")