queue status: will show the current worker status #7903

Merged
8 changes: 8 additions & 0 deletions dvc/commands/queue/status.py
@@ -5,6 +5,7 @@
from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link
from dvc.compare import TabularData
from dvc.ui import ui

from ..experiments.show import format_time

@@ -26,6 +27,13 @@ def run(self):
[exp["rev"][:7], exp.get("name", ""), created, exp["status"]]
)
td.render()
        worker_count = len(self.repo.experiments.celery_queue.active_worker())
        if worker_count == 1:
            ui.write("There is 1 worker active at present.")
        elif worker_count == 0:
            ui.write("No worker is active at present.")
        else:
            ui.write(f"There are {worker_count} workers active at present.")

        return 0
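
For reference, a minimal sketch of the message selection above as a standalone function (worker_message is illustrative, not part of the change), demonstrating the three branches:

def worker_message(worker_count: int) -> str:
    # Mirrors the branching in the run() method above.
    if worker_count == 1:
        return "There is 1 worker active at present."
    if worker_count == 0:
        return "No worker is active at present."
    return f"There are {worker_count} workers active at present."

assert worker_message(0) == "No worker is active at present."
assert worker_message(3) == "There are 3 workers active at present."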

28 changes: 22 additions & 6 deletions dvc/repo/experiments/queue/local.py
@@ -2,6 +2,7 @@
import locale
import logging
import os
import time
from collections import defaultdict
from typing import (
    TYPE_CHECKING,
@@ -124,22 +125,31 @@ def worker(self) -> "TemporaryWorker":
        )

    def spawn_worker(self):
        from shortuuid import uuid

        from dvc_task.proc.process import ManagedProcess

        logger.debug("Spawning exp queue worker")
        wdir_hash = hashlib.sha256(self.wdir.encode("utf-8")).hexdigest()[:6]
        node_name = f"dvc-exp-{wdir_hash}-1@localhost"
        number = 1
        node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"
        worker_status = self.active_worker()
        while node_name in worker_status:
            number += 1

Contributor: We don't want to give each worker a new number. The number field should only change when we explicitly use exp queue start --jobs <number> (which is currently restricted to always be 1).

            node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"

Contributor Author: The results returned by celery.control.inspect().active() and celery.control.ping() are keyed by node name, so different workers with the same node name are regarded as the same worker.

Contributor: This is intentional. We want to make sure we are reusing node names: for something like queue start --jobs 4, we would only ever use nodes 1 through 4, and the node number would never be incremented past 4.

When the dvc-task TemporaryWorker starts, it checks whether another worker with the same node name already exists, and if one does, the newly started worker exits immediately. This ensures that we always have the correct number of workers running at a time.
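As a rough illustration of the reuse scheme described above (a sketch only; next_free_node and the jobs parameter are hypothetical and not part of this change):

def next_free_node(wdir_hash: str, jobs: int, active: set) -> str:
    # Walk the fixed pool 1..jobs and reuse the first free slot; the
    # worker number is never incremented past `jobs`.
    for number in range(1, jobs + 1):
        node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"
        if node_name not in active:
            return node_name
    # Every slot is taken; a duplicate worker started anyway would
    # exit immediately, as described above.
    raise RuntimeError("all worker slots are in use")
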
        cmd = ["exp", "queue-worker", node_name]
        name = "dvc-exp-worker"
        if logger.getEffectiveLevel() < logging.INFO:
            name = name + str(uuid())
        name = f"dvc-exp-worker-{number}"
        logger.debug(f"start worker: {name}, node: {node_name}")
        if os.name == "nt":
            daemonize(cmd)
        else:
            ManagedProcess.spawn(["dvc"] + cmd, wdir=self.wdir, name=name)

        for _ in range(5):
            time.sleep(1)
            if node_name in self.active_worker():

Contributor Author: The newly created node can only be detected some time later.

Contributor (@pmrowla, Jun 16, 2022): I don't think we need this check. We already expect that in some cases the new worker may not start at all (if the node name is already in use, it should exit immediately).

                return
        logger.debug(f"worker {name} node {node_name} didn't start in 5 sec")

    def put(self, *args, **kwargs) -> QueueEntry:
        """Stash an experiment and add it to the queue."""
        entry = self._stash_exp(*args, **kwargs)
@@ -316,6 +326,12 @@ def logs(
        ) as fobj:
            ui.write(fobj.read())

    def active_worker(self) -> Set:
        """Return the set of node names of the currently active celery workers."""
        status = self.celery.control.inspect().active() or {}
        logger.debug(f"Worker status: {status}")
        return {name for name in status if status[name]}
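
A hypothetical inspect() reply and the set the comprehension above would derive from it (node names invented for illustration); only nodes reporting a non-empty task list are counted as active:

status = {
    "dvc-exp-abc123-1@localhost": [{"id": "some-task-id"}],
    "dvc-exp-abc123-2@localhost": [],  # idle worker: excluded
}
assert {name for name in status if status[name]} == {
    "dvc-exp-abc123-1@localhost"
}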


class WorkspaceQueue(BaseStashQueue):
    def put(self, *args, **kwargs) -> QueueEntry: