Skip to content

Commit

Permalink
queue status: some reviewed changes.
Browse files Browse the repository at this point in the history
1. No `start -j` will only start worker (1,2,3,...,j). Will not start
   j new worker.
2. Distinguish active worker and idle worker.
  • Loading branch information
karajan1001 committed Jun 17, 2022
1 parent bdec881 commit c37376c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 30 deletions.
11 changes: 5 additions & 6 deletions dvc/commands/queue/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ class CmdQueueStart(CmdBase):
"""Start exp queue workers."""

def run(self):
for _ in range(self.args.jobs):
self.repo.experiments.celery_queue.spawn_worker()

suffix = "s" if self.args.jobs > 1 else ""
ui.write(
f"Start {self.args.jobs} worker{suffix} to process the queue."
started = self.repo.experiments.celery_queue.start_workers(
self.args.jobs
)

suffix = "s" if started > 1 else ""
ui.write(f"Start {started} new worker{suffix} to process the queue.")

return 0


Expand Down
28 changes: 22 additions & 6 deletions dvc/commands/queue/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,29 @@ def run(self):
[exp["rev"][:7], exp.get("name", ""), created, exp["status"]]
)
td.render()
worker_count = len(self.repo.experiments.celery_queue.active_worker())
if worker_count == 1:
ui.write("There is 1 worker active at present.")
elif worker_count == 0:
ui.write("No worker active at present.")

if not result:
ui.write("No experiments in task queue for now.")

worker_status = self.repo.experiments.celery_queue.worker_status()
active_count = len(
[name for name, task in worker_status.items() if task]
)
idle_count = len(worker_status) - active_count

if active_count == 1:
ui.write("There is 1 worker active", end=", ")
elif active_count == 0:
ui.write("No worker active", end=", ")
else:
ui.write(f"There are {active_count} workers active", end=", ")

if idle_count == 1:
ui.write("1 worker idle at present.")
elif idle_count == 0:
ui.write("no worker idle at present.")
else:
ui.write(f"There are {worker_count} worker active at present.")
ui.write(f"{idle_count} workers idle at present.")

return 0

Expand Down
57 changes: 39 additions & 18 deletions dvc/repo/experiments/queue/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import locale
import logging
import os
import time
from collections import defaultdict
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -124,31 +123,53 @@ def worker(self) -> "TemporaryWorker":
timeout=10,
)

def spawn_worker(self):
def spawn_worker(self, num: int = 1):
"""spawn one single worker to process to queued tasks.
Argument:
num: serial number of the worker.
"""
from dvc_task.proc.process import ManagedProcess

logger.debug("Spawning exp queue worker")
wdir_hash = hashlib.sha256(self.wdir.encode("utf-8")).hexdigest()[:6]
number = 1
node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"
worker_status = self.active_worker()
while node_name in worker_status:
number += 1
node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"

node_name = f"dvc-exp-{wdir_hash}-{num}@localhost"
cmd = ["exp", "queue-worker", node_name]
name = f"dvc-exp-worker-{number}"
logger.debug(f"start worker: {name}, node: {node_name}")
name = f"dvc-exp-worker-{num}"

logger.debug(f"start a new worker: {name}, node: {node_name}")
if os.name == "nt":
daemonize(cmd)
else:
ManagedProcess.spawn(["dvc"] + cmd, wdir=self.wdir, name=name)

for _ in range(5):
time.sleep(1)
if node_name in self.active_worker():
return
logger.debug(f"worker {name} node {node_name} didn't start in 5 sec")
def start_workers(self, count: int) -> int:
"""start some workers to process the queued tasks.
Argument:
count: worker number to be started.
Returns:
newly spawned worker number.
"""

logger.debug(f"Spawning {count} exp queue workers")
active_worker: Dict = self.worker_status()

started = 0
for num in range(1, 1 + count):
wdir_hash = hashlib.sha256(self.wdir.encode("utf-8")).hexdigest()[
:6
]
node_name = f"dvc-exp-{wdir_hash}-{num}@localhost"
if node_name in active_worker:
logger.debug(f"Exp queue worker {node_name} already exist")
continue
self.spawn_worker(num)
started += 1

return started

def put(self, *args, **kwargs) -> QueueEntry:
"""Stash an experiment and add it to the queue."""
Expand Down Expand Up @@ -326,11 +347,11 @@ def logs(
) as fobj:
ui.write(fobj.read())

def active_worker(self) -> Set:
def worker_status(self) -> Dict:
"""Return the current active celery worker"""
status = self.celery.control.inspect().active() or {}
logger.debug(f"Worker status: {status}")
return {name for name in status if status[name]}
return status


class WorkspaceQueue(BaseStashQueue):
Expand Down

0 comments on commit c37376c

Please sign in to comment.