Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue status: will show the current worker status #7903

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions dvc/commands/queue/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ class CmdQueueStart(CmdBase):
"""Start exp queue workers."""

def run(self):
for _ in range(self.args.jobs):
self.repo.experiments.celery_queue.spawn_worker()

suffix = "s" if self.args.jobs > 1 else ""
ui.write(
f"Start {self.args.jobs} worker{suffix} to process the queue."
started = self.repo.experiments.celery_queue.start_workers(
self.args.jobs
)

suffix = "s" if started > 1 else ""
ui.write(f"Start {started} new worker{suffix} to process the queue.")

return 0


Expand Down
24 changes: 24 additions & 0 deletions dvc/commands/queue/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link
from dvc.compare import TabularData
from dvc.ui import ui

from ..experiments.show import format_time

Expand All @@ -27,6 +28,29 @@ def run(self):
)
td.render()

if not result:
ui.write("No experiments in task queue for now.")

worker_status = self.repo.experiments.celery_queue.worker_status()
active_count = len(
[name for name, task in worker_status.items() if task]
)
idle_count = len(worker_status) - active_count

if active_count == 1:
ui.write("There is 1 worker active", end=", ")
elif active_count == 0:
ui.write("No worker active", end=", ")
else:
ui.write(f"There are {active_count} workers active", end=", ")

if idle_count == 1:
ui.write("1 worker idle at present.")
elif idle_count == 0:
ui.write("no worker idle at present.")
else:
ui.write(f"{idle_count} workers idle at present.")

return 0


Expand Down
49 changes: 43 additions & 6 deletions dvc/repo/experiments/queue/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,54 @@ def worker(self) -> "TemporaryWorker":
timeout=10,
)

def spawn_worker(self):
from shortuuid import uuid
def spawn_worker(self, num: int = 1):
"""spawn one single worker to process to queued tasks.

Argument:
num: serial number of the worker.

"""
from dvc_task.proc.process import ManagedProcess

logger.debug("Spawning exp queue worker")
wdir_hash = hashlib.sha256(self.wdir.encode("utf-8")).hexdigest()[:6]
node_name = f"dvc-exp-{wdir_hash}-1@localhost"
node_name = f"dvc-exp-{wdir_hash}-{num}@localhost"
cmd = ["exp", "queue-worker", node_name]
name = "dvc-exp-worker"
if logger.getEffectiveLevel() < logging.INFO:
name = name + str(uuid())
name = f"dvc-exp-worker-{num}"

logger.debug(f"start a new worker: {name}, node: {node_name}")
if os.name == "nt":
daemonize(cmd)
else:
ManagedProcess.spawn(["dvc"] + cmd, wdir=self.wdir, name=name)

def start_workers(self, count: int) -> int:
    """Start exp queue workers for slots ``1..count`` that are not yet active.

    Each slot number maps to a node name of the form
    ``dvc-exp-<wdir hash>-<num>@localhost`` (the same pattern
    ``spawn_worker`` uses). A slot whose node name already appears in
    ``self.worker_status()`` is skipped, so calling this repeatedly does
    not spawn duplicate workers for the same slot.

    Args:
        count: highest worker slot number to fill; slots are numbered
            from 1. A value of 0 or less spawns nothing.

    Returns:
        The number of workers newly spawned by this call.
    """
    logger.debug("Spawning %s exp queue workers", count)
    active_worker: Dict = self.worker_status()

    # The hash depends only on the working directory, so compute it once
    # instead of re-hashing on every loop iteration.
    wdir_hash = hashlib.sha256(self.wdir.encode("utf-8")).hexdigest()[:6]

    started = 0
    for num in range(1, 1 + count):
        node_name = f"dvc-exp-{wdir_hash}-{num}@localhost"
        if node_name in active_worker:
            logger.debug("Exp queue worker %s already exist", node_name)
            continue
        self.spawn_worker(num)
        started += 1

    return started

def put(self, *args, **kwargs) -> QueueEntry:
"""Stash an experiment and add it to the queue."""
entry = self._stash_exp(*args, **kwargs)
Expand Down Expand Up @@ -316,6 +347,12 @@ def logs(
) as fobj:
ui.write(fobj.read())

def worker_status(self) -> Dict:
    """Query celery for the tasks its workers are currently executing.

    Returns:
        Whatever ``self.celery.control.inspect().active()`` reports, or
        an empty dict when that call yields a falsy result (e.g. ``None``).
        NOTE(review): callers treat the keys as worker node names and the
        values as each worker's active tasks -- confirm against celery.
    """
    inspector = self.celery.control.inspect()
    status = inspector.active()
    if not status:
        # Normalise "nothing reported" to an empty mapping for callers.
        status = {}
    logger.debug(f"Worker status: {status}")
    return status


class WorkspaceQueue(BaseStashQueue):
def put(self, *args, **kwargs) -> QueueEntry:
Expand Down