Skip to content

Commit

Permalink
exp queue: monitor queue status (#7590)
Browse files Browse the repository at this point in the history
fix: #7590
1. Add two new command `queue status` and `queue attach`
2. Add unit test for the CLI
3. Make `Running` exps can be found by `get_queue_entry_by_names`
  • Loading branch information
karajan1001 authored and pmrowla committed Jul 5, 2022
1 parent 594b117 commit c98b9a1
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 4 deletions.
4 changes: 2 additions & 2 deletions dvc/commands/experiments/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def _collect_rows(
row_dict["Experiment"] = exp.get("name", "")
row_dict["rev"] = name_rev
row_dict["typ"] = typ
row_dict["Created"] = _format_time(
row_dict["Created"] = format_time(
exp.get("timestamp"), fill_value, iso
)
row_dict["parent"] = parent
Expand Down Expand Up @@ -218,7 +218,7 @@ def _sort(item):
return ret


def _format_time(datetime_obj, fill_value=FILL_VALUE, iso=False):
def format_time(datetime_obj, fill_value=FILL_VALUE, iso=False):
if datetime_obj is None:
return fill_value

Expand Down
4 changes: 3 additions & 1 deletion dvc/commands/queue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import argparse

from dvc.cli.utils import append_doc_link, fix_subparsers
from dvc.commands.queue import kill, remove, start, stop
from dvc.commands.queue import attach, kill, remove, start, status, stop

SUB_COMMANDS = [
remove,
kill,
start,
stop,
attach,
status,
]


Expand Down
44 changes: 44 additions & 0 deletions dvc/commands/queue/attach.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import argparse
import logging

from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link

logger = logging.getLogger(__name__)


class CmdQueueAttach(CmdBase):
"""Attach outputs of a exp task in queue."""

def run(self):
self.repo.experiments.celery_queue.attach(
rev=self.args.experiment,
encoding=self.args.encoding,
)

return 0


def add_parser(queue_subparsers, parent_parser):
QUEUE_ATTACH_HELP = "Attach outputs of a experiment task in queue."
queue_attach_parser = queue_subparsers.add_parser(
"attach",
parents=[parent_parser],
description=append_doc_link(QUEUE_ATTACH_HELP, "queue/attach"),
help=QUEUE_ATTACH_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
queue_attach_parser.add_argument(
"-e",
"--encoding",
help=(
"Text encoding for redirected output. Defaults to"
"`locale.getpreferredencoding()`."
),
)
queue_attach_parser.add_argument(
"experiment",
help="Experiments in queue to attach.",
metavar="<experiment>",
)
queue_attach_parser.set_defaults(func=CmdQueueAttach)
42 changes: 42 additions & 0 deletions dvc/commands/queue/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import argparse
import logging
from typing import List, Mapping, Optional

from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link
from dvc.compare import TabularData

from ..experiments.show import format_time

logger = logging.getLogger(__name__)


class CmdQueueStatus(CmdBase):
"""Kill exp task in queue."""

def run(self):
result: List[
Mapping[str, Optional[str]]
] = self.repo.experiments.celery_queue.status()
all_headers = ["Rev", "Name", "Created", "Status"]
td = TabularData(all_headers)
for exp in result:
created = format_time(exp.get("timestamp"))
td.append(
[exp["rev"], exp.get("name", ""), created, exp["status"]]
)
td.render()

return 0


def add_parser(queue_subparsers, parent_parser):
QUEUE_STATUS_HELP = "List the status of the queue tasks and workers"
queue_status_parser = queue_subparsers.add_parser(
"status",
parents=[parent_parser],
description=append_doc_link(QUEUE_STATUS_HELP, "queue/status"),
help=QUEUE_STATUS_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
queue_status_parser.set_defaults(func=CmdQueueStatus)
49 changes: 48 additions & 1 deletion dvc/repo/experiments/queue/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,38 @@ def clear(self) -> List[str]:
self._remove_revs(stash_revs)
return removed

def status(self) -> List[Mapping[str, Optional[str]]]:
"""Show the status of exp tasks in queue"""
from datetime import datetime

result: List[Mapping[str, Optional[str]]] = []

def _get_timestamp(rev):
commit = self.scm.resolve_commit(rev)
return datetime.fromtimestamp(commit.commit_time)

for queue_entry in self.iter_active():
result.append(
{
"rev": queue_entry.stash_rev,
"name": queue_entry.name,
"timestamp": _get_timestamp(queue_entry.stash_rev),
"status": "Running",
}
)

for queue_entry in self.iter_queued():
result.append(
{
"rev": queue_entry.stash_rev,
"name": queue_entry.name,
"timestamp": _get_timestamp(queue_entry.stash_rev),
"status": "Queued",
}
)

return result

@abstractmethod
def _remove_revs(self, stash_revs: Mapping[str, ExpStashEntry]):
"""Remove the specified entries from the queue by stash revision."""
Expand Down Expand Up @@ -195,6 +227,19 @@ def shutdown(self, kill: bool = False):
finish any active experiments before shutting down.
"""

def attach(
self,
rev: str,
encoding: Optional[str] = None,
):
"""Iterate over lines in redirected output for a process.
Args:
rev: Stash rev or running exp name to be attached.
encoding: Text encoding for redirected output. Defaults to
`locale.getpreferredencoding()`.
"""

def _stash_exp(
self,
*args,
Expand Down Expand Up @@ -515,13 +560,14 @@ def get_queue_entry_by_names(
self,
exp_names: Collection[str],
) -> Dict[str, Optional[QueueEntry]]:
from funcy import concat
from scmrepo.exceptions import RevError as InternalRevError

exp_name_set = set(exp_names)
result: Dict[str, Optional[QueueEntry]] = {}
rev_entries = {}

for entry in self.iter_queued():
for entry in concat(self.iter_queued(), self.iter_active()):
if entry.name in exp_name_set:
result[entry.name] = entry
else:
Expand All @@ -534,4 +580,5 @@ def get_queue_entry_by_names(
result[exp_name] = rev_entries[rev]
except InternalRevError:
result[exp_name] = None

return result
26 changes: 26 additions & 0 deletions dvc/repo/experiments/queue/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,25 @@ def shutdown(self, kill: bool = False):
else:
self.celery.control.shutdown()

def attach(
self,
rev: str,
encoding: Optional[str] = None,
):
from dvc.ui import ui

queue_entry: Optional[QueueEntry] = self.get_queue_entry_by_names(
{rev}
).get(rev)
if queue_entry is None:
raise UnresolvedExpNamesError([rev])
active_queue_entry = set(self.iter_active())
if queue_entry not in active_queue_entry:
ui.write("Cannot attach to an unstarted task")
return
for line in self.proc.follow(queue_entry.stash_rev, encoding):
ui.write(line)


class WorkspaceQueue(BaseStashQueue):
def put(self, *args, **kwargs) -> QueueEntry:
Expand Down Expand Up @@ -384,3 +403,10 @@ def kill(self, revs: Collection[str]) -> None:

def shutdown(self, kill: bool = False):
raise NotImplementedError

def attach(
self,
rev: str,
encoding: Optional[str] = None,
):
raise NotImplementedError
42 changes: 42 additions & 0 deletions tests/unit/command/test_queue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from dvc.cli import parse_args
from dvc.commands.queue.attach import CmdQueueAttach
from dvc.commands.queue.kill import CmdQueueKill
from dvc.commands.queue.remove import CmdQueueRemove
from dvc.commands.queue.start import CmdQueueStart
from dvc.commands.queue.status import CmdQueueStatus
from dvc.commands.queue.stop import CmdQueueStop


Expand Down Expand Up @@ -95,3 +97,43 @@ def test_experiments_stop(dvc, scm, mocker):

assert cmd.run() == 0
m.assert_called_once_with(kill=True)


def test_experiments_status(dvc, scm, mocker):
cli_args = parse_args(
[
"queue",
"status",
]
)
assert cli_args.func == CmdQueueStatus

cmd = cli_args.func(cli_args)
m = mocker.patch(
"dvc.repo.experiments.queue.local.LocalCeleryQueue.status",
)

assert cmd.run() == 0
m.assert_called_once_with()


def test_experiments_attach(dvc, scm, mocker):
cli_args = parse_args(
[
"queue",
"attach",
"exp1",
"-e",
"utf8",
]
)
assert cli_args.func == CmdQueueAttach

cmd = cli_args.func(cli_args)
m = mocker.patch(
"dvc.repo.experiments.queue.local.LocalCeleryQueue.attach",
return_value={},
)

assert cmd.run() == 0
m.assert_called_once_with(rev="exp1", encoding="utf8")

0 comments on commit c98b9a1

Please sign in to comment.