Skip to content

Commit

Permalink
Merge branch 'master' into worker-close-gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin authored Jul 30, 2019
2 parents dfb810c + 051a79e commit 87c3082
Show file tree
Hide file tree
Showing 136 changed files with 257 additions and 902 deletions.
2 changes: 0 additions & 2 deletions distributed/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

from . import config
from dask.config import config
from .actor import Actor, ActorFuture
Expand Down
2 changes: 0 additions & 2 deletions distributed/_ipython_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
after which we can import them instead of having our own definitions.
"""

from __future__ import print_function

import atexit
import os

Expand Down
5 changes: 3 additions & 2 deletions distributed/actor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
from tornado import gen
import functools
import threading
from queue import Queue

from .client import Future, default_client
from .compatibility import get_thread_identity, Queue
from .protocol import to_serialize
from .utils import sync
from .utils_comm import WrappedKey
Expand Down Expand Up @@ -103,7 +104,7 @@ def _asynchronous(self):
if self._client:
return self._client.asynchronous
else:
return get_thread_identity() == self._worker.thread_id
return threading.get_ident() == self._worker.thread_id

def _sync(self, func, *args, **kwargs):
if self._client:
Expand Down
2 changes: 0 additions & 2 deletions distributed/batched.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

from collections import deque
import logging

Expand Down
2 changes: 0 additions & 2 deletions distributed/cfexecutor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

import concurrent.futures as cf
import weakref

Expand Down
6 changes: 3 additions & 3 deletions distributed/cli/dask_remote.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from __future__ import print_function, division, absolute_import

import click
from distributed.cli.utils import check_python_3, install_signal_handlers
from distributed.submit import _remote


@click.command()
@click.option("--host", type=str, default=None, help="IP or hostname of this server")
@click.option("--port", type=int, default=8788, help="Remote Client Port")
@click.option(
"--port", type=int, default=8788, show_default=True, help="Remote Client Port"
)
@click.version_option()
def main(host, port):
_remote(host, port)
Expand Down
8 changes: 3 additions & 5 deletions distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

import atexit
import logging
import gc
Expand Down Expand Up @@ -67,15 +65,15 @@
"--dashboard-address",
type=str,
default=":8787",
show_default=True,
help="Address on which to listen for diagnostics dashboard",
)
@click.option(
"--dashboard/--no-dashboard",
"dashboard",
default=True,
show_default=True,
required=False,
help="Launch the Dashboard",
help="Launch the Dashboard [default: --dashboard]",
)
@click.option(
"--bokeh/--no-bokeh",
Expand All @@ -84,7 +82,7 @@
required=False,
help="Deprecated. See --dashboard/--no-dashboard.",
)
@click.option("--show/--no-show", default=False, help="Show web UI")
@click.option("--show/--no-show", default=False, help="Show web UI [default: --show]")
@click.option(
"--dashboard-prefix", type=str, default=None, help="Prefix for the dashboard app"
)
Expand Down
18 changes: 12 additions & 6 deletions distributed/cli/dask_ssh.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

from distributed.deploy.ssh import SSHCluster
import click

Expand All @@ -22,8 +20,9 @@
@click.option(
"--scheduler-port",
default=8786,
show_default=True,
type=int,
help="Specify scheduler port number. Defaults to port 8786.",
help="Specify scheduler port number.",
)
@click.option(
"--nthreads",
Expand All @@ -38,8 +37,9 @@
@click.option(
"--nprocs",
default=1,
show_default=True,
type=int,
help="Number of worker processes per host. Defaults to one.",
help="Number of worker processes per host.",
)
@click.argument("hostnames", nargs=-1, type=str)
@click.option(
Expand All @@ -55,7 +55,11 @@
help="Username to use when establishing SSH connections.",
)
@click.option(
"--ssh-port", default=22, type=int, help="Port to use for SSH connections."
"--ssh-port",
default=22,
type=int,
show_default=True,
help="Port to use for SSH connections.",
)
@click.option(
"--ssh-private-key",
Expand All @@ -79,6 +83,7 @@
@click.option(
"--memory-limit",
default="auto",
show_default=True,
help="Bytes of memory that the worker can use. "
"This can be an integer (bytes), "
"float (fraction of total system memory), "
Expand All @@ -97,8 +102,9 @@
@click.option(
"--remote-dask-worker",
default="distributed.cli.dask_worker",
show_default=True,
type=str,
help="Worker to run. Defaults to distributed.cli.dask_worker",
help="Worker to run.",
)
@click.pass_context
@click.version_option()
Expand Down
13 changes: 6 additions & 7 deletions distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

import atexit
import logging
import multiprocessing
Expand Down Expand Up @@ -72,9 +70,8 @@
"--dashboard/--no-dashboard",
"dashboard",
default=True,
show_default=True,
required=False,
help="Launch the Dashboard",
help="Launch the Dashboard [default: --dashboard]",
)
@click.option(
"--bokeh/--no-bokeh",
Expand Down Expand Up @@ -118,7 +115,8 @@
"--nprocs",
type=int,
default=1,
help="Number of worker processes to launch. Defaults to one.",
show_default=True,
help="Number of worker processes to launch.",
)
@click.option(
"--name",
Expand All @@ -131,6 +129,7 @@
@click.option(
"--memory-limit",
default="auto",
show_default=True,
help="Bytes of memory per process that the worker can use. "
"This can be an integer (bytes), "
"float (fraction of total system memory), "
Expand All @@ -140,12 +139,12 @@
@click.option(
"--reconnect/--no-reconnect",
default=True,
help="Reconnect to scheduler if disconnected",
help="Reconnect to scheduler if disconnected [default: --reconnect]",
)
@click.option(
"--nanny/--no-nanny",
default=True,
help="Start workers in nanny process for management",
help="Start workers in nanny process for management [default: --nanny]",
)
@click.option("--pid-file", type=str, default="", help="File to write the process PID")
@click.option(
Expand Down
2 changes: 0 additions & 2 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

import pytest

pytest.importorskip("requests")
Expand Down
2 changes: 0 additions & 2 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

import pytest
from click.testing import CliRunner

Expand Down
3 changes: 0 additions & 3 deletions distributed/cli/tests/test_tls_cli.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from __future__ import print_function, division, absolute_import

from time import sleep


from distributed import Client
from distributed.utils_test import (
popen,
Expand Down
2 changes: 0 additions & 2 deletions distributed/cli/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

from tornado import gen
from tornado.ioloop import IOLoop

Expand Down
27 changes: 10 additions & 17 deletions distributed/client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from __future__ import print_function, division, absolute_import

import atexit
from collections import defaultdict
from collections import defaultdict, Iterator
from concurrent.futures import ThreadPoolExecutor, CancelledError
from concurrent.futures._base import DoneAndNotDoneFutures
from contextlib import contextmanager
import copy
from datetime import timedelta
import errno
from functools import partial
import html
import itertools
import json
import logging
Expand All @@ -19,14 +18,15 @@
import threading
import six
import socket
from queue import Queue as pyQueue
import warnings
import weakref

import dask
from dask.base import tokenize, normalize_token, collections_to_dsk
from dask.core import flatten, get_dependencies
from dask.optimization import SubgraphCallable
from dask.compatibility import apply, unicode
from dask.compatibility import apply
from dask.utils import ensure_dict, format_bytes

try:
Expand Down Expand Up @@ -55,13 +55,6 @@
gather_from_workers,
)
from .cfexecutor import ClientExecutor
from .compatibility import (
Queue as pyQueue,
isqueue,
html_escape,
StopAsyncIteration,
Iterator,
)
from .core import connect, rpc, clean_exception, CommClosedError, PooledRPCCall
from .metrics import time
from .node import Node
Expand Down Expand Up @@ -400,7 +393,7 @@ def __repr__(self):
return "<Future: status: %s, key: %s>" % (self.status, self.key)

def _repr_html_(self):
text = "<b>Future: %s</b> " % html_escape(key_split(self.key))
text = "<b>Future: %s</b> " % html.escape(key_split(self.key))
text += (
'<font color="gray">status: </font>'
'<font color="%(color)s">%(status)s</font>, '
Expand All @@ -414,7 +407,7 @@ def _repr_html_(self):
except AttributeError:
typ = str(self.type)
text += '<font color="gray">type: </font>%s, ' % typ
text += '<font color="gray">key: </font>%s' % html_escape(str(self.key))
text += '<font color="gray">key: </font>%s' % html.escape(str(self.key))
return text

def __await__(self):
Expand Down Expand Up @@ -1523,7 +1516,7 @@ def map(
if not callable(func):
raise TypeError("First input to map must be a callable function")

if all(map(isqueue, iterables)) or all(
if all(isinstance(it, pyQueue) for it in iterables) or all(
isinstance(i, Iterator) for i in iterables
):
raise TypeError(
Expand Down Expand Up @@ -1792,7 +1785,7 @@ def gather(self, futures, errors="raise", direct=None, asynchronous=None):
--------
Client.scatter: Send data out to cluster
"""
if isqueue(futures):
if isinstance(futures, pyQueue):
raise TypeError(
"Dask no longer supports gathering over Iterators and Queues. "
"Consider using a normal for loop and Client.submit/gather"
Expand Down Expand Up @@ -1829,7 +1822,7 @@ async def _scatter(
if isinstance(workers, six.string_types + (Number,)):
workers = [workers]
if isinstance(data, dict) and not all(
isinstance(k, (bytes, unicode)) for k in data
isinstance(k, (bytes, str)) for k in data
):
d = await self._scatter(keymap(tokey, data), workers, broadcast)
raise gen.Return({k: d[tokey(k)] for k in data})
Expand Down Expand Up @@ -1998,7 +1991,7 @@ def scatter(
"""
if timeout == no_default:
timeout = self._timeout
if isqueue(data) or isinstance(data, Iterator):
if isinstance(data, pyQueue) or isinstance(data, Iterator):
raise TypeError(
"Dask no longer supports mapping over Iterators or Queues."
"Consider using a normal for loop and Client.submit"
Expand Down
2 changes: 0 additions & 2 deletions distributed/comm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

from .addressing import (
parse_address,
unparse_address,
Expand Down
2 changes: 0 additions & 2 deletions distributed/comm/addressing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

import six

import dask
Expand Down
2 changes: 0 additions & 2 deletions distributed/comm/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

from abc import ABCMeta, abstractmethod, abstractproperty
from datetime import timedelta
import logging
Expand Down
5 changes: 1 addition & 4 deletions distributed/comm/inproc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

from collections import deque, namedtuple
import itertools
import logging
Expand All @@ -11,7 +9,6 @@
from tornado.concurrent import Future
from tornado.ioloop import IOLoop

from ..compatibility import finalize
from ..protocol import nested_deserialize
from ..utils import get_ip

Expand Down Expand Up @@ -161,7 +158,7 @@ def __init__(
self._write_loop = write_loop
self._closed = False

self._finalizer = finalize(self, self._get_finalizer())
self._finalizer = weakref.finalize(self, self._get_finalizer())
self._finalizer.atexit = False
self._initialized = True

Expand Down
2 changes: 0 additions & 2 deletions distributed/comm/registry.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function, division, absolute_import

from abc import ABCMeta, abstractmethod

from six import with_metaclass
Expand Down
Loading

0 comments on commit 87c3082

Please sign in to comment.