Skip to content

Commit

Permalink
Add local helper (open-mmlab#42)
Browse files Browse the repository at this point in the history
* add local helper
  • Loading branch information
Hang Zhang authored Jul 1, 2019
1 parent 9c205ac commit 37d798f
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
1 change: 0 additions & 1 deletion autogluon/distributed/dist_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ def _run_dist_task(fn, args, gpu_ids):
if 'reporter' in args:
cp = Communicator.Create(p, local_reporter, dist_reporter)
p.join()
#fn(**args)
except Exception as e:
logger.error('Exception in worker process: {}'.format(e))

Expand Down
77 changes: 77 additions & 0 deletions autogluon/distributed/local_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import time
import logging
import threading
import subprocess
#try:
# import queue
#except ImportError:
# import Queue as queue
import multiprocessing as queue

from .ssh_helper import bcolors

class AsyncLineReader(threading.Thread):
    """Background thread that forwards lines from a stream to a queue.

    Every line read from ``fd`` is stripped of trailing whitespace, wrapped
    in a ``[ <name> local] : <line>`` prefix (with a bold ANSI prefix when
    ``err`` is true), printed, and put on ``outputQueue``. The thread stops
    once ``fd.readline()`` reports end of stream.
    """

    def __init__(self, fd, outputQueue, name, err):
        """Store the stream, the destination queue and the label settings.

        Args:
            fd: object exposing a callable ``readline``.
            outputQueue: queue-like object providing ``put`` and ``empty``.
            name: label inserted into each message; assigning it to
                ``self.name`` also sets the thread's name.
            err: when true, messages use the bold ANSI prefix.
        """
        threading.Thread.__init__(self)
        assert callable(fd.readline)
        self.fd = fd
        self.outputQueue = outputQueue
        self.name = name
        self.err = err

    def run(self):
        """Read lines until end of stream, printing and queueing each one."""
        while True:
            line = self.fd.readline()
            if not line:
                # An empty read means the stream is exhausted; returning
                # here lets the thread finish so that eof() can become true
                # instead of polling a closed stream forever.
                break
            line = line.rstrip()
            if isinstance(line, bytes):
                # Replace undecodable bytes rather than letting a single bad
                # byte raise UnicodeDecodeError and kill the reader.
                line = line.decode("utf-8", "replace")
            if self.err:
                msg = "[\033[1m {name} local\033[0m ] : {output}".format(
                    name=self.name, output=line)
            else:
                msg = "[ {name} local] : {output}".format(
                    name=self.name, output=line)
            print(msg)
            self.outputQueue.put(msg)

    def eof(self):
        """Return True once the thread has stopped and the queue is empty."""
        return not self.is_alive() and self.outputQueue.empty()

    @classmethod
    def getForFd(cls, fd, name, err=False, start=True):
        """Create a reader for ``fd`` together with a fresh output queue.

        Args:
            fd: object exposing a callable ``readline``.
            name: label inserted into each message.
            err: when true, messages use the bold ANSI prefix.
            start: when true, the reader thread is started before returning.

        Returns:
            A ``(reader, queue)`` tuple. The queue is built with
            ``queue.Queue()``, where ``queue`` is whatever the module-level
            name ``queue`` is bound to.
        """
        stdqueue = queue.Queue()
        reader = cls(fd, stdqueue, name, err)
        if start:
            reader.start()
        return reader, stdqueue

def start_local_worker(master_ip, port):
    """Spawn a ``dask-worker`` subprocess pointed at ``master_ip:port``.

    The command line is run through the shell with stdout and stderr piped;
    each pipe is handed to ``AsyncLineReader.getForFd`` under the label
    ``'worker'``.

    Args:
        master_ip: address placed before the colon in the worker target.
        port: port placed after the colon in the worker target.

    Returns:
        A dict holding the ``Popen`` object (``'Process'``) plus the reader
        and queue for stdout and for stderr.
    """
    command = 'dask-worker {}:{} --no-nanny'.format(master_ip, port)
    proc = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out_reader, out_queue = AsyncLineReader.getForFd(proc.stdout, name='worker')
    err_reader, err_queue = AsyncLineReader.getForFd(
        proc.stderr, name='worker', err=True)
    return {
        'Process': proc,
        'stdoutReader': out_reader,
        'stdout_queue': out_queue,
        'stderrReader': err_reader,
        'stderr_queue': err_queue,
    }


def start_local_scheduler(port):
    """Spawn a ``dask-scheduler`` subprocess with ``--port`` set to ``port``.

    The command line is run through the shell with stdout and stderr piped;
    each pipe is handed to ``AsyncLineReader.getForFd`` under the label
    ``'scheduler'``.

    Args:
        port: value passed to the scheduler's ``--port`` option.

    Returns:
        A dict holding the ``Popen`` object (``'Process'``) plus the reader
        and queue for stdout and for stderr.
    """
    command = 'dask-scheduler --port {}'.format(port)
    proc = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out_reader, out_queue = AsyncLineReader.getForFd(
        proc.stdout, name='scheduler')
    err_reader, err_queue = AsyncLineReader.getForFd(
        proc.stderr, name='scheduler', err=True)
    return {
        'Process': proc,
        'stdoutReader': out_reader,
        'stdout_queue': out_queue,
        'stderrReader': err_reader,
        'stderr_queue': err_queue,
    }

0 comments on commit 37d798f

Please sign in to comment.