Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In-process communication transports #887

Merged
merged 12 commits into from
Feb 27, 2017
4 changes: 2 additions & 2 deletions distributed/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
""".strip()


from distributed.comm.core import (parse_address, unparse_address,
parse_host_port, unparse_host_port)
from distributed.comm import (parse_address, unparse_address,
parse_host_port, unparse_host_port)
from ..utils import get_ip, ensure_ip


Expand Down
5 changes: 5 additions & 0 deletions distributed/comm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from __future__ import print_function, division, absolute_import

from .addressing import (parse_address, unparse_address,
normalize_address, parse_host_port,
unparse_host_port, get_address_host_port,
resolve_address)
from .core import connect, listen, Comm, CommClosedError


Expand All @@ -13,6 +17,7 @@ def is_zmq_enabled():


def _register_transports():
from . import inproc
from . import tcp

if is_zmq_enabled():
Expand Down
115 changes: 115 additions & 0 deletions distributed/comm/addressing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from __future__ import print_function, division, absolute_import

import six

from ..config import config
from ..utils import ensure_ip


DEFAULT_SCHEME = config.get('default-scheme', 'tcp')


def parse_address(addr):
"""
Split address into its scheme and scheme-dependent location string.
"""
if not isinstance(addr, six.string_types):
raise TypeError("expected str, got %r" % addr.__class__.__name__)
scheme, sep, loc = addr.rpartition('://')
if not sep:
scheme = DEFAULT_SCHEME
return scheme, loc


def unparse_address(scheme, loc):
"""
Undo parse_address().
"""
return '%s://%s' % (scheme, loc)


def normalize_address(addr):
"""
Canonicalize address, adding a default scheme if necessary.
"""
return unparse_address(*parse_address(addr))


def parse_host_port(address, default_port=None):
"""
Parse an endpoint address given in the form "host:port".
"""
if isinstance(address, tuple):
return address
if address.startswith('tcp:'):
address = address[4:]

def _fail():
raise ValueError("invalid address %r" % (address,))

def _default():
if default_port is None:
raise ValueError("missing port number in address %r" % (address,))
return default_port

if address.startswith('['):
host, sep, tail = address[1:].partition(']')
if not sep:
_fail()
if not tail:
port = _default()
else:
if not tail.startswith(':'):
_fail()
port = tail[1:]
else:
host, sep, port = address.partition(':')
if not sep:
port = _default()
elif ':' in host:
_fail()

return host, int(port)


def unparse_host_port(host, port=None):
"""
Undo parse_host_port().
"""
if ':' in host and not host.startswith('['):
host = '[%s]' % host
if port:
return '%s:%s' % (host, port)
else:
return host


def get_address_host_port(addr):
"""
Get a (host, port) tuple out of the given address.
"""
scheme, loc = parse_address(addr)
if scheme not in ('tcp', 'zmq'):
raise ValueError("don't know how to extract host and port "
"for address %r" % (addr,))
return parse_host_port(loc)


def resolve_address(addr):
"""
Apply scheme-specific address resolution to *addr*, ensuring
all symbolic references are replaced with concrete location
specifiers.

In practice, this means hostnames are resolved to IP addresses.
"""
# XXX circular import; reorganize APIs into a distributed.comms.addressing module?
#from ..utils import ensure_ip
scheme, loc = parse_address(addr)
if scheme not in ('tcp', 'zmq'):
return addr

host, port = parse_host_port(loc)
loc = unparse_host_port(ensure_ip(host), port)
addr = unparse_address(scheme, loc)
return addr
118 changes: 7 additions & 111 deletions distributed/comm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
from datetime import timedelta
import logging

from six import string_types, with_metaclass
from six import with_metaclass

from tornado import gen
from tornado.ioloop import IOLoop

from ..config import config
from ..metrics import time
from .addressing import parse_address


logger = logging.getLogger(__name__)
Expand All @@ -31,9 +32,6 @@
}


DEFAULT_SCHEME = config.get('default-scheme', 'tcp')


class CommClosedError(IOError):
pass

Expand Down Expand Up @@ -112,9 +110,6 @@ def stop(self):
Stop listening. This does not shutdown already established
communications, but prevents accepting new ones.
"""
tcp_server, self.tcp_server = self.tcp_server, None
if tcp_server is not None:
tcp_server.stop()

@abstractproperty
def listen_address(self):
Expand All @@ -130,111 +125,12 @@ def contact_address(self):
address such as 'tcp://0.0.0.0:123'.
"""

def __enter__(self):
self.start()
return self

def parse_address(addr):
"""
Split address into its scheme and scheme-dependent location string.
"""
if not isinstance(addr, string_types):
raise TypeError("expected str, got %r" % addr.__class__.__name__)
scheme, sep, loc = addr.rpartition('://')
if not sep:
scheme = DEFAULT_SCHEME
return scheme, loc


def unparse_address(scheme, loc):
"""
Undo parse_address().
"""
return '%s://%s' % (scheme, loc)


def parse_host_port(address, default_port=None):
"""
Parse an endpoint address given in the form "host:port".
"""
if isinstance(address, tuple):
return address
if address.startswith('tcp:'):
address = address[4:]

def _fail():
raise ValueError("invalid address %r" % (address,))

def _default():
if default_port is None:
raise ValueError("missing port number in address %r" % (address,))
return default_port

if address.startswith('['):
host, sep, tail = address[1:].partition(']')
if not sep:
_fail()
if not tail:
port = _default()
else:
if not tail.startswith(':'):
_fail()
port = tail[1:]
else:
host, sep, port = address.partition(':')
if not sep:
port = _default()
elif ':' in host:
_fail()

return host, int(port)


def unparse_host_port(host, port=None):
"""
Undo parse_host_port().
"""
if ':' in host and not host.startswith('['):
host = '[%s]' % host
if port:
return '%s:%s' % (host, port)
else:
return host


def get_address_host_port(addr):
"""
Get a (host, port) tuple out of the given address.
"""
scheme, loc = parse_address(addr)
if scheme not in ('tcp', 'zmq'):
raise ValueError("don't know how to extract host and port "
"for address %r" % (addr,))
return parse_host_port(loc)


def normalize_address(addr):
"""
Canonicalize address, adding a default scheme if necessary.
"""
return unparse_address(*parse_address(addr))


def resolve_address(addr):
"""
Apply scheme-specific address resolution to *addr*, ensuring
all symbolic references are replaced with concrete location
specifiers.

In practice, this means hostnames are resolved to IP addresses.
"""
# XXX circular import; reorganize APIs into a distributed.comms.addressing module?
from ..utils import ensure_ip
scheme, loc = parse_address(addr)
if scheme not in ('tcp', 'zmq'):
raise ValueError("don't know how to extract host and port "
"for address %r" % (addr,))
host, port = parse_host_port(loc)
loc = unparse_host_port(ensure_ip(host), port)
addr = unparse_address(scheme, loc)
return addr
def __exit__(self, *exc):
self.stop()


@gen.coroutine
Expand Down
Loading