Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Investigate worker overhead #2156

Open
mrocklin opened this issue Aug 2, 2018 · 10 comments
Open

Investigate worker overhead #2156

mrocklin opened this issue Aug 2, 2018 · 10 comments

Comments

@mrocklin
Copy link
Member

mrocklin commented Aug 2, 2018

Motivated by a desire for reduced latencies on the workers for Actors (we found that 1ms things were taking 5ms) we added a thread that statistically profiles the event loop. This showed overhead from a couple surprising sources:

  1. psutil and the SystemMonitor
  2. Tornado's write_to_fd which apparently isn't entirely non-blocking, see this stack overflow question
  3. Tornado's add_callback overhead, see this stack overflow question

I'm not sure how best to address these. There are probably a few approaches:

  1. Check that we're using psutil appropriately, and that there isn't some better way to regularly poll system use at high-ish frequency (currently we poll every 500ms)
  2. Quantify the cause of add_callback, and see if there aren't some occasions where we can reduce our use of Tornado
  3. Investigate other concurrency frameworks, like asyncio + uvloop. This sounds neat, but is likely expensive for many reasons. I did try using uvloop + asyncio + tornado but it wasn't very effective. The overhead appears to be higher in this stack so that uvloop doesn't seem to do much good.
@mrocklin
Copy link
Member Author

mrocklin commented Aug 6, 2018

I can reduce the overhead of using add_callback by patching tornado to use asyncio.call_soon rather than asyncio.call_soon_threadsafe (see tornadoweb/tornado#2463) and then annotating some uses of add_callback from within the event loop as being safe.

Waiting on that tornado issue to learn if there is a better way forward.

diff --git a/distributed/worker.py b/distributed/worker.py
index 48e4bb8..4a73b7c 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -338,7 +338,7 @@ class WorkerBase(ServerNode):
         self.batched_stream = BatchedSend(interval='2ms', loop=self.loop)
         self.batched_stream.start(comm)
         self.periodic_callbacks['heartbeat'].start()
-        self.loop.add_callback(self.handle_scheduler, comm)
+        self.loop.add_callback(self.handle_scheduler, comm, threadsafe=True)
 
     @gen.coroutine
     def handle_scheduler(self, comm):
@@ -351,7 +351,8 @@ class WorkerBase(ServerNode):
         finally:
             if self.reconnect:
                 logger.info("Connection to scheduler broken.  Reconnecting...")
-                self.loop.add_callback(self._register_with_scheduler)
+                self.loop.add_callback(self._register_with_scheduler,
+                                       threadsafe=True)
             else:
                 yield self._close(report=False)
 
@@ -1421,7 +1422,8 @@ class Worker(WorkerBase):
             if not self.who_has.get(dep):
                 if dep not in self._missing_dep_flight:
                     self._missing_dep_flight.add(dep)
-                    self.loop.add_callback(self.handle_missing_dep, dep)
+                    self.loop.add_callback(self.handle_missing_dep, dep,
+                                           threadsafe=True)
             for key in self.dependents.get(dep, ()):
                 if self.task_state[key] == 'waiting':
                     if remove:  # try a new worker immediately
@@ -1535,7 +1537,7 @@ class Worker(WorkerBase):
                 assert all(dep in self.data for dep in self.dependencies[key])
 
             self.executing.add(key)
-            self.loop.add_callback(self.execute, key)
+            self.loop.add_callback(self.execute, key, threadsafe=True)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
@@ -1667,7 +1669,7 @@ class Worker(WorkerBase):
                     for dep in missing_deps2:
                         self._missing_dep_flight.add(dep)
                     self.loop.add_callback(self.handle_missing_dep,
-                                           *missing_deps2)
+                                           *missing_deps2, threadsafe=True)
 
                     deps = [dep for dep in deps if dep not in missing_deps]
 
@@ -1699,7 +1701,8 @@ class Worker(WorkerBase):
                     for d in to_gather:
                         self.transition_dep(d, 'flight', worker=worker)
                     self.loop.add_callback(self.gather_dep, worker, dep,
-                                           to_gather, total_nbytes, cause=key)
+                                           to_gather, total_nbytes, cause=key,
+                                           threadsafe=True)
                     changed = True
 

@mrocklin
Copy link
Member Author

mrocklin commented Aug 6, 2018

I suspect that for the comm overhead a good first step would be to try to implement a comm for asyncio (see #2162) and then use uvloop

@mrocklin
Copy link
Member Author

mrocklin commented Aug 7, 2018

OK, there is a workable uvloop compatible Comm implementation in #2165

The picture doesn't change a whole lot yet. Some conversation in https://stackoverflow.com/questions/51731690/why-does-asyncio-spend-time-in-socket-senddata

However, adding up all of these small things has reduced our worker overhead substantially. I need a new and harder case study :)

@mrocklin
Copy link
Member Author

mrocklin commented Oct 8, 2018

The tornado add_callback issue was resolved. Scheduler and Worker CPU overhead seems much lower these days as a result. This will be out in Tornado 6.

@NotSqrt
Copy link
Contributor

NotSqrt commented Oct 26, 2018

For psutil, it's possible to use https://psutil.readthedocs.io/en/latest/#psutil.Process.oneshot to collect cpu and memory in one round.
The requirement is psutil >= 5.0.

@NotSqrt
Copy link
Contributor

NotSqrt commented Nov 6, 2018

@emaror mentioned ~9% CPU for each worker in #2079 (comment) with python 2.7.

I had similar observations.

I just switched to python 3.6, and the workers are down to 2-4% each.

@mrocklin
Copy link
Member Author

mrocklin commented Nov 6, 2018

@NotSqrt if you're inclined to try using psutil.Process.oneshot to accelerate things here that would be a welcome contribution

@NotSqrt
Copy link
Contributor

NotSqrt commented Nov 6, 2018

Yes, I'm working on it, while also tracking memory and CPU of worker children, when tasks create subprocesses.

@bybyte
Copy link

bybyte commented Jun 19, 2019

With Python 2.7 we also experienced each idle worker process using about 10% CPU. For us, changing the following in distributed/distributed.yaml brought the usage down to about 0.5% (the choice to try and edit these came from debugging the worker):

distributed.admin.tick.interval: 1000ms 
distributed.worker.profile.interval: 1000ms

@jakirkham
Copy link
Member

I suspect that for the comm overhead a good first step would be to try to implement a comm for asyncio (see #2162) and then use uvloop

Just wanted to update this thread to note, both of these things are done, but they require opting in (as oppose to being default). See the following config settings:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants