
queue status: will show the current worker status #7903

Merged

Conversation

karajan1001
Contributor

@karajan1001 karajan1001 commented Jun 16, 2022

fix: #7590
Currently the queue worker status is not shown in `queue status`, which makes it
hard to tell whether the worker is still alive.

  1. Add a new method `active_worker` to return the names of the currently
     running worker nodes.
  2. Give every worker a unique node name (a rough sketch of the naming follows below).
  3. `queue status` will show how many workers are running at present.
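As a rough illustration of point 2 (the helper below and the hashing details are assumptions for this sketch; only the `dvc-exp-{wdir_hash}-{number}@localhost` pattern is taken from the actual change):

```python
import hashlib
import os


def make_node_name(wdir: str, number: int) -> str:
    """Build a per-repo, per-slot Celery node name.

    Hashing the working directory keeps names from different repos apart,
    and the slot number keeps multiple workers in one repo apart.
    (The md5/8-character choice here is an assumption, not the dvc code.)
    """
    wdir_hash = hashlib.md5(os.path.abspath(wdir).encode("utf-8")).hexdigest()[:8]
    return f"dvc-exp-{wdir_hash}-{number}@localhost"
```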

Thank you for the contribution - we'll try to review it as soon as possible. 🙏

Some tests are still required for this change.

@karajan1001 karajan1001 added the A: experiments (Related to dvc exp) and A: task-queue (Related to task queue) labels Jun 16, 2022
@karajan1001 karajan1001 requested a review from pmrowla June 16, 2022 04:28
@karajan1001 karajan1001 self-assigned this Jun 16, 2022
@karajan1001 karajan1001 requested a review from a team as a code owner June 16, 2022 04:28
fix: iterative#7950
Currently the queue worker status is not shown in `queue status`, which makes it
hard to tell whether the worker is still alive.

1. Add a new method `active_worker` to return the names of the currently
   running worker nodes.
2. Give every worker a unique node name.
3. `queue status` will show how many workers are running at present.
@karajan1001 karajan1001 force-pushed the worker_status_in_queue_status branch from b3573ae to bdec881 Compare June 16, 2022 04:30
worker_status = self.active_worker()
while node_name in worker_status:
    number += 1
    node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"
Contributor Author

The results returned by celery.control.inspect().active() and celery.control.ping() are keyed by node name, so different workers with the same node name would be treated as the same worker.
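
For context, a minimal sketch of how such a lookup can be keyed by node name with stock Celery calls (the `app` object and the helper name are assumptions for the sketch, not the dvc-task code):

```python
from celery import Celery

app = Celery("dvc-exp")  # broker/backend configuration omitted in this sketch


def active_worker(app: Celery) -> dict:
    """Map each responding worker's node name to the tasks it is running."""
    status = {}
    # ping() replies look like [{"dvc-exp-<hash>-1@localhost": {"ok": "pong"}}, ...]
    for reply in app.control.ping(timeout=1.0) or []:
        for node_name in reply:
            status[node_name] = []
    # inspect().active() returns {"<node name>": [<task info dicts>], ...} or None
    for node_name, tasks in (app.control.inspect().active() or {}).items():
        status[node_name] = tasks
    return status
```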

Contributor

This is intentional. We want to make sure we are reusing node names, so for something like `queue start --jobs 4` we would only use nodes 1 through 4, and the node number would never increment past 4.

When the dvc-task TemporaryWorker starts, it checks whether another worker with the same node name already exists; if it does, the newly started worker exits immediately. This ensures that we always have the correct number of workers running at a time.
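
A small sketch of that reuse policy (the function and its signature are hypothetical; only the node-name pattern and the 1..jobs bound come from the discussion above):

```python
from typing import Optional


def pick_node_slot(wdir_hash: str, jobs: int, running: set) -> Optional[str]:
    """Return the lowest free node name in slots 1..jobs, or None if all
    slots already have a live worker (so no extra worker is started)."""
    for number in range(1, jobs + 1):
        node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"
        if node_name not in running:
            return node_name
    return None
```

If a worker does end up being spawned for a slot that is already taken, it exits immediately, as described above.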

if os.name == "nt":
    daemonize(cmd)
else:
    ManagedProcess.spawn(["dvc"] + cmd, wdir=self.wdir, name=name)

for _ in range(5):
    time.sleep(1)
    if node_name in self.active_worker():
Contributor Author

The newly created node can only be detected some time later.

Contributor

@pmrowla pmrowla Jun 16, 2022

I don't think we need this check. We already expect that in some cases the new worker may not start at all (if the node name is already in use, it should exit immediately).

node_name = f"dvc-exp-{wdir_hash}-{number}@localhost"
worker_status = self.active_worker()
while node_name in worker_status:
number += 1
Contributor

We don't want to give each worker a new number. The number field should only change when we explicitly use `exp queue start --jobs <number>` (the option is currently disabled, so it is always 1).

1. Now `start -j` will only start workers (1, 2, 3, ..., j); it will not
   start j new workers.
2. Distinguish between active workers and idle workers.
@karajan1001 karajan1001 force-pushed the worker_status_in_queue_status branch from edb845d to c37376c Compare June 17, 2022 09:49
@karajan1001
Contributor Author

Looks like the multi-worker setup is unstable on the kombu side when multiple workers read from and write to the message queue at the same time.

[2022-06-17 15:53:47,014: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2022-06-17 15:53:47,015: INFO/MainProcess] Connected to filesystem://localhost//
[2022-06-17 15:53:47,017: CRITICAL/MainProcess] Unrecoverable error: JSONDecodeError('Expecting value: line 1 column 1 (char 0)')
Traceback (most recent call last):
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 326, in start
    blueprint.start(self)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/celery/worker/pidbox.py", line 53, in start
    self.consumer = self.node.listen(callback=self.on_message)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/pidbox.py", line 87, in listen
    consumer = self.Consumer(channel=channel,
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/pidbox.py", line 73, in Consumer
    return Consumer(
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/messaging.py", line 387, in __init__
    self.revive(self.channel)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/messaging.py", line 409, in revive
    self.declare()
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/messaging.py", line 422, in declare
    queue.declare()
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/entity.py", line 606, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/entity.py", line 617, in _create_queue
    self.queue_bind(nowait=nowait, channel=channel)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/entity.py", line 660, in queue_bind
    return self.bind_to(self.exchange, self.routing_key,
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/entity.py", line 669, in bind_to
    return (channel or self.channel).queue_bind(
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 562, in queue_bind
    self._queue_bind(exchange, *meta)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/transport/filesystem.py", line 191, in _queue_bind
    queues = self.get_table(exchange)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/transport/filesystem.py", line 185, in get_table
    exchange_table = loads(bytes_to_str(f_obj.read()))
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/site-packages/kombu/utils/json.py", line 88, in loads
    return _loads(s)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/Users/gao/anaconda3/envs/dvc/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

During today's tests, I also noticed several times that the queue worker falsely shut down, reporting that there were no tasks in the queue when there were actually still 3-4 tasks in it. And once or twice the queue worker didn't shut down after all of the tasks had succeeded.

Contributor

@pmrowla pmrowla left a comment

Code changes LGTM; we can address any UI changes in a separate PR when @dberenbaum returns next week.

@pmrowla pmrowla merged commit 8d12989 into iterative:dvc-task-dev Jun 20, 2022
@karajan1001 karajan1001 deleted the worker_status_in_queue_status branch June 20, 2022 07:26
karajan1001 added a commit to karajan1001/dvc that referenced this pull request Jun 24, 2022
Followed by iterative#7903

Add a new unit test to increase the test coverage.

Co-authored-by: Peter Rowlands (변기호) <peter@pmrowla.com>
pmrowla added a commit that referenced this pull request Jun 24, 2022
Followed by #7903

Add a new unit test to increase the test coverage.

Co-authored-by: Peter Rowlands (변기호) <peter@pmrowla.com>
pmrowla added a commit that referenced this pull request Jul 5, 2022
Followed by #7903

Add a new unit test to increase the test coverage.

Co-authored-by: Peter Rowlands (변기호) <peter@pmrowla.com>
pmrowla added a commit that referenced this pull request Jul 6, 2022
Followed by #7903

Add a new unit test to increase the test coverage.

Co-authored-by: Peter Rowlands (변기호) <peter@pmrowla.com>
pmrowla added a commit that referenced this pull request Jul 11, 2022
Followed by #7903

Add a new unit test to increase the test coverage.

Co-authored-by: Peter Rowlands (변기호) <peter@pmrowla.com>
pmrowla added a commit that referenced this pull request Jul 12, 2022
Followed by #7903

Add a new unit test to increase the test coverage.

Co-authored-by: Peter Rowlands (변기호) <peter@pmrowla.com>
Labels
A: experiments (Related to dvc exp), A: task-queue (Related to task queue)

2 participants