From 81ef2219fbdba2e1b91efcbe450af43434efc01d Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 10 Aug 2016 19:07:58 -0400 Subject: [PATCH] Cleanup (#412) * Remove s3fs code This has been migrated to the dask repository * remove Status_Monitor object * improve test coverage * validate keys after transitions * fix bytestring test * s3fs squash * tweak executor solo test * except socket.error in Executor() --- .coveragerc | 4 + distributed/bokeh/status_monitor.py | 64 ----- distributed/core.py | 4 +- distributed/deploy/tests/test_local.py | 10 +- distributed/diagnostics/progress.py | 30 +- distributed/diagnostics/scheduler.py | 70 ----- .../diagnostics/tests/test_progressbar.py | 3 + distributed/executor.py | 26 +- distributed/s3.py | 55 ---- distributed/scheduler.py | 30 +- distributed/tests/test_core.py | 46 +++- distributed/tests/test_executor.py | 85 ++++-- distributed/tests/test_s3.py | 257 ------------------ distributed/tests/test_scheduler.py | 23 ++ distributed/tests/test_utils.py | 20 +- distributed/utils.py | 4 +- 16 files changed, 199 insertions(+), 532 deletions(-) delete mode 100644 distributed/s3.py delete mode 100644 distributed/tests/test_s3.py diff --git a/.coveragerc b/.coveragerc index 097089338df..8038e8d8dab 100644 --- a/.coveragerc +++ b/.coveragerc @@ -7,6 +7,10 @@ omit = distributed/cluster.py distributed/*/tests/test* distributed/compatibility.py + distributed/cli/utils.py + distributed/utils_test.py + distributed/deploy/ssh.py + distributed/_ipython_utils.py [report] show_missing = True diff --git a/distributed/bokeh/status_monitor.py b/distributed/bokeh/status_monitor.py index 62c71909f47..98ff8fa85d2 100644 --- a/distributed/bokeh/status_monitor.py +++ b/distributed/bokeh/status_monitor.py @@ -21,70 +21,6 @@ except ImportError: Spectral11 = None -class Status_Monitor(object): - """ Display the tasks running and waiting on each worker - - Parameters - ---------- - addr: tuple, optional - (ip, port) of scheduler. Defaults to scheduler of recent Executor - interval: Number, optional - Interval between updates. 
Defaults to 1s - """ - def __init__(self, addr=None, interval=1000.00, loop=None): - if addr is None: - scheduler = default_executor().scheduler - if isinstance(scheduler, rpc): - addr = (scheduler.ip, 9786) - elif isinstance(scheduler, Scheduler): - addr = ('127.0.0.1', scheduler.services['http'].port) - self.addr = addr - self.interval = interval - - self.display_notebook = False - - if is_kernel() and not curstate().notebook: - output_notebook() - assert curstate().notebook - - self.task_source, self.task_table = task_table_plot() - self.worker_source, self.worker_table = worker_table_plot() - - self.output = vplot(self.worker_table, self.task_table) - - self.client = AsyncHTTPClient() - - self.loop = loop or IOLoop.current() - self.loop.add_callback(self.update) - self._pc = PeriodicCallback(self.update, self.interval, io_loop=self.loop) - self._pc.start() - - def _ipython_display_(self, **kwargs): - show(self.output) - self.display_notebook = True - - @gen.coroutine - def update(self): - """ Query the Scheduler, update the figure - - This opens a connection to the scheduler, sends it a function to run - periodically, streams the results back and uses those results to update - the bokeh figure - """ - with log_errors(): - tasks, workers = yield [ - self.client.fetch('http://%s:%d/tasks.json' % self.addr), - self.client.fetch('http://%s:%d/workers.json' % self.addr)] - - tasks = json.loads(tasks.body.decode()) - workers = json.loads(workers.body.decode()) - - task_table_update(self.task_source, tasks) - worker_table_update(self.worker_source, workers) - - if self.display_notebook: - push_notebook() - def task_table_plot(row_headers=False, width=600, height=400): names = ['waiting', 'ready', 'failed', 'processing', 'in-memory', 'total'] diff --git a/distributed/core.py b/distributed/core.py index 44160ed1dad..3e4fb29c187 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -387,8 +387,6 @@ def __init__(self, arg=None, stream=None, ip=None, port=None, addr=None, timeout if PY3 and isinstance(ip, bytes): ip = ip.decode() self.streams = dict() - if stream: - self.streams[stream] = True self.ip = ip self.port = port self.timeout = timeout @@ -469,7 +467,7 @@ def coerce_to_address(o, out=str): o = (o[0].decode(), o[1]) if out == str: - o = '%s:%d' % o + o = '%s:%s' % o return o diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 9c0a15c0f32..daa68660a34 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -59,8 +59,14 @@ def test_Executor_with_local(loop): def test_Executor_solo(loop): - e = Executor(loop=loop) - e.shutdown() + with Executor(loop=loop) as e: + pass + + +def test_Executor_twice(loop): + with Executor(loop=loop) as e: + with Executor(loop=loop) as f: + assert e.cluster.scheduler.port != f.cluster.scheduler.port def test_defaults(): diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py index e2b84e6b35c..0db261a0465 100644 --- a/distributed/diagnostics/progress.py +++ b/distributed/diagnostics/progress.py @@ -107,18 +107,6 @@ def setup(self): for k in errors: self.transition(k, None, 'erred', exception=True) - def start(self): - self.status = 'running' - logger.debug("Start Progress Plugin") - self._start() - if any(k in self.scheduler.exceptions_blame for k in self.all_keys): - self.stop(True) - elif not self.keys: - self.stop() - - def _start(self): - pass - def transition(self, key, start, finish, *args, **kwargs): if key in self.keys and 
start == 'processing' and finish == 'memory': logger.debug("Progress sees key %s", key) @@ -147,10 +135,6 @@ def stop(self, exception=None, key=None): self.status = 'finished' logger.debug("Remove Progress plugin") - @property - def elapsed(self): - return default_timer() - self._start_time - class MultiProgress(Progress): """ Progress variant that keeps track of different groups of keys @@ -239,18 +223,6 @@ def transition(self, key, start, finish, *args, **kwargs): logger.debug("A task was cancelled (%s), stopping progress", key) self.stop(exception=True) - def start(self): - self.status = 'running' - logger.debug("Start Progress Plugin") - self._start() - if not self.keys or not any(v for v in self.keys.values()): - self.stop() - elif all(k in self.scheduler.exceptions_blame for k in - concat(self.keys.values())): - key = next(k for k in concat(self.keys.values()) if k in - self.scheduler.exceptions_blame) - self.stop(exception=True, key=key) - def format_time(t): """Format seconds into a human readable form. @@ -259,6 +231,8 @@ def format_time(t): '10.4s' >>> format_time(1000.4) '16min 40.4s' + >>> format_time(100000.4) + '27hr 46min 40.4s' """ m, s = divmod(t, 60) h, m = divmod(m, 60) diff --git a/distributed/diagnostics/scheduler.py b/distributed/diagnostics/scheduler.py index b8a98f0eaa5..c2fefea25ac 100644 --- a/distributed/diagnostics/scheduler.py +++ b/distributed/diagnostics/scheduler.py @@ -60,73 +60,3 @@ def workers(s): info['last-seen'] = (now - info['last-seen']) return result - - -def scheduler_progress_df(d): - """ Convert status response to DataFrame of total progress - - Consumes dictionary from status.json route - - Examples - -------- - >>> d = {"ready": 5, "in-memory": 30, "waiting": 20, - ... "tasks": 70, "failed": 9, - ... "processing": 6, - ... "other-keys-are-fine-too": ''} - - >>> scheduler_progress_df(d) # doctest: +SKIP - Count Progress - Tasks - waiting 20 +++++++++++ - ready 5 ++ - failed 9 +++++ - processing 6 +++ - in-memory 30 +++++++++++++++++ - total 70 ++++++++++++++++++++++++++++++++++++++++ - """ - import pandas as pd - d = d.copy() - d['total'] = d.pop('tasks') - names = ['waiting', 'ready', 'failed', 'processing', 'in-memory', 'total'] - df = pd.DataFrame(pd.Series({k: d[k] for k in names}, - index=names, name='Count')) - if d['total']: - barlength = (40 * df.Count / d['total']).astype(int) - df['Progress'] = barlength.apply(lambda n: ('%-40s' % (n * '+').rstrip(' '))) - else: - df['Progress'] = 0 - - df.index.name = 'Tasks' - - return df - - -def worker_status_df(d): - """ Status of workers as a Pandas DataFrame - - Consumes data from status.json route. - - Examples - -------- - >>> d = {"other-keys-are-fine-too": '', - ... "ncores": {"192.168.1.107": 4, - ... "192.168.1.108": 4}, - ... "processing": {"192.168.1.108": {'inc': 3, 'add': 1}, - ... "192.168.1.107": {'inc': 2}}, - ... "bytes": {"192.168.1.108": 1000, - ... 
"192.168.1.107": 2000}} - - >>> worker_status_df(d) - Ncores Bytes Processing - Workers - 192.168.1.107 4 2000 [inc] - 192.168.1.108 4 1000 [add, inc] - """ - import pandas as pd - names = ['ncores', 'bytes', 'processing'] - df = pd.DataFrame({k: d[k] for k in names}, columns=names) - df['processing'] = df['processing'].apply(sorted) - df.columns = df.columns.map(str.title) - df.index.name = 'Workers' - df = df.sort_index() - return df diff --git a/distributed/diagnostics/tests/test_progressbar.py b/distributed/diagnostics/tests/test_progressbar.py index 23bb018e84c..33f11872476 100644 --- a/distributed/diagnostics/tests/test_progressbar.py +++ b/distributed/diagnostics/tests/test_progressbar.py @@ -87,3 +87,6 @@ def test_progress_function(loop, capsys): progress([[f], [[g]]], notebook=False) check_bar_completed(capsys) + + progress(f) + check_bar_completed(capsys) diff --git a/distributed/executor.py b/distributed/executor.py index 4520ecf36f0..cb61ef97058 100644 --- a/distributed/executor.py +++ b/distributed/executor.py @@ -11,6 +11,7 @@ import uuid from threading import Thread import six +import socket import dask from dask.base import tokenize, normalize_token, Base @@ -315,7 +316,7 @@ def _start(self, timeout=3, **kwargs): from distributed.deploy import LocalCluster try: self.cluster = LocalCluster(loop=self.loop, start=False) - except OSError: + except (OSError, socket.error): self.cluster = LocalCluster(scheduler_port=0, loop=self.loop, start=False) self._start_arg = self.cluster.scheduler_address @@ -325,16 +326,15 @@ def _start(self, timeout=3, **kwargs): ident = yield r.identity() except (StreamClosedError, OSError): raise IOError("Could not connect to %s:%d" % (r.ip, r.port)) - if ident['type'] == 'Scheduler': - self.scheduler = r - stream = yield connect(r.ip, r.port) - yield write(stream, {'op': 'register-client', - 'client': self.id}) - bstream = BatchedSend(interval=10, loop=self.loop) - bstream.start(stream) - self.scheduler_stream = bstream - else: - raise ValueError("Unknown Type") + assert ident['type'] == 'Scheduler' + + self.scheduler = r + stream = yield connect(r.ip, r.port) + yield write(stream, {'op': 'register-client', + 'client': self.id}) + bstream = BatchedSend(interval=10, loop=self.loop) + bstream.start(stream) + self.scheduler_stream = bstream start_event = Event() self.coroutines.append(self._handle_report(start_event)) @@ -1360,7 +1360,7 @@ def ncores(self, workers=None): workers = list(workers) if workers is not None and not isinstance(workers, (list, set)): workers = [workers] - return sync(self.loop, self.scheduler.ncores, addresses=workers) + return sync(self.loop, self.scheduler.ncores, workers=workers) def who_has(self, futures=None): """ The workers storing each future's data @@ -1422,7 +1422,7 @@ def has_what(self, workers=None): workers = list(workers) if workers is not None and not isinstance(workers, (list, set)): workers = [workers] - return sync(self.loop, self.scheduler.has_what, keys=workers) + return sync(self.loop, self.scheduler.has_what, workers=workers) def stacks(self, workers=None): """ The task queues on each worker diff --git a/distributed/s3.py b/distributed/s3.py deleted file mode 100644 index 4e0eb6b1b15..00000000000 --- a/distributed/s3.py +++ /dev/null @@ -1,55 +0,0 @@ -from __future__ import print_function, division, absolute_import - -import logging -import io -from warnings import warn - -from dask import bytes as dbytes -from s3fs import S3FileSystem - -from .executor import default_executor, ensure_default_get - - -logger 
= logging.getLogger(__name__) - - -def read_text(fn, keyname=None, encoding='utf-8', errors='strict', - lineterminator='\n', executor=None, fs=None, lazy=True, - collection=True, blocksize=2**27, compression=None, anon=None, **kwargs): - warn("distributed.s3.read_text(...) Moved to " - "dask.bag.read_text('s3://...')") - if keyname is not None: - if not keyname.startswith('/'): - keyname = '/' + keyname - fn = fn + keyname - import dask.bag as db - result = db.read_text('s3://' + fn, encoding=encoding, errors=errors, - linedelimiter=lineterminator, collection=collection, - blocksize=blocksize, compression=compression, - storage_options={'s3': fs, 'anon': anon}, **kwargs) - executor = default_executor(executor) - ensure_default_get(executor) - if not lazy: - if collection: - result = executor.persist(result) - else: - result = executor.compute(result) - return result - - -def read_csv(path, executor=None, fs=None, lazy=True, collection=True, - lineterminator='\n', blocksize=2**27, storage_options=None, **kwargs): - warn("distributed.s3.read_csv(...) Moved to " - "dask.dataframe.read_csv('s3://...')") - import dask.dataframe as dd - result = dd.read_csv('s3://' + path, collection=collection, - lineterminator=lineterminator, blocksize=blocksize, - storage_options=storage_options, **kwargs) - executor = default_executor(executor) - ensure_default_get(executor) - if not lazy: - if collection: - result = executor.persist(result) - else: - result = executor.compute(result) - return result diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e5cdb8aa220..e9187114229 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -812,6 +812,8 @@ def validate_key(self, key): try: func = getattr(self, 'validate_' + self.task_state[key]) except KeyError: + logger.debug("Key lost: %s", key) + except AttributeError: logger.info("self.validate_%s not found", self.task_state[key]) else: func(key) @@ -1462,17 +1464,17 @@ def get_who_has(self, stream=None, keys=None): else: return valmap(list, self.who_has) - def get_has_what(self, stream=None, keys=None): - if keys is not None: - keys = map(self.coerce_address, keys) - return {k: list(self.has_what.get(k, ())) for k in keys} + def get_has_what(self, stream=None, workers=None): + if workers is not None: + workers = map(self.coerce_address, workers) + return {w: list(self.has_what.get(w, ())) for w in workers} else: return valmap(list, self.has_what) - def get_ncores(self, stream=None, addresses=None): - if addresses is not None: - addresses = map(self.coerce_address, addresses) - return {k: self.ncores.get(k, None) for k in addresses} + def get_ncores(self, stream=None, workers=None): + if workers is not None: + workers = map(self.coerce_address, workers) + return {w: self.ncores.get(w, None) for w in workers} else: return self.ncores @@ -2086,12 +2088,18 @@ def transitions(self, recommendations): This includes feedback from previous transitions and continues until we reach a steady state """ + keys = set() recommendations = recommendations.copy() while recommendations: key, finish = recommendations.popitem() + keys.add(key) new = self.transition(key, finish) recommendations.update(new) + if self.validate: + for key in keys: + self.validate_key(key) + def transition_story(self, *keys): """ Get all transitions that touch one of the input keys """ keys = set(keys) @@ -2399,12 +2407,6 @@ def workers_list(self, workers): out.update({ww for ww in self.ncores if w in ww}) # TODO: quadratic return list(out) - def log_state(self, msg=''): 
- """ Log current full state of the scheduler """ - logger.debug("Runtime State: %s", msg) - logger.debug('\n\nwaiting: %s\n\nstacks: %s\n\nprocessing: %s\n\n', - self.waiting, self.stacks, self.processing) - def decide_worker(dependencies, stacks, processing, who_has, has_what, restrictions, loose_restrictions, nbytes, key): diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 120b6d047d2..c242cb06c61 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -5,17 +5,21 @@ import socket from tornado import gen, ioloop +from tornado.iostream import StreamClosedError import pytest from distributed.core import (read, write, pingpong, Server, rpc, connect, - coerce_to_rpc) + coerce_to_rpc, send_recv, coerce_to_address) from distributed.utils_test import slow, loop, gen_test def test_server(loop): @gen.coroutine def f(): server = Server({'ping': pingpong}) + with pytest.raises(OSError): + server.port server.listen(8887) + assert server.port == 8887 stream = yield connect('127.0.0.1', 8887) @@ -87,12 +91,12 @@ def f(): loop.run_sync(f) +def echo(stream, x): + return x + @slow def test_large_packets(loop): """ tornado has a 100MB cap by default """ - def echo(stream, x): - return x - @gen.coroutine def f(): server = Server({'echo': echo}) @@ -181,3 +185,37 @@ def test_errors(): yield r.div(x=1, y=0) r.close_streams() + + +@gen_test() +def test_connect_raises(): + with pytest.raises((gen.TimeoutError, StreamClosedError)): + yield connect('127.0.0.1', 58259, timeout=0.01) + + +@gen_test() +def test_send_recv_args(): + server = Server({'echo': echo}) + server.listen(0) + + result = yield send_recv(arg=('127.0.0.1', server.port), op='echo', x=b'1') + assert result == b'1' + result = yield send_recv(addr=('127.0.0.1:%d' % server.port).encode(), + op='echo', x=b'1') + assert result == b'1' + result = yield send_recv(ip=b'127.0.0.1', port=server.port, op='echo', + x=b'1') + assert result == b'1' + result = yield send_recv(ip=b'127.0.0.1', port=server.port, op='echo', + x=b'1', reply=False) + assert result == None + + server.stop() + + +def test_coerce_to_address(): + for arg in [b'127.0.0.1:8786', + '127.0.0.1:8786', + ('127.0.0.1', 8786), + ('127.0.0.1', '8786')]: + assert coerce_to_address(arg) == '127.0.0.1:8786' diff --git a/distributed/tests/test_executor.py b/distributed/tests/test_executor.py index 68ba12e942a..c87670a17cd 100644 --- a/distributed/tests/test_executor.py +++ b/distributed/tests/test_executor.py @@ -19,6 +19,7 @@ from tornado import gen from tornado.ioloop import IOLoop +import dask from dask import delayed from dask.context import _globals from distributed import Worker, Nanny @@ -2260,6 +2261,7 @@ def test_futures_of(e, s, a, b): assert set(futures_of(x)) == {x} assert set(futures_of([x, y, z])) == {x, y, z} assert set(futures_of([x, [y], [[z]]])) == {x, y, z} + assert set(futures_of({'x': x, 'y': [y]})) == {x, y} import dask.bag as db b = db.Bag({('b', i): f for i, f in enumerate([x, y, z])}, 'b', 3) @@ -3070,6 +3072,12 @@ def test_default_get(loop): with Executor(('127.0.0.1', s['port']), loop=loop) as e: assert _globals['get'] == e.get + with Executor(('127.0.0.1', s['port']), loop=loop, set_as_default=False) as e: + assert _globals['get'] != e.get + dask.set_options(get=e.get) + assert _globals['get'] == e.get + assert _globals['get'] != e.get + @gen_cluster(executor=True) def test_get_stacks_processing(e, s, a, b): @@ -3084,11 +3092,46 @@ def test_get_stacks_processing(e, s, a, b): yield gen.sleep(0.2) - stacks = yield 
e.scheduler.stacks() - assert stacks == valmap(list, s.stacks) + c = yield e.scheduler.stacks() + assert c == valmap(list, s.stacks) - processing = yield e.scheduler.processing() - assert processing == valmap(list, s.processing) + c = yield e.scheduler.stacks(workers=[a.address]) + assert c == {a.address: list(s.stacks[a.address])} + + c = yield e.scheduler.processing() + assert c == valmap(list, s.processing) + + c = yield e.scheduler.processing(workers=[a.address]) + assert c == {a.address: list(s.processing[a.address])} + +@gen_cluster(executor=True) +def test_get_foo(e, s, a, b): + futures = e.map(inc, range(10)) + yield _wait(futures) + + c = yield e.scheduler.ncores() + assert c == s.ncores + + c = yield e.scheduler.ncores(workers=[a.address]) + assert c == {a.address: s.ncores[a.address]} + + c = yield e.scheduler.has_what() + assert c == valmap(list, s.has_what) + + c = yield e.scheduler.has_what(workers=[a.address]) + assert c == {a.address: list(s.has_what[a.address])} + + c = yield e.scheduler.nbytes(summary=False) + assert c == s.nbytes + + c = yield e.scheduler.nbytes(keys=[futures[0].key], summary=False) + assert c == {futures[0].key: s.nbytes[futures[0].key]} + + c = yield e.scheduler.who_has() + assert c == valmap(list, s.who_has) + + c = yield e.scheduler.who_has(keys=[futures[0].key]) + assert c == {futures[0].key: list(s.who_has[futures[0].key])} @gen_cluster(executor=True, Worker=Nanny) @@ -3334,22 +3377,30 @@ def test_status(): @gen_cluster(executor=True) -def test_compute_optimize_graph(e, s, a, b): +def test_persist_optimize_graph(e, s, a, b): + i = 10 import dask.bag as db - b = db.range(10, npartitions=2) - b2 = b.map(inc) - b3 = b2.map(inc) + for method in [e.persist, e.compute]: + b = db.range(i, npartitions=2); i += 1 + b2 = b.map(inc) + b3 = b2.map(inc) + + b4 = method(b3, optimize_graph=False) + yield _wait(b4) - b4 = e.persist(b3, optimize_graph=False) - yield _wait(b4) + assert set(map(tokey, b3._keys())).issubset(s.tasks) - assert set(map(tokey, b3._keys())).issubset(s.tasks) + b = db.range(i, npartitions=2); i += 1 + b2 = b.map(inc) + b3 = b2.map(inc) - b = db.range(12, npartitions=2) - b2 = b.map(inc) - b3 = b2.map(inc) + b4 = method(b3, optimize_graph=True) + yield _wait(b4) - b4 = e.persist(b3, optimize_graph=True) - yield _wait(b4) + assert not any(tokey(k) in s.tasks for k in b2._keys()) - assert not any(tokey(k) in s.tasks for k in b2._keys()) + +@gen_cluster(executor=True, ncores=[]) +def test_persist_optimize_graph(e, s): + with pytest.raises(ValueError): + yield e._scatter([1]) diff --git a/distributed/tests/test_s3.py b/distributed/tests/test_s3.py deleted file mode 100644 index 1caf11790a7..00000000000 --- a/distributed/tests/test_s3.py +++ /dev/null @@ -1,257 +0,0 @@ -from __future__ import print_function, division, absolute_import - -import pytest -pytest.importorskip('s3fs') - -import io -import json -from math import ceil - -import boto3 -from tornado import gen - -from dask.delayed import Delayed -from distributed import Executor -from distributed.executor import _wait, Future -from distributed.s3 import (read_text, read_csv, - S3FileSystem) -from distributed.utils import get_ip -from distributed.utils_test import gen_cluster, loop, cluster - - -ip = get_ip() - - -# These get mirrored on s3://distributed-test/ -test_bucket_name = 'distributed-test' -files = {'test/accounts.1.json': (b'{"amount": 100, "name": "Alice"}\n' - b'{"amount": 200, "name": "Bob"}\n' - b'{"amount": 300, "name": "Charlie"}\n' - b'{"amount": 400, "name": "Dennis"}\n'), 
- 'test/accounts.2.json': (b'{"amount": 500, "name": "Alice"}\n' - b'{"amount": 600, "name": "Bob"}\n' - b'{"amount": 700, "name": "Charlie"}\n' - b'{"amount": 800, "name": "Dennis"}\n')} - -csv_files = {'2014-01-01.csv': (b'name,amount,id\n' - b'Alice,100,1\n' - b'Bob,200,2\n' - b'Charlie,300,3\n'), - '2014-01-02.csv': (b'name,amount,id\n'), - '2014-01-03.csv': (b'name,amount,id\n' - b'Dennis,400,4\n' - b'Edith,500,5\n' - b'Frank,600,6\n')} - - -@pytest.yield_fixture -def s3(): - # anonymous S3 access to real remote data - yield S3FileSystem(anon=True) - - -@gen_cluster(timeout=60, executor=True) -def test_read_text(e, s, a, b): - import dask.bag as db - - b = read_text(test_bucket_name+'/test/accounts*', lazy=True, - collection=True, anon=True) - assert isinstance(b, db.Bag) - yield gen.sleep(0.2) - assert not s.tasks - - future = e.compute(b.map(json.loads).pluck('amount').sum()) - result = yield future._result() - - assert result == (1 + 2 + 3 + 4 + 5 + 6 + 7 + 8) * 100 - - text = read_text(test_bucket_name+'/test/accounts*', lazy=True, - collection=False, anon=True) - assert all(isinstance(v, Delayed) for v in text) - - text = read_text(test_bucket_name+'/test/accounts*', lazy=False, - collection=False, anon=True) - assert all(isinstance(v, Future) for v in text) - - -@gen_cluster(timeout=60, executor=True) -def test_read_text_blocksize(e, s, a, b): - for bs in [20, 27, 12]: - b = read_text(test_bucket_name+'/test/accounts*', lazy=True, - blocksize=bs, collection=True, anon=True) - assert b.npartitions == sum(ceil(len(b) / bs) for b in files.values()) - - -@gen_cluster(timeout=60, executor=True) -def test_read_text_compression(e, s, a, b): - import dask.bag as db - b = db.read_text('s3://distributed-test/csv/gzip/*', compression='gzip', - blocksize=None, storage_options=dict(anon=True)) - result = yield e.compute(b)._result() - assert result == [line + '\n' for k in sorted(csv_files) - for line in csv_files[k].decode().split('\n') - if line] - - -def test_read_text_sync(loop): - import dask.bag as db - with cluster() as (s, [a, b]): - with Executor(('127.0.0.1', s['port']), loop=loop) as e: - b = read_text(test_bucket_name+'/test/accounts*', lazy=True, - collection=True, anon=True) - assert isinstance(b, db.Bag) - c = b.map(json.loads).pluck('amount').sum() - result = c.compute(get=e.get) - - assert result == (1 + 2 + 3 + 4 + 5 + 6 + 7 + 8) * 100 - - -def test_read_text_bucket_key_inputs(loop): - with cluster() as (s, [a, b]): - with Executor(('127.0.0.1', s['port']), loop=loop) as e: - a = read_text(test_bucket_name, '/test/accounts*', lazy=True, - anon=True) - b = read_text(test_bucket_name, 'test/accounts*', lazy=True, - anon=True) - c = read_text(test_bucket_name + '/test/accounts*', lazy=True, - anon=True) - - assert a._keys() == b._keys() == c._keys() - - -def test_pickle(s3): - import pickle - a = pickle.loads(pickle.dumps(s3)) - - assert [a.anon, a.key, a.secret, a.kwargs, a.dirs] == \ - [s3.anon, s3.key, s3.secret, s3.kwargs, s3.dirs] - - assert a.ls('distributed-test/') == s3.ls('distributed-test/') - - -def test_errors(s3): - try: - s3.open('distributed-test/test/accounts.1.json', mode='rt') - except Exception as e: - assert "mode='rb'" in str(e) - try: - s3.open('distributed-test/test/accounts.1.json', mode='r') - except Exception as e: - assert "mode='rb'" in str(e) - - -def test_seek(s3): - fn = 'test/accounts.1.json' - b = io.BytesIO(files[fn]) - with s3.open('/'.join([test_bucket_name, fn])) as f: - assert f.tell() == b.tell() - f.seek(10) - b.seek(10) - assert 
f.tell() == b.tell() - f.seek(10, 1) - b.seek(10, 1) - assert f.tell() == b.tell() - assert f.read(5) == b.read(5) - assert f.tell() == b.tell() - f.seek(10, 2) - b.seek(10, 2) - assert f.tell() == b.tell() - assert f.read(5) == b.read(5) - assert f.tell() == b.tell() - assert f.read(1000) == b.read(1000) - assert f.tell() == b.tell() - - -def test_repr(s3): - with s3.open('distributed-test/test/accounts.1.json', mode='rb') as f: - assert 'distributed-test' in repr(f) - assert 'accounts.1.json' in repr(f) - - -def test_read_past_location(s3): - with s3.open('distributed-test/test/accounts.1.json', block_size=20) as f: - while f.read(10): - pass - f.seek(5000) - out = f.read(10) - assert out == b'' - - -@gen_cluster(timeout=60, executor=True) -def test_read_csv(e, s, a, b): - dd = pytest.importorskip('dask.dataframe') - s3 = S3FileSystem(anon=True) - - df = read_csv('distributed-test/csv/2015/*', lazy=True, - storage_options={'anon': True}) - yield gen.sleep(0.1) - assert not s.tasks - assert isinstance(df, dd.DataFrame) - - df = read_csv('distributed-test/csv/2015/*', storage_options={'anon': True}) - assert isinstance(df, dd.DataFrame) - assert list(df.columns) == ['name', 'amount', 'id'] - - f = e.compute(df.amount.sum()) - result = yield f._result() - assert result == (100 + 200 + 300 + 400 + 500 + 600) - - futures = read_csv('distributed-test/csv/2015/*', - collection=False, lazy=False, - storage_options={'anon': True}) - assert len(futures) == 3 - assert all(isinstance(f, Future) for f in futures) - results = yield e._gather(futures) - assert results[0].id.sum() == 1 + 2 + 3 - assert results[1].id.sum() == 0 - assert results[2].id.sum() == 4 + 5 + 6 - - values = read_csv('distributed-test/csv/2015/*', - collection=False, lazy=True, - storage_options={'anon': True}) - assert len(values) == 3 - assert all(isinstance(v, Delayed) for v in values) - - df2 = read_csv('distributed-test/csv/2015/*', - collection=True, lazy=True, blocksize=20, - storage_options={'anon': True}) - assert df2.npartitions > df.npartitions - result = yield e.compute(df2.id.sum())._result() - assert result == 1 + 2 + 3 + 4 + 5 + 6 - - df2 = read_csv('distributed-test/csv/2015/*', - collection=True, lazy=False, blocksize=20, - storage_options={'anon': True}) - f = e.compute(df2.amount.sum()) - result = yield f._result() - assert result == (100 + 200 + 300 + 400 + 500 + 600) - - -@gen_cluster(timeout=60, executor=True) -def test_read_csv_gzip(e, s, a, b): - dd = pytest.importorskip('dask.dataframe') - s3 = S3FileSystem(anon=True) - - df = read_csv('distributed-test/csv/gzip/*', compression='gzip', - storage_options={'anon': True}) - assert isinstance(df, dd.DataFrame) - assert list(df.columns) == ['name', 'amount', 'id'] - f = e.compute(df.amount.sum()) - result = yield f._result() - assert result == (100 + 200 + 300 + 400 + 500 + 600) - - -def test_read_csv_sync(loop): - dd = pytest.importorskip('dask.dataframe') - with cluster() as (s, [a, b]): - with Executor(('127.0.0.1', s['port']), loop=loop) as e: - df = read_csv('distributed-test/csv/2015/*', lazy=True, - storage_options={'anon': True}) - assert isinstance(df, dd.DataFrame) - assert list(df.columns) == ['name', 'amount', 'id'] - f = e.compute(df.amount.sum()) - assert f.result() == (100 + 200 + 300 + 400 + 500 + 600) - - df = read_csv('distributed-test/csv/2015/*', lazy=False, - storage_options={'anon': True}) - assert df.amount.sum().compute() == (100 + 200 + 300 + 400 + 500 + 600) diff --git a/distributed/tests/test_scheduler.py 
b/distributed/tests/test_scheduler.py index f1122861fd7..1f65ab9f0e7 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -36,6 +36,12 @@ bob = 'bob:1234' +@gen_cluster() +def test_administration(s, a, b): + assert isinstance(s.address, str) + assert s.address_tuple[0] in s.address + + @gen_cluster() def test_ready_add_worker(s, a, b): s.add_client(client='client') @@ -937,3 +943,20 @@ def test_add_worker_is_idempotent(s): def test_io_loop(loop): s = Scheduler(loop=loop, validate=True) assert s.io_loop is loop + + +@gen_cluster(executor=True) +def test_transition_story(e, s, a, b): + x = delayed(inc)(1) + y = delayed(inc)(x) + f = e.persist(y) + yield _wait([f]) + + assert s.transition_log + + story = s.transition_story(x.key) + assert all(line in s.transition_log for line in story) + assert len(story) < len(s.transition_log) + assert all(x.key == line[0] or x.key in line[-1] for line in story) + + assert len(s.transition_story(x.key, y.key)) > len(story) diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 5c74d03f764..726bccf2f42 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -1,20 +1,22 @@ from __future__ import print_function, division, absolute_import from collections import Iterator -import pytest +from functools import partial import io from time import time, sleep from threading import Thread import threading +import traceback + +import pytest from tornado import gen from tornado.ioloop import IOLoop from tornado.locks import Event -import traceback import dask from distributed.utils import (All, sync, is_kernel, ensure_ip, str_graph, truncate_exception, get_traceback, queue_to_iterator, - iterator_to_queue, _maybe_complex, read_block, seek_delimiter) + iterator_to_queue, _maybe_complex, read_block, seek_delimiter, funcname) from distributed.utils_test import loop, inc, throws, div from distributed.compatibility import Queue, isqueue @@ -129,7 +131,10 @@ def test_is_kernel(): def test_ensure_ip(): assert ensure_ip('localhost') == '127.0.0.1' + assert ensure_ip('localhost:8787') == '127.0.0.1:8787' + assert ensure_ip(b'localhost:8787') == '127.0.0.1:8787' assert ensure_ip('123.123.123.123') == '123.123.123.123' + assert ensure_ip('123.123.123.123:8787') == '123.123.123.123:8787' def test_truncate_exception(): @@ -257,3 +262,12 @@ def test_seek_delimiter_endline(): f.seek(5) seek_delimiter(f, b'\n', 5) assert f.tell() == 7 + + +def test_funcname(): + def f(): + pass + + assert funcname(f) == 'f' + assert funcname(partial(f)) == 'f' + assert funcname(partial(partial(f))) == 'f' diff --git a/distributed/utils.py b/distributed/utils.py index 479513fd4fd..5e84188f928 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -226,11 +226,11 @@ def ensure_ip(hostname): >>> ensure_ip('localhost:5000') '127.0.0.1:5000' """ + if PY3 and isinstance(hostname, bytes): + hostname = hostname.decode() if ':' in hostname: host, port = hostname.rsplit(':', 1) return ':'.join([ensure_ip(host), port]) - if PY3 and isinstance(hostname, bytes): - hostname = hostname.decode() if re.match('\d+\.\d+\.\d+\.\d+', hostname): # is IP return hostname else:
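
A minimal usage sketch of the client-side API touched by this patch
(illustrative only, not part of the diff itself). It assumes the 2016-era
distributed.Executor is started without a scheduler address, so it spins up a
LocalCluster as in test_Executor_solo above; the worker address passed to
workers= is made up for the example.

    from distributed import Executor

    # With no address, Executor creates a LocalCluster; the new
    # except (OSError, socket.error) clause falls back to a random
    # scheduler port if the default port is already in use.
    with Executor() as e:
        futures = e.map(lambda x: x + 1, range(10))
        total = sum(e.gather(futures))   # gather results back to the client

        # Scheduler queries now filter on a workers= keyword
        # (previously addresses= for ncores and keys= for has_what).
        e.ncores()                               # {worker_address: ncores}
        e.ncores(workers=['127.0.0.1:45001'])
        e.has_what(workers=['127.0.0.1:45001'])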