Skip to content

Commit

Permalink
Cleanup (#412)
Browse files Browse the repository at this point in the history
* Remove s3fs code

This has been migrated to the dask repository

* remove Status_Monitor object

* improve test coverage

* validate keys after transitions

* fix bytestring test

* s3fs squash

* tweak executor solo test

* catch socket.error as well as OSError in Executor()
  • Loading branch information
mrocklin authored Aug 10, 2016
1 parent 8f00c0b commit 81ef221
Show file tree
Hide file tree
Showing 16 changed files with 199 additions and 532 deletions.
4 changes: 4 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ omit =
distributed/cluster.py
distributed/*/tests/test*
distributed/compatibility.py
distributed/cli/utils.py
distributed/utils_test.py
distributed/deploy/ssh.py
distributed/_ipython_utils.py

[report]
show_missing = True
Expand Down
64 changes: 0 additions & 64 deletions distributed/bokeh/status_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,70 +21,6 @@
except ImportError:
Spectral11 = None

class Status_Monitor(object):
    """ Worker and task tables that refresh themselves from a scheduler

    Periodically fetches the ``tasks.json`` and ``workers.json`` HTTP routes
    at ``addr`` and pushes the decoded payloads into the tables built by
    ``task_table_plot`` and ``worker_table_plot``.

    Parameters
    ----------
    addr: tuple, optional
        (ip, port) of scheduler.  Defaults to scheduler of recent Executor
    interval: Number, optional
        Interval between updates, handed to ``PeriodicCallback``.
        Defaults to 1000.00 (1s per the original documentation)
    loop: IOLoop, optional
        Loop on which updates are scheduled.  Defaults to ``IOLoop.current()``
    """
    def __init__(self, addr=None, interval=1000.00, loop=None):
        if addr is None:
            # No address given: derive one from the default executor's
            # scheduler handle.  Any other handle type leaves addr as None.
            sched = default_executor().scheduler
            if isinstance(sched, rpc):
                addr = (sched.ip, 9786)
            elif isinstance(sched, Scheduler):
                addr = ('127.0.0.1', sched.services['http'].port)

        self.addr = addr
        self.interval = interval
        # Flipped to True once the widget has been shown in a notebook
        self.display_notebook = False

        if is_kernel() and not curstate().notebook:
            output_notebook()
            assert curstate().notebook

        task_source, task_table = task_table_plot()
        self.task_source = task_source
        self.task_table = task_table

        worker_source, worker_table = worker_table_plot()
        self.worker_source = worker_source
        self.worker_table = worker_table

        self.output = vplot(self.worker_table, self.task_table)
        self.client = AsyncHTTPClient()

        # One immediate refresh, then one every ``interval``
        self.loop = loop or IOLoop.current()
        self.loop.add_callback(self.update)
        self._pc = PeriodicCallback(self.update, self.interval,
                                    io_loop=self.loop)
        self._pc.start()

    def _ipython_display_(self, **kwargs):
        """ Show the combined plot and remember that it is on display """
        show(self.output)
        self.display_notebook = True

    @gen.coroutine
    def update(self):
        """ Query the Scheduler, update the figure

        Fetches both JSON routes concurrently, decodes them, refreshes the
        two table data sources and, if the widget has been displayed in a
        notebook, pushes the change to it.
        """
        with log_errors():
            urls = ['http://%s:%d/tasks.json' % self.addr,
                    'http://%s:%d/workers.json' % self.addr]
            responses = yield [self.client.fetch(url) for url in urls]
            tasks_response, workers_response = responses

            # Decode both payloads before touching either table
            tasks = json.loads(tasks_response.body.decode())
            workers = json.loads(workers_response.body.decode())

            task_table_update(self.task_source, tasks)
            worker_table_update(self.worker_source, workers)

            if self.display_notebook:
                push_notebook()


def task_table_plot(row_headers=False, width=600, height=400):
names = ['waiting', 'ready', 'failed', 'processing', 'in-memory', 'total']
Expand Down
4 changes: 1 addition & 3 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,6 @@ def __init__(self, arg=None, stream=None, ip=None, port=None, addr=None, timeout
if PY3 and isinstance(ip, bytes):
ip = ip.decode()
self.streams = dict()
if stream:
self.streams[stream] = True
self.ip = ip
self.port = port
self.timeout = timeout
Expand Down Expand Up @@ -469,7 +467,7 @@ def coerce_to_address(o, out=str):
o = (o[0].decode(), o[1])

if out == str:
o = '%s:%d' % o
o = '%s:%s' % o

return o

Expand Down
10 changes: 8 additions & 2 deletions distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,14 @@ def test_Executor_with_local(loop):


def test_Executor_solo(loop):
e = Executor(loop=loop)
e.shutdown()
with Executor(loop=loop) as e:
pass


def test_Executor_twice(loop):
    """ Two default Executors alive at once must use different scheduler ports """
    with Executor(loop=loop) as first, Executor(loop=loop) as second:
        first_port = first.cluster.scheduler.port
        second_port = second.cluster.scheduler.port
        assert first_port != second_port


def test_defaults():
Expand Down
30 changes: 2 additions & 28 deletions distributed/diagnostics/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,6 @@ def setup(self):
for k in errors:
self.transition(k, None, 'erred', exception=True)

def start(self):
    """ Mark the plugin as running and stop at once if there is nothing to do

    Calls the ``_start`` hook, then stops with ``True`` (exception) if any
    key in ``all_keys`` appears in the scheduler's ``exceptions_blame``;
    otherwise stops normally when ``keys`` is empty.
    """
    self.status = 'running'
    logger.debug("Start Progress Plugin")
    self._start()

    # An already-failed dependency wins over "nothing left to track"
    for key in self.all_keys:
        if key in self.scheduler.exceptions_blame:
            self.stop(True)
            return

    if not self.keys:
        self.stop()

def _start(self):
pass

def transition(self, key, start, finish, *args, **kwargs):
if key in self.keys and start == 'processing' and finish == 'memory':
logger.debug("Progress sees key %s", key)
Expand Down Expand Up @@ -147,10 +135,6 @@ def stop(self, exception=None, key=None):
self.status = 'finished'
logger.debug("Remove Progress plugin")

@property
def elapsed(self):
    """ Time on the ``default_timer`` clock since ``self._start_time`` """
    now = default_timer()
    return now - self._start_time


class MultiProgress(Progress):
""" Progress variant that keeps track of different groups of keys
Expand Down Expand Up @@ -239,18 +223,6 @@ def transition(self, key, start, finish, *args, **kwargs):
logger.debug("A task was cancelled (%s), stopping progress", key)
self.stop(exception=True)

def start(self):
    """ Mark the plugin as running and stop at once if there is nothing to do

    Stops normally when ``keys`` is empty or every group in it is empty.
    Otherwise, if every key across all groups appears in the scheduler's
    ``exceptions_blame``, stops with ``exception=True`` and reports the
    first such key.
    """
    self.status = 'running'
    logger.debug("Start Progress Plugin")
    self._start()

    groups = self.keys
    if not groups or not any(groups.values()):
        self.stop()
        return

    # At least one group is non-empty from here on
    blamed = self.scheduler.exceptions_blame
    if all(k in blamed for k in concat(groups.values())):
        first_blamed = next(k for k in concat(groups.values())
                            if k in blamed)
        self.stop(exception=True, key=first_blamed)


def format_time(t):
"""Format seconds into a human readable form.
Expand All @@ -259,6 +231,8 @@ def format_time(t):
'10.4s'
>>> format_time(1000.4)
'16min 40.4s'
>>> format_time(100000.4)
'27hr 46min 40.4s'
"""
m, s = divmod(t, 60)
h, m = divmod(m, 60)
Expand Down
70 changes: 0 additions & 70 deletions distributed/diagnostics/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,73 +60,3 @@ def workers(s):
info['last-seen'] = (now - info['last-seen'])

return result


def scheduler_progress_df(d):
    """ Tabulate overall task progress as a pandas DataFrame

    Parameters
    ----------
    d: dict
        Must contain ``waiting``, ``ready``, ``failed``, ``processing``,
        ``in-memory`` and ``tasks``; any other keys are ignored.  Described
        as the dictionary from the status.json route.  Left unmodified.

    Returns
    -------
    DataFrame indexed by ``Tasks`` with a ``Count`` column and a
    ``Progress`` column.  ``Progress`` holds a 40-character-wide bar of
    ``+`` signs proportional to the total, or ``0`` when the total is falsy.

    Examples
    --------
    >>> d = {"ready": 5, "in-memory": 30, "waiting": 20,
    ...      "tasks": 70, "failed": 9,
    ...      "processing": 6,
    ...      "other-keys-are-fine-too": ''}
    >>> scheduler_progress_df(d)  # doctest: +SKIP
                Count                                  Progress
    Tasks
    waiting        20  +++++++++++
    ready           5  ++
    failed          9  +++++
    processing      6  +++
    in-memory      30  +++++++++++++++++
    total          70  ++++++++++++++++++++++++++++++++++++++++
    """
    import pandas as pd

    # Work on a copy so the caller's dict keeps its 'tasks' key
    status = d.copy()
    status['total'] = status.pop('tasks')
    order = ['waiting', 'ready', 'failed', 'processing', 'in-memory', 'total']

    counts = pd.Series({name: status[name] for name in order},
                       index=order, name='Count')
    frame = pd.DataFrame(counts)

    total = status['total']
    if not total:
        frame['Progress'] = 0
    else:
        def bar(n):
            return '%-40s' % (n * '+').rstrip(' ')

        widths = (40 * frame.Count / total).astype(int)
        frame['Progress'] = widths.apply(bar)

    frame.index.name = 'Tasks'
    return frame


def worker_status_df(d):
    """ Status of workers as a Pandas DataFrame

    Parameters
    ----------
    d: dict
        Must contain ``ncores``, ``bytes`` and ``processing``, each a mapping
        keyed by worker; any other keys are ignored.  Described as data from
        the status.json route.

    Returns
    -------
    DataFrame indexed by ``Workers`` (sorted) with columns ``Ncores``,
    ``Bytes`` and ``Processing``; each ``Processing`` entry is the sorted
    list of that worker's ``processing`` keys.

    Examples
    --------
    >>> d = {"other-keys-are-fine-too": '',
    ...      "ncores": {"192.168.1.107": 4,
    ...                 "192.168.1.108": 4},
    ...      "processing": {"192.168.1.108": {'inc': 3, 'add': 1},
    ...                     "192.168.1.107": {'inc': 2}},
    ...      "bytes": {"192.168.1.108": 1000,
    ...                "192.168.1.107": 2000}}
    >>> worker_status_df(d)  # doctest: +SKIP
                   Ncores  Bytes  Processing
    Workers
    192.168.1.107       4   2000       [inc]
    192.168.1.108       4   1000  [add, inc]
    """
    import pandas as pd

    fields = ['ncores', 'bytes', 'processing']
    selected = {field: d[field] for field in fields}
    frame = pd.DataFrame(selected, columns=fields)

    # sorted() over each per-worker mapping yields its keys in order
    frame['processing'] = frame['processing'].apply(sorted)
    frame.columns = frame.columns.map(str.title)
    frame.index.name = 'Workers'
    return frame.sort_index()
3 changes: 3 additions & 0 deletions distributed/diagnostics/tests/test_progressbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,6 @@ def test_progress_function(loop, capsys):

progress([[f], [[g]]], notebook=False)
check_bar_completed(capsys)

progress(f)
check_bar_completed(capsys)
26 changes: 13 additions & 13 deletions distributed/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import uuid
from threading import Thread
import six
import socket

import dask
from dask.base import tokenize, normalize_token, Base
Expand Down Expand Up @@ -315,7 +316,7 @@ def _start(self, timeout=3, **kwargs):
from distributed.deploy import LocalCluster
try:
self.cluster = LocalCluster(loop=self.loop, start=False)
except OSError:
except (OSError, socket.error):
self.cluster = LocalCluster(scheduler_port=0, loop=self.loop,
start=False)
self._start_arg = self.cluster.scheduler_address
Expand All @@ -325,16 +326,15 @@ def _start(self, timeout=3, **kwargs):
ident = yield r.identity()
except (StreamClosedError, OSError):
raise IOError("Could not connect to %s:%d" % (r.ip, r.port))
if ident['type'] == 'Scheduler':
self.scheduler = r
stream = yield connect(r.ip, r.port)
yield write(stream, {'op': 'register-client',
'client': self.id})
bstream = BatchedSend(interval=10, loop=self.loop)
bstream.start(stream)
self.scheduler_stream = bstream
else:
raise ValueError("Unknown Type")
assert ident['type'] == 'Scheduler'

self.scheduler = r
stream = yield connect(r.ip, r.port)
yield write(stream, {'op': 'register-client',
'client': self.id})
bstream = BatchedSend(interval=10, loop=self.loop)
bstream.start(stream)
self.scheduler_stream = bstream

start_event = Event()
self.coroutines.append(self._handle_report(start_event))
Expand Down Expand Up @@ -1360,7 +1360,7 @@ def ncores(self, workers=None):
workers = list(workers)
if workers is not None and not isinstance(workers, (list, set)):
workers = [workers]
return sync(self.loop, self.scheduler.ncores, addresses=workers)
return sync(self.loop, self.scheduler.ncores, workers=workers)

def who_has(self, futures=None):
""" The workers storing each future's data
Expand Down Expand Up @@ -1422,7 +1422,7 @@ def has_what(self, workers=None):
workers = list(workers)
if workers is not None and not isinstance(workers, (list, set)):
workers = [workers]
return sync(self.loop, self.scheduler.has_what, keys=workers)
return sync(self.loop, self.scheduler.has_what, workers=workers)

def stacks(self, workers=None):
""" The task queues on each worker
Expand Down
55 changes: 0 additions & 55 deletions distributed/s3.py

This file was deleted.

Loading

0 comments on commit 81ef221

Please sign in to comment.