Skip to content

Commit

Permalink
queue status: will show the current worker status
Browse files Browse the repository at this point in the history
fix: #7950
Currently queue worker status are not show in `queue status` makes it
hard to tell if the worker is still living.

1. add a new method `active_worker` to return the currently running
   worker node name.
2. give every worker a unique name.
3. `queue status` will show how much worker running at present.
  • Loading branch information
karajan1001 authored and pmrowla committed Jul 12, 2022
1 parent 25b5d1d commit 3918db9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
8 changes: 8 additions & 0 deletions dvc/commands/queue/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link
from dvc.compare import TabularData
from dvc.ui import ui

from ..experiments.show import format_time

Expand All @@ -26,6 +27,13 @@ def run(self):
[exp["rev"][:7], exp.get("name", ""), created, exp["status"]]
)
td.render()
worker_count = len(self.repo.experiments.celery_queue.active_worker())
if worker_count == 1:
ui.write("There is 1 worker active at present.")
elif worker_count == 0:
ui.write("No worker active at present.")
else:
ui.write(f"There are {worker_count} worker active at present.")

return 0

Expand Down
28 changes: 22 additions & 6 deletions dvc/repo/experiments/queue/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import locale
import logging
import os
import time
from collections import defaultdict
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -124,22 +125,31 @@ def worker(self) -> "TemporaryWorker":
)

def spawn_worker(self):
from shortuuid import uuid

from dvc_task.proc.process import ManagedProcess

logger.debug("Spawning exp queue worker")
wdir_hash = hashlib.sha256(self.wdir.encode("utf-8")).hexdigest()[:6]
node_name = f"dvc-exp-{wdir_hash}-1@localhost"
number = 1
node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"
worker_status = self.active_worker()
while node_name in worker_status:
number += 1
node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"

cmd = ["exp", "queue-worker", node_name]
name = "dvc-exp-worker"
if logger.getEffectiveLevel() < logging.INFO:
name = name + str(uuid())
name = f"dvc-exp-worker-{number}"
logger.debug(f"start worker: {name}, node: {node_name}")
if os.name == "nt":
daemonize(cmd)
else:
ManagedProcess.spawn(["dvc"] + cmd, wdir=self.wdir, name=name)

for _ in range(5):
time.sleep(1)
if node_name in self.active_worker():
return
logger.debug(f"worker {name} node {node_name} didn't start in 5 sec")

def put(self, *args, **kwargs) -> QueueEntry:
"""Stash an experiment and add it to the queue."""
entry = self._stash_exp(*args, **kwargs)
Expand Down Expand Up @@ -316,6 +326,12 @@ def logs(
) as fobj:
ui.write(fobj.read())

def active_worker(self) -> Set:
"""Return the current active celery worker"""
status = self.celery.control.inspect().active() or {}
logger.debug(f"Worker status: {status}")
return {name for name in status if status[name]}


class WorkspaceQueue(BaseStashQueue):
def put(self, *args, **kwargs) -> QueueEntry:
Expand Down

0 comments on commit 3918db9

Please sign in to comment.