From 0420e45a6fb8380a27fbd11ec907ab9a137a5282 Mon Sep 17 00:00:00 2001 From: Richard Wall <richard.wall@clusterhq.com> Date: Tue, 4 Oct 2016 14:34:39 +0100 Subject: [PATCH 1/2] Removed most of the _docker module and related tests. --- flocker/node/_docker.py | 906 +--------------- flocker/node/functional/test_docker.py | 1328 ------------------------ flocker/node/test/test_docker.py | 500 --------- flocker/node/testtools.py | 164 +-- 4 files changed, 4 insertions(+), 2894 deletions(-) delete mode 100644 flocker/node/functional/test_docker.py delete mode 100644 flocker/node/test/test_docker.py diff --git a/flocker/node/_docker.py b/flocker/node/_docker.py index 98bb14bea3..e05b7eefbd 100644 --- a/flocker/node/_docker.py +++ b/flocker/node/_docker.py @@ -7,379 +7,22 @@ from __future__ import absolute_import -from datetime import timedelta - from errno import ECONNREFUSED from socket import error as socket_error from functools import partial -from itertools import repeat -from time import sleep - -from zope.interface import Interface, implementer from docker import Client -from docker.errors import APIError, NotFound - -from eliot import Message, MessageType, Field, start_action - -from repoze.lru import LRUCache +from docker.errors import APIError -from pyrsistent import field, PClass, pset - -from requests import Response from requests.exceptions import ConnectionError from requests.packages.urllib3.exceptions import ProtocolError -from characteristic import with_cmp - -from twisted.python.components import proxyForInterface -from twisted.python.filepath import FilePath -from twisted.internet.defer import succeed, fail -from twisted.internet.threads import deferToThread -from twisted.web.http import NOT_FOUND, INTERNAL_SERVER_ERROR +from twisted.web.http import INTERNAL_SERVER_ERROR from ..common import ( - poll_until, retry_if, decorate_methods, with_retry, get_default_retry_steps, ) -from ..control._model import ( - RestartNever, RestartAlways, RestartOnFailure, pset_field, pvector_field) - - -LOG_CACHED_IMAGE = MessageType( - u"flocker:node:docker:image_from_cache", - [Field.for_types(u"image", [unicode], "The image ID.")], - "An image was retrieved from the cache." -) - - -class AlreadyExists(Exception): - """A unit with the given name already exists.""" - - -@with_cmp(["address", "apierror"]) -class AddressInUse(Exception): - """ - The listen address for an exposed port was in use and could not be bound. - """ - def __init__(self, address, apierror): - """ - :param tuple address: The conventional Python representation of the - address which could not be bound (eg, an (ipv4 address, port - number) pair for IPv4 addresses). - :param APIError apierror: The original Docker API error indicating this - problem. Or ``None`` if the error was not derived from the result - of a Docker API call. - """ - Exception.__init__(self, address, apierror) - self.address = address - self.apierror = apierror - - -class Environment(PClass): - """ - A collection of environment variables. - - :ivar frozenset variables: A ``frozenset`` of tuples containing - key and value pairs representing the environment variables. - """ - variables = field(mandatory=True) - - def to_dict(self): - """ - Convert to a dictionary suitable for serialising to JSON and then on to - the Docker API. - - :return: ``dict`` mapping keys to values. - """ - return dict(self.variables) - - -class Volume(PClass): - """ - A Docker volume. - - :ivar FilePath node_path: The volume's path on the node's - filesystem. - - :ivar FilePath container_path: The volume's path within the - container. - """ - node_path = field(mandatory=True, type=FilePath) - container_path = field(mandatory=True, type=FilePath) - - -class PortMap(PClass): - """ - A record representing the mapping between a port exposed internally by a - docker container and the corresponding external port on the host. - - :ivar int internal_port: The port number exposed by the container. - :ivar int external_port: The port number exposed by the host. - """ - internal_port = field(mandatory=True, type=int) - external_port = field(mandatory=True, type=int) - - -class ImageDataCache(PClass): - """ - A record representing cached image data. The cache only stores - the data we care about from an inspected image. - - :ivar list command: The image command. - :ivar list environment: A list of unicode strings representing - the image's environment variables. - """ - command = field(mandatory=True, type=(list, type(None))) - environment = field(mandatory=True, type=(list, type(None))) - - -class Unit(PClass): - """ - Information about a unit managed by Docker. - - XXX "Unit" is geard terminology, and should be renamed. See - https://clusterhq.atlassian.net/browse/FLOC-819 - - :ivar unicode name: The name of the unit, which may not be the same as - the container name. - - :ivar unicode container_name: The name of the container where the - application is running. - - :ivar unicode activation_state: The state of the - container. ``u"active"`` indicates it is running, ``u"inactive"`` - indicates it is not running. See - https://clusterhq.atlassian.net/browse/FLOC-187 about using - constants instead of strings and other improvements. - - :ivar unicode container_image: The docker image name associated with this - container. - - :ivar PSet ports: The ``PortMap`` instances which define how - connections to ports on the host are routed to ports exposed in - the container. - - :ivar Environment environment: An ``Environment`` whose variables - will be supplied to the Docker container or ``None`` if there are no - environment variables for this container. - - :ivar PSet volumes: ``Volume`` instances, the container's volumes. - - :ivar int mem_limit: The number of bytes to which to limit the in-core - memory allocations of this unit. Or ``None`` to apply no limits. The - behavior when the limit is encountered depends on the container - execution driver but the likely behavior is for the container process - to be killed (and therefore the container to exit). Docker most likely - maps this value onto the cgroups ``memory.limit_in_bytes`` value. - - :ivar int cpu_shares: The number of CPU shares to allocate to this unit. - Or ``None`` to let it have the default number of shares. Docker maps - this value onto the cgroups ``cpu.shares`` value (the default of which - is probably 1024). - - :ivar IRestartPolicy restart_policy: The restart policy of the container. - - :ivar command_line: Custom command to run using the image, a ``PVector`` - of ``unicode``. ``None`` means use default. - - :ivar swappiness: Tunable swappiness of the container. - Default of 0 disables swap. - """ - name = field(mandatory=True) - container_name = field(mandatory=True) - activation_state = field(mandatory=True) - container_image = field(mandatory=True, initial=None) - ports = pset_field(PortMap) - environment = field(mandatory=True, initial=None) - volumes = pset_field(Volume) - mem_limit = field(mandatory=True, initial=None) - cpu_shares = field(mandatory=True, initial=None) - restart_policy = field(mandatory=True, initial=RestartNever()) - command_line = pvector_field(unicode, optional=True, initial=None) - swappiness = field(mandatory=False, initial=0, type=int) - - -class IDockerClient(Interface): - """ - A client for the Docker HTTP API. - - Note the difference in semantics between the results of ``add()`` - (firing does not indicate application started successfully) - vs. ``remove()`` (firing indicates application has finished shutting - down). - """ - - def add(unit_name, image_name, ports=None, environment=None, volumes=(), - mem_limit=None, cpu_shares=None, restart_policy=RestartNever(), - command_line=None, swappiness=0): - """ - Install and start a new unit. - - Note that callers should not assume success indicates the unit has - finished starting up. In addition to asynchronous nature of Docker, - even if container is up and running the application within it might - still be starting up, e.g. it may not have bound the external ports - yet. As a result the final success of application startup is out of - scope for this method. - - :param unicode unit_name: The name of the unit to create. - :param unicode image_name: The Docker image to use for the unit. - :param list ports: A list of ``PortMap``\ s mapping ports exposed in - the container to ports exposed on the host. Default ``None`` means - that no port mappings will be configured for this unit. If a - ``PortMap`` instance's ``external_port`` is set to ``0`` a free - port will automatically be assigned. The assigned port will be - reported for the container in the result of ``IDockerClient.list``. - :param Environment environment: Environment variables for the - container. Default ``None`` means that no environment variables - will be supplied to the unit. - :param volumes: A sequence of ``Volume`` instances to mount. - :param int mem_limit: The number of bytes to which to limit the in-core - memory allocations of the new unit. Or ``None`` to apply no - limits. - :param int cpu_shares: The number of CPU shares to allocate to the new - unit. Or ``None`` to let it have the default number of shares. - Docker maps this value onto the cgroups ``cpu.shares`` value (the - default of which is probably 1024). - :param IRestartPolicy restart_policy: The restart policy of the - container. - :param command_line: Custom command to run using the image, a sequence - of ``unicode``, or ``None`` to use default image command line. - :param swappiness: Tune container's memory swappiness. - Default of 0 disables swap. - - :return: ``Deferred`` that fires on success, or errbacks with - :class:`AlreadyExists` if a unit by that name already exists. - """ - - def exists(unit_name): - """ - Check whether the unit exists. - - :param unicode unit_name: The name of the unit whose existence - we're checking. - - :return: ``Deferred`` that fires with ``True`` if unit exists, - otherwise ``False``. - """ - - def remove(unit_name): - """ - Stop and delete the given unit. - - This can be done multiple times in a row for the same unit. - - :param unicode unit_name: The name of the unit to stop. - - :return: ``Deferred`` that fires once the unit has been stopped - and removed. - """ - - def list(): - """ - List all known units. - - :return: ``Deferred`` firing with ``set`` of :class:`Unit`. - """ - - -def make_response(code, message): - """ - Create a ``requests.Response`` with the given response code and message. - - :param int code: The HTTP response code to include in the fake response. - :param unicode message: The HTTP response message to include in the fake - response. The message will be encoded using ASCII. - """ - response = Response() - response.status_code = code - response.reason = message - return response - - -@implementer(IDockerClient) -class FakeDockerClient(object): - """ - In-memory fake that simulates talking to a docker daemon. - - The state the the simulated units is stored in memory. - - :ivar dict _units: See ``units`` of ``__init__``\ . - :ivar pset _used_ports: A set of integers giving the port numbers which - will be considered in use. Attempts to add containers which use these - ports will fail. - """ - - def __init__(self, units=None): - """ - :param dict units: A dictionary of canned ``Unit``\ s which will be - manipulated and returned by the methods of this - ``FakeDockerClient``. - :type units: ``dict`` mapping `unit_name` to ``Unit``\ . - """ - if units is None: - units = {} - self._units = units - self._used_ports = pset() - - def add(self, unit_name, image_name, ports=frozenset(), environment=None, - volumes=frozenset(), mem_limit=None, cpu_shares=None, - restart_policy=RestartNever(), command_line=None, swappiness=0): - if unit_name in self._units: - return fail(AlreadyExists(unit_name)) - for port in ports: - if port.external_port in self._used_ports: - raise AddressInUse( - address=(b"0.0.0.0", port.external_port), - apierror=APIError( - 'fake api response from server', - response=make_response(500, 'fake response')), - ) - - all_ports = set(range(2 ** 15, 2 ** 16)) - assigned_ports = [] - for port in ports: - if port.external_port == 0: - available_ports = pset(all_ports) - self._used_ports - assigned = next(iter(available_ports)) - port = port.set(external_port=assigned) - assigned_ports.append(port) - self._used_ports = self._used_ports.add(port.external_port) - - self._units[unit_name] = Unit( - name=unit_name, - container_name=unit_name, - container_image=image_name, - ports=frozenset(assigned_ports), - environment=environment, - volumes=frozenset(volumes), - activation_state=u'active', - mem_limit=mem_limit, - cpu_shares=cpu_shares, - restart_policy=restart_policy, - command_line=command_line, - swappiness=swappiness - ) - return succeed(None) - - def exists(self, unit_name): - return succeed(unit_name in self._units) - - def remove(self, unit_name): - if unit_name in self._units: - del self._units[unit_name] - return succeed(None) - - def list(self): - units = set(self._units.values()) - return succeed(units) - - -# Basic namespace for Flocker containers: -BASE_NAMESPACE = u"flocker--" - class TimeoutClient(Client): """ @@ -472,548 +115,3 @@ def dockerpy_client(**kwargs): steps=get_default_retry_steps(), ), ) - - -@implementer(IDockerClient) -class DockerClient(object): - """ - Talk to the real Docker server directly. - - Some operations can take a while (e.g. stopping a container), so we - use a thread pool. See https://clusterhq.atlassian.net/browse/FLOC-718 - for using a custom thread pool. - - :ivar unicode namespace: A namespace prefix to add to container names - so we don't clobber other applications interacting with Docker. - :ivar str base_url: URL for connection to the Docker server. - :ivar int long_timeout: Maximum time in seconds to wait for - long-running operations, particularly pulling an image. - :ivar LRUCache _image_cache: Mapped cache of image IDs to their data. - """ - def __init__( - self, namespace=BASE_NAMESPACE, base_url=None, - long_timeout=600): - self.namespace = namespace - self._client = dockerpy_client( - version="1.15", base_url=base_url, - long_timeout=timedelta(seconds=long_timeout), - ) - self._image_cache = LRUCache(100) - - def _to_container_name(self, unit_name): - """ - Add the namespace to the container name. - - :param unicode unit_name: The unit's name. - - :return unicode: The container's name. - """ - return self.namespace + unit_name - - def _parse_container_ports(self, data): - """ - Parse the ports from a data structure representing the Ports - configuration of a Docker container in the format returned by - ``self._client.inspect_container`` and return a list containing - ``PortMap`` instances mapped to the container and host exposed ports. - - :param dict data: The data structure for the representation of - container and host port mappings in a single container. - This takes the form of the ``NetworkSettings.Ports`` portion - of a container's state and configuration as returned by inspecting - the container. This is a dictionary mapping container ports to a - list of host bindings, e.g. - "3306/tcp": [{"HostIp": "0.0.0.0","HostPort": "53306"}, - {"HostIp": "0.0.0.0","HostPort": "53307"}] - - :return list: A list that is either empty or contains ``PortMap`` - instances. - """ - ports = [] - for internal, hostmap in data.items(): - internal_map = internal.split(u'/') - internal_port = internal_map[0] - internal_port = int(internal_port) - if hostmap: - for host in hostmap: - external_port = host[u"HostPort"] - external_port = int(external_port) - portmap = PortMap(internal_port=internal_port, - external_port=external_port) - ports.append(portmap) - return ports - - def _parse_restart_policy(self, data): - """ - Parse the restart policy from the configuration of a Docker container - in the format returned by ``self._client.inspect_container`` and return - an ``IRestartPolicy``. - - :param dict data: The data structure representing the restart policy of - a container, e.g. - - {"Name": "policy-name", "MaximumRetryCount": 0} - - :return IRestartPolicy: The model of the restart policy. - - :raises ValueError: if an unknown policy is passed. - """ - POLICIES = { - u"": lambda data: - RestartNever(), - u"always": lambda data: - RestartAlways(), - u"on-failure": lambda data: - RestartOnFailure( - maximum_retry_count=data[u"MaximumRetryCount"] or None) - } - try: - # docker will treat an unknown plolicy as "never". - # We error out here, in case new policies are added. - return POLICIES[data[u"Name"]](data) - except KeyError: - raise ValueError("Unknown restart policy: %r" % (data[u"Name"],)) - - def _serialize_restart_policy(self, restart_policy): - """ - Serialize the restart policy from an ``IRestartPolicy`` to the format - expected by the docker API. - - :param IRestartPolicy restart_policy: The model of the restart policy. - - :returns: A dictionary suitable to pass to docker - - :raises ValueError: if an unknown policy is passed. - """ - SERIALIZERS = { - RestartNever: lambda policy: - {u"Name": u""}, - RestartAlways: lambda policy: - {u"Name": u"always"}, - RestartOnFailure: lambda policy: - {u"Name": u"on-failure", - u"MaximumRetryCount": policy.maximum_retry_count or 0}, - } - try: - return SERIALIZERS[restart_policy.__class__](restart_policy) - except KeyError: - raise ValueError("Unknown restart policy: %r" % (restart_policy,)) - - def _image_not_found(self, apierror): - """ - Inspect a ``docker.errors.APIError`` to determine if it represents a - failure to start a container because the container's image wasn't - found. - - :return: ``True`` if this is the case, ``False`` if the error has - another cause. - :rtype: ``bool`` - """ - return apierror.response.status_code == NOT_FOUND - - def _address_in_use(self, apierror): - """ - Inspect a ``docker.errors.APIError`` to determine if it represents a - failure to start a container because the container is configured to use - ports that are already in use on the system. - - :return: If this is the reason, an exception to raise describing the - problem. Otherwise, ``None``. - """ - # Recognize an error (without newline) like: - # - # Cannot start container <name>: Error starting userland proxy: - # listen tcp <ip>:<port>: bind: address already in use - # - # Or (without newline) like: - # - # Cannot start container <name>: Bind for <ip>:<port> failed: - # port is already allocated - # - # because Docker can't make up its mind about which format to use. - parts = apierror.explanation.split(b": ") - if parts[-1] == b"address already in use": - ip, port = parts[-3].split()[-1].split(b":") - elif parts[-1] == b"port is already allocated": - ip, port = parts[-2].split()[2].split(b":") - else: - return None - return AddressInUse(address=(ip, int(port)), apierror=apierror) - - def _image_data(self, image): - """ - Supply data about an image, by either inspecting it or returning - cached data if available. - - :param unicode image: The ID of the image. - - :return: ``dict`` representing data about the image properties. - """ - cached_image = self._image_cache.get(image) - if cached_image is not None: - LOG_CACHED_IMAGE(image=image).write() - return cached_image - try: - image_data = self._client.inspect_image(image) - Message.new( - message_type="flocker:node:docker:image_inspected", - image=image - ).write() - except APIError as e: - if e.response.status_code == NOT_FOUND: - # Image has been deleted, so just fill in some - # stub data so we can return *something*. This - # should happen only for stopped containers so - # some inaccuracy is acceptable. - # We won't cache stub data though. - Message.new( - message_type="flocker:node:docker:image_not_found", - image=image - ).write() - image_data = {u"Config": {u"Env": [], u"Cmd": []}} - else: - raise - cached_data = ImageDataCache( - command=image_data[u"Config"][u"Cmd"], - environment=image_data[u"Config"][u"Env"] - ) - self._image_cache.put(image, cached_data) - Message.new( - message_type="flocker:node:docker:image_data_cached", - image=image - ).write() - return cached_data - - def add(self, unit_name, image_name, ports=None, environment=None, - volumes=(), mem_limit=None, cpu_shares=None, - restart_policy=RestartNever(), command_line=None, - swappiness=0): - container_name = self._to_container_name(unit_name) - - if environment is not None: - environment = environment.to_dict() - if ports is None: - ports = [] - - restart_policy_dict = self._serialize_restart_policy(restart_policy) - - def _create(): - binds = list( - # The "Z" mode tells Docker to "relabel file objects" on the - # volume. This makes things work when SELinux is enabled, at - # least in the default configuration on CentOS 7. See - # <https://docs.docker.com/reference/commandline/run/>, in the - # `--volumes-from` section (or just search for SELinux). - u"{}:{}:Z".format( - volume.node_path.path, volume.container_path.path - ) - for volume in volumes - ) - port_bindings = { - p.internal_port: p.external_port - for p in ports - } - host_config = self._client.create_host_config( - binds=binds, - port_bindings=port_bindings, - restart_policy=restart_policy_dict, - ) - # We're likely to get e.g. pvector, so make sure we're passing - # in something JSON serializable: - command_line_values = command_line - if command_line_values is not None: - command_line_values = list(command_line_values) - - memswap_limit = -1 - if swappiness != 0: - memswap_limit = mem_limit + mem_limit * swappiness - - self._client.create_container( - name=container_name, - image=image_name, - command=command_line_values, - environment=environment, - ports=[p.internal_port for p in ports], - mem_limit=mem_limit, - cpu_shares=cpu_shares, - host_config=host_config, - memswap_limit=memswap_limit, - ) - - def _add(): - try: - _create() - except APIError as e: - if self._image_not_found(e): - # Pull it and try again - self._client.pull(image_name) - _create() - else: - # Unrecognized, just raise it. - raise - - # Just because we got a response doesn't mean Docker has - # actually updated any internal state yet! So if e.g. we did a - # start on this container Docker might well complain it knows - # not the container of which we speak. To prevent this we poll - # until it does exist. - while True: - try: - self._client.start(container_name) - except NotFound: - sleep(0.01) - else: - break - - d = deferToThread(_add) - - def _extract_error(failure): - failure.trap(APIError) - code = failure.value.response.status_code - if code == 409: - raise AlreadyExists(unit_name) - - in_use = self._address_in_use(failure.value) - if in_use is not None: - # We likely can't start the container because its - # configuration conflicts with something else happening on - # the system. Reflect this failure condition in a more - # easily recognized way. - raise in_use - - return failure - d.addErrback(_extract_error) - return d - - def _blocking_exists(self, container_name): - """ - Blocking API to check if container exists. - - :param unicode container_name: The name of the container whose - existence we're checking. - - :return: ``True`` if unit exists, otherwise ``False``. - """ - try: - self._client.inspect_container(container_name) - return True - except APIError: - return False - - def exists(self, unit_name): - container_name = self._to_container_name(unit_name) - return deferToThread(self._blocking_exists, container_name) - - def _stop_container(self, container_name): - """Attempt to stop the given container. - - There is a race condition between a process dying and - Docker noticing that fact: - - https://github.com/docker/docker/issues/5165#issuecomment-65753753 - - If we get an error indicating that this race condition happened, - return False. This means the caller should try again. If we *do* - successfully stop the container, return True. - - :raise APIError: If the container failed to stop for some unknown - reason. - :return: True if we stopped the container, False otherwise. - - """ - try: - with start_action( - action_type='flocker:docker:container_stop', - container=container_name - ): - self._client.stop(container_name) - except APIError as e: - if e.response.status_code == NOT_FOUND: - # If the container doesn't exist, we swallow the error, - # since this method is supposed to be idempotent. - return True - elif e.response.status_code == INTERNAL_SERVER_ERROR: - # Docker returns this if the process had died, but - # hasn't noticed it yet. - return False - else: - raise - return True - - def _remove_container(self, container_name): - """ - Attempt to remove a container. - - Assumes the given container has already been stopped. - - :param unicode container_name: The fully-namespaced name of the - container. - :return: True if we removed the container, False otherwise. - """ - try: - # The ``docker.Client.stop`` method sometimes returns a - # 404 error, even though the container exists. - # See https://github.com/docker/docker/issues/13088 - # Wait until the container has actually stopped running - # before attempting to remove it. Otherwise we are - # likely to see: 'docker.errors.APIError: 409 Client - # Error: Conflict ("Conflict, You cannot remove a - # running container. Stop the container before - # attempting removal or use -f")' - # This code should probably be removed once the above - # issue has been resolved. See [FLOC-1850] - self._client.wait(container_name) - - with start_action( - action_type='flocker:docker:container_remove', - container=container_name - ): - self._client.remove_container(container_name) - except APIError as e: - if e.response.status_code == NOT_FOUND: - # If the container doesn't exist, we swallow the error, - # since this method is supposed to be idempotent. - return True - elif e.response.status_code == INTERNAL_SERVER_ERROR: - # Failure to remove container - see FLOC-3262 for an example. - return False - else: - raise - return True - - def remove(self, unit_name): - container_name = self._to_container_name(unit_name) - - def _remove(): - # Previously, this looped forever and didn't pause between loops. - # We've arbitrarily chosen a wait interval of 0.001 seconds and - # 1000 retries (i.e. a second of polling). These values may need - # tuning. - poll_until( - partial(self._stop_container, container_name), - repeat(0.001, 1000)) - - # Previously, the container remove was only tried once. Again, - # these parameters may need tuning. - poll_until( - partial(self._remove_container, container_name), - repeat(0.001, 1000)) - - d = deferToThread(_remove) - return d - - def list(self): - def _list(): - result = set() - ids = [d[u"Id"] for d in - self._client.containers(quiet=True, all=True)] - for i in ids: - - try: - data = self._client.inspect_container(i) - except APIError as e: - # The container ID returned by the list API call above, may - # have been removed in another thread. - if e.response.status_code == NOT_FOUND: - continue - else: - raise - - state = (u"active" if data[u"State"][u"Running"] - else u"inactive") - name = data[u"Name"] - # Since tags (e.g. "busybox") aren't stable, ensure we're - # looking at the actual image by using the hash: - image = data[u"Image"] - image_tag = data[u"Config"][u"Image"] - command = data[u"Config"][u"Cmd"] - with start_action( - action_type=u"flocker:node:docker:inspect_image", - container=i, - running=data[u"State"][u"Running"] - ): - image_data = self._image_data(image) - if image_data.command == command: - command = None - port_bindings = data[u"NetworkSettings"][u"Ports"] - if port_bindings is not None: - ports = self._parse_container_ports(port_bindings) - else: - ports = list() - volumes = [] - binds = data[u"HostConfig"]['Binds'] - if binds is not None: - for bind_config in binds: - parts = bind_config.split(':', 2) - node_path, container_path = parts[:2] - volumes.append( - Volume(container_path=FilePath(container_path), - node_path=FilePath(node_path)) - ) - if name.startswith(u"/" + self.namespace): - name = name[1 + len(self.namespace):] - else: - continue - # Retrieve environment variables for this container, - # disregarding any environment variables that are part - # of the image, rather than supplied in the configuration. - unit_environment = [] - container_environment = data[u"Config"][u"Env"] - if image_data.environment is None: - image_environment = [] - else: - image_environment = image_data.environment - if container_environment is not None: - for environment in container_environment: - if environment not in image_environment: - env_key, env_value = environment.split('=', 1) - unit_environment.append((env_key, env_value)) - unit_environment = ( - Environment(variables=frozenset(unit_environment)) - if unit_environment else None - ) - # Our Unit model counts None as the value for cpu_shares and - # mem_limit in containers without specified limits, however - # Docker returns the values in these cases as zero, so we - # manually convert. - cpu_shares = data[u"Config"][u"CpuShares"] - cpu_shares = None if cpu_shares == 0 else cpu_shares - mem_limit = data[u"Config"][u"Memory"] - mem_limit = None if mem_limit == 0 else mem_limit - restart_policy = self._parse_restart_policy( - data[U"HostConfig"][u"RestartPolicy"]) - result.add(Unit( - name=name, - container_name=self._to_container_name(name), - activation_state=state, - container_image=image_tag, - ports=frozenset(ports), - volumes=frozenset(volumes), - environment=unit_environment, - mem_limit=mem_limit, - cpu_shares=cpu_shares, - restart_policy=restart_policy, - command_line=command) - ) - return result - return deferToThread(_list) - - -class NamespacedDockerClient(proxyForInterface(IDockerClient, "_client")): - """ - A Docker client that only shows and creates containers in a given - namespace. - - Unlike ``DockerClient``, whose namespace is there to prevent conflicts - with other Docker users, this class deals with Flocker's internal - concept of namespaces. I.e. if hypothetically Docker container names - supported path-based namespaces then ``DockerClient`` would look at - containers in ``/flocker/`` and this class would look at containers in - in ``/flocker/<namespace>/``. - """ - def __init__(self, namespace, base_url=None): - """ - :param unicode namespace: Namespace to restrict containers to. - """ - self._client = DockerClient( - namespace=BASE_NAMESPACE + namespace + u"--") diff --git a/flocker/node/functional/test_docker.py b/flocker/node/functional/test_docker.py deleted file mode 100644 index 96398a90ed..0000000000 --- a/flocker/node/functional/test_docker.py +++ /dev/null @@ -1,1328 +0,0 @@ -# Copyright ClusterHQ Inc. See LICENSE file for details. - -""" -Functional tests for :module:`flocker.node._docker`. -""" - -from __future__ import absolute_import - -from datetime import timedelta -from functools import partial -import time -import socket - -from eliot.testing import capture_logging, assertHasMessage - -from requests.exceptions import ReadTimeout -from docker.errors import APIError - -from twisted.python.monkey import MonkeyPatcher -from twisted.python.filepath import FilePath -from twisted.internet import reactor -from twisted.internet.defer import succeed, gatherResults -from twisted.internet.error import ConnectionRefusedError -from twisted.web.client import ResponseNeverReceived - -from treq import request, content - -from pyrsistent import PClass, pvector, field - -from ...common import loop_until -from ...testtools import ( - find_free_port, flaky, DockerImageBuilder, assertContainsAll, - random_name, - async_runner, TestCase, AsyncTestCase, -) - -from ..test.test_docker import ANY_IMAGE, make_idockerclient_tests -from .._docker import ( - DockerClient, PortMap, Environment, NamespacedDockerClient, - BASE_NAMESPACE, Volume, AddressInUse, make_response, - LOG_CACHED_IMAGE, dockerpy_client, -) -from ...control import ( - RestartNever, RestartAlways, RestartOnFailure, DockerImage -) -from ..testtools import ( - if_docker_configured, wait_for_unit_state, require_docker_version, - add_with_port_collision_retry, -) - - -def namespace_for_test(test_case): - return u"ns-" + random_name(test_case) - - -class IDockerClientTests(make_idockerclient_tests( - lambda test_case: DockerClient( - namespace=namespace_for_test(test_case) - ), -)): - """ - ``IDockerClient`` tests for ``DockerClient``. - """ - @if_docker_configured - def setUp(self): - super(IDockerClientTests, self).setUp() - - -class IDockerClientNamespacedTests(make_idockerclient_tests( - lambda test_case: NamespacedDockerClient( - namespace=namespace_for_test(test_case) - ) -)): - """ - ``IDockerClient`` tests for ``NamespacedDockerClient``. - """ - @if_docker_configured - def setUp(self): - super(IDockerClientNamespacedTests, self).setUp() - - @flaky([u'FLOC-2628', u'FLOC-2874']) - def test_added_is_listed(self): - return super(IDockerClientNamespacedTests, self).test_added_is_listed() - - -class Registry(PClass): - """ - Describe a Docker image registry. - - :ivar host: The IP address on which the registry is listening. - :ivar port: The port number on which the registry is listening. - :ivar name: The name of the container in which the registry is running. - """ - host = field(mandatory=True, type=bytes, initial=b"127.0.0.1") - port = field(mandatory=True, type=int) - name = field(mandatory=True, type=unicode) - - @property - def repository(self): - """ - The string to use as an image name prefix to direct Docker to find that - image in this registry instead of the default. - """ - return "{host}:{port}".format(host=self.host, port=self.port) - - -class GenericDockerClientTests(AsyncTestCase): - """ - Functional tests for ``DockerClient`` and other clients that talk to - real Docker. - """ - clientException = APIError - - # FLOC-3935: These tests (and the ones in NamespacedDockerClientTests) are - # often timing out, sometimes in weird ways that cause interference with - # other tests. Until we can identify the cause, effectively disable - # timeouts on these tests and rely on the Jenkins timeout (or the limited - # patience of developers) to ensure they halt. - run_tests_with = async_runner(timeout=timedelta(hours=1)) - - @if_docker_configured - def setUp(self): - super(GenericDockerClientTests, self).setUp() - self.namespacing_prefix = namespace_for_test(self) - - def make_client(self): - return DockerClient(namespace=self.namespacing_prefix) - - def create_container(self, client, name, image): - """ - Create (but don't start) a container via the supplied client. - - :param DockerClient client: The Docker API client. - :param unicode name: The container name. - :param unicode image: The image name. - """ - container_name = client._to_container_name(name) - client._client.create_container( - name=container_name, image=image) - - def start_container(self, unit_name, - image_name=u"openshift/busybox-http-app:latest", - ports=None, expected_states=(u'active',), - environment=None, volumes=(), - mem_limit=None, cpu_shares=None, - restart_policy=RestartNever(), - command_line=None, - retry_on_port_collision=False): - """ - Start a unit and wait until it reaches the `active` state or the - supplied `expected_state`. - - :param unicode unit_name: See ``IDockerClient.add``. - :param unicode image_name: See ``IDockerClient.add``. - :param list ports: See ``IDockerClient.add``. - :param expected_states: A sequence of activation states to wait for. - :param environment: See ``IDockerClient.add``. - :param volumes: See ``IDockerClient.add``. - :param mem_limit: See ``IDockerClient.add``. - :param cpu_shares: See ``IDockerClient.add``. - :param restart_policy: See ``IDockerClient.add``. - :param command_line: See ``IDockerClient.add``. - - :return: ``Deferred`` that fires with the ``DockerClient`` when - the unit reaches the expected state. - """ - client = self.make_client() - - if retry_on_port_collision: - add = partial(add_with_port_collision_retry, client) - else: - add = client.add - - d = add( - unit_name=unit_name, - image_name=image_name, - ports=ports, - environment=environment, - volumes=volumes, - mem_limit=mem_limit, - cpu_shares=cpu_shares, - restart_policy=restart_policy, - command_line=command_line, - ) - self.addCleanup(client.remove, unit_name) - - d.addCallback(lambda _: wait_for_unit_state(reactor, client, unit_name, - expected_states)) - d.addCallback(lambda _: client) - - return d - - def test_custom_base_url_tcp_http(self): - """ - ``DockerClient`` instantiated with a custom base URL for a TCP - connection has a client HTTP url after the connection is made. - """ - client = DockerClient(base_url=b"tcp://127.0.0.1:2375") - self.assertEqual(client._client.base_url, b"http://127.0.0.1:2375") - - def test_add_starts_container(self): - """ - ``DockerClient.add`` starts the container. - """ - name = random_name(self) - return self.start_container(name) - - def test_correct_image_used(self): - """ - ``DockerClient.add`` creates a container with the specified image. - """ - image_name = u"openshift/busybox-http-app:latest" - name = random_name(self) - d = self.start_container(name, image_name=image_name) - - def started(_): - docker = dockerpy_client() - data = docker.inspect_container(self.namespacing_prefix + name) - self.assertEqual( - image_name, - data[u"Config"][u"Image"], - ) - d.addCallback(started) - return d - - @capture_logging(assertHasMessage, LOG_CACHED_IMAGE) - def test_list_image_data_cached(self, logger): - """ - ``DockerClient.list`` will only an inspect an image ID once, caching - the resulting data. - """ - name = random_name(self) - d = self.start_container(name, image_name=ANY_IMAGE) - - def started(client): - listing = client.list() - - def listed(_): - class FakeAPIError(Exception): - pass - - def fake_inspect_image(image): - raise FakeAPIError( - "Tried to inspect image {} twice.".format(image)) - # This is kind of nasty, but NamespacedDockerClient represents - # its client via a proxying attribute. - if isinstance(client, NamespacedDockerClient): - docker_client = client._client._client - else: - docker_client = client._client - self.patch(docker_client, "inspect_image", fake_inspect_image) - # If image is not retrieved from the cache, list() here will - # attempt to call inspect_image again, resulting in a call to - # the fake_inspect_image function that will raise an exception. - cached_listing = client.list() - cached_listing.addCallback(lambda _: None) - return cached_listing - - listing.addCallback(listed) - return listing - - d.addCallback(started) - return d - - @require_docker_version( - '1.6.0', - 'This test uses the registry:2 image ' - 'which requires Docker-1.6.0 or newer. ' - 'See https://docs.docker.com/registry/deploying/ for details.' - ) - def test_private_registry_image(self): - """ - ``DockerClient.add`` can start containers based on an image from a - private registry. - - A private registry is started in a container according to the - instructions at: - * https://docs.docker.com/registry/deploying/ - - An image is pushed to that private registry and then a Flocker - application is started that uses that private repository image name. - - Docker can pull from a private registry without any TLS configuration - as long as it's running on the local host. - """ - registry_listening = self.run_registry() - - def tag_and_push_image(registry): - client = dockerpy_client() - image_name = ANY_IMAGE - # The image will normally have been pre-pulled on build slaves, but - # may not already be available when running tests locally. - client.pull(image_name) - - registry_image = self.push_to_registry(image_name, registry) - - # And the image will (hopefully) have been downloaded again from - # the private registry in the next step, so cleanup that local - # image once the test finishes. - self.addCleanup( - client.remove_image, - image=registry_image.full_name - ) - - return registry_image - - pushing_image = registry_listening.addCallback(tag_and_push_image) - - def start_registry_image(registry_image): - return self.start_container( - unit_name=random_name(self), - image_name=registry_image.full_name, - ) - starting_registry_image = pushing_image.addCallback( - start_registry_image - ) - return starting_registry_image - - def test_add_error(self): - """ - ``DockerClient.add`` returns a ``Deferred`` that errbacks with - ``APIError`` if response code is not a success response code. - """ - client = self.make_client() - # add() calls exists(), and we don't want exists() to be the one - # failing since that's not the code path we're testing, so bypass - # it: - client.exists = lambda _: succeed(False) - # Illegal container name should make Docker complain when we try to - # install the container: - d = client.add(u"!!!###!!!", u"busybox:latest") - return self.assertFailure(d, self.clientException) - - def test_dead_is_listed(self): - """ - ``DockerClient.list()`` includes dead units. - - We use a `busybox` image here, because it will exit immediately and - reach an `inactive` substate of `dead`. - - There are no assertions in this test, because it will fail with a - timeout if the unit with that expected state is never listed or if that - unit never reaches that state. - """ - name = random_name(self) - d = self.start_container(unit_name=name, image_name="busybox:latest", - expected_states=(u'inactive',)) - return d - - def test_list_with_missing_image(self): - """ - ``DockerClient.list()`` can list containers whose image is missing. - - The resulting output may be inaccurate, but that's OK: this only - happens for non-running containers, who at worst we're going to - restart anyway. - """ - path = FilePath(self.mktemp()) - path.makedirs() - path.child(b"Dockerfile.in").setContent( - b"FROM busybox\nCMD /bin/true\n") - builder = DockerImageBuilder(test=self, source_dir=path, cleanup=False) - d = builder.build() - - def image_built(image_name): - name = random_name(self) - d = self.start_container( - unit_name=name, image_name=image_name, - expected_states=(u'inactive',)) - return d.addCallback(lambda ignored: (name, image_name)) - d.addCallback(image_built) - - def stopped_container_exists((name, image_name)): - # Remove the image: - docker_client = dockerpy_client() - docker_client.remove_image(image_name, force=True) - - # Should be able to still list the container: - client = self.make_client() - listed = client.list() - listed.addCallback(lambda results: self.assertIn( - (name, "inactive"), - [(unit.name, unit.activation_state) for unit in results])) - return listed - d.addCallback(stopped_container_exists) - - return d - - def test_dead_is_removed(self): - """ - ``DockerClient.remove()`` removes dead units without error. - - We use a `busybox` image here, because it will exit immediately and - reach an `inactive` substate of `dead`. - """ - name = random_name(self) - d = self.start_container(unit_name=name, image_name="busybox:latest", - expected_states=(u'inactive',)) - - def remove_container(client): - client.remove(name) - d.addCallback(remove_container) - return d - - def request_until_response(self, port): - """ - Resend a test HTTP request until a response is received. - - The container may have started, but the webserver inside may take a - little while to start serving requests. - - :param int port: The localhost port to which an HTTP request will be - sent. - - :return: A ``Deferred`` which fires with the result of the first - successful HTTP request. - """ - def send_request(): - """ - Send an HTTP request in a loop until the request is answered. - """ - response = request( - b"GET", b"http://127.0.0.1:%d" % (port,), - persistent=False) - - def check_error(failure): - """ - Catch ConnectionRefused errors and response timeouts and return - False so that loop_until repeats the request. - - Other error conditions will be passed down the errback chain. - """ - failure.trap(ConnectionRefusedError, ResponseNeverReceived) - return False - response.addErrback(check_error) - return response - - return loop_until(reactor, send_request) - - def test_non_docker_port_collision(self): - """ - ``DockerClient.add`` returns a ``Deferred`` that fails with - ``AddressInUse`` if the external port of one of the ``PortMap`` - instances passed for ``ports`` is already in use on the system by - something other than a Docker container. - """ - address_user = socket.socket() - self.addCleanup(address_user.close) - - address_user.bind(('', 0)) - used_address = address_user.getsockname() - - name = random_name(self) - d = self.start_container( - name, ports=[ - PortMap(internal_port=10000, external_port=used_address[1]), - ], - ) - return self.assertFailure(d, AddressInUse) - - def test_add_with_port(self): - """ - ``DockerClient.add`` accepts a ports argument which is passed to - Docker to expose those ports on the unit. - - Assert that the busybox-http-app returns the expected "Hello world!" - response. - - XXX: We should use a stable internal container instead. See - https://clusterhq.atlassian.net/browse/FLOC-120 - - XXX: The busybox-http-app returns headers in the body of its response, - hence this over complicated custom assertion. See - https://github.com/openshift/geard/issues/213 - """ - expected_response = b'Hello world!\n' - external_port = find_free_port()[1] - name = random_name(self) - d = self.start_container( - name, ports=[PortMap(internal_port=8080, - external_port=external_port)], - retry_on_port_collision=True, - ) - - d.addCallback( - lambda ignored: self.request_until_response(external_port)) - - def started(response): - d = content(response) - d.addCallback(lambda body: self.assertIn(expected_response, body)) - return d - d.addCallback(started) - return d - - def test_add_with_environment(self): - """ - ``DockerClient.add`` accepts an environment object whose ID and - variables are used when starting a docker image. - """ - docker_dir = FilePath(self.mktemp()) - docker_dir.makedirs() - docker_dir.child(b"Dockerfile").setContent( - b'FROM busybox\n' - b'CMD ["/bin/sh", "-c", ' - b'"while true; do env && echo WOOT && sleep 1; done"]' - ) - expected_variables = frozenset({ - 'key1': 'value1', - 'key2': 'value2', - }.items()) - unit_name = random_name(self) - - image = DockerImageBuilder(test=self, source_dir=docker_dir) - d = image.build() - - def image_built(image_name): - return self.start_container( - unit_name=unit_name, - image_name=image_name, - environment=Environment(variables=expected_variables), - ) - d.addCallback(image_built) - - def started(_): - output = "" - client = dockerpy_client() - while True: - output += client.logs(self.namespacing_prefix + unit_name) - if "WOOT" in output: - break - assertContainsAll( - output, test_case=self, - needles=['{}={}\n'.format(k, v) - for k, v in expected_variables], - ) - d.addCallback(started) - return d - - @flaky(u"FLOC-3875") - def test_pull_image_if_necessary(self): - """ - The Docker image is pulled if it is unavailable locally. - """ - client = dockerpy_client() - - path = FilePath(self.mktemp()) - path.makedirs() - path.child(b"Dockerfile.in").setContent( - b"FROM busybox\n" - b"CMD /bin/true\n" - ) - builder = DockerImageBuilder( - test=self, source_dir=path, - # We're going to manipulate the various tags on the image ourselves - # in this test. We'll do (the slightly more complicated) cleanup - # so the builder shouldn't (and will encounter errors if we let - # it). - cleanup=False, - ) - building = builder.build() - registry_listening = self.run_registry() - - def create_container((image_name, registry)): - registry_image = self.push_to_registry(image_name, registry) - - # And the image will (hopefully) have been downloaded again from - # the private registry in the next step, so cleanup that local - # image once the test finishes. - self.addCleanup( - client.remove_image, - image=registry_image.full_name - ) - - name = random_name(self) - docker_client = self.make_client() - self.addCleanup(docker_client.remove, name) - d = docker_client.add(name, registry_image.full_name) - d.addCallback( - lambda _: self.assertTrue( - client.inspect_image(registry_image.full_name) - ) - ) - return d - - d = gatherResults((building, registry_listening)) - d.addCallback(create_container) - return d - - def push_to_registry(self, image_name, registry): - """ - Push an image identified by a local tag to the given registry. - - :param unicode image_name: The local tag which identifies the image to - push. - :param Registry registry: The registry to which to push the image. - - :return: A ``DockerImage`` describing the image in the registry. Note - in particular the tag of the image in the registry will differ from - the local tag of the image. - """ - registry_name = random_name(self).lower() - registry_image = DockerImage( - # XXX: See FLOC-246 for followup improvements to - # ``flocker.control.DockerImage`` to allow parsing of alternative - # registry hostnames and ports. - repository=registry.repository + '/' + registry_name, - tag='latest', - ) - client = dockerpy_client() - - # Tag an image with a repository name matching the given registry. - client.tag( - image=image_name, repository=registry_image.repository, - tag=registry_image.tag, - ) - try: - client.push( - repository=registry_image.repository, - tag=registry_image.tag, - ) - finally: - # Remove the tag created above to make it possible to do the push. - client.remove_image(image=registry_image.full_name) - - return registry_image - - def run_registry(self): - """ - Start a registry in a container. - - The registry will be stopped and destroyed when the currently running - test finishes. - - :return: A ``Registry`` describing the registry which was started. - """ - registry_name = random_name(self) - registry_starting = self.start_container( - unit_name=registry_name, - image_name='registry:2', - ports=[ - PortMap( - internal_port=5000, - # Doesn't matter what port we expose this on. We'll - # discover what was assigned later. - external_port=0, - ), - ], - retry_on_port_collision=True, - ) - - def extract_listening_port(client): - listing = client.list() - - def listed(apps): - [app] = [app for app in apps if app.name == registry_name] - return next(iter(app.ports)).external_port - listing.addCallback(listed) - return listing - - registry_starting.addCallback(extract_listening_port) - - def wait_for_listening(external_port): - registry = Registry( - name=registry_name, port=external_port, - ) - registry_listening = self.request_until_response(registry.port) - registry_listening.addCallback(lambda ignored: registry) - return registry_listening - - registry_starting.addCallback(wait_for_listening) - - return registry_starting - - def _pull_timeout(self): - """ - Attempt to start an application using an image which must be pulled - from a registry but don't give the pull operation enough time to - complete. Assert that the result is a timeout error of some kind. - - :return: A ``Deferred`` firing with a two-tuple of a ``DockerImage`` - and a ``Registry``. The former represents the image we attempted - to use, the latter represents the registry we should have tried to - pull it from. - """ - client = dockerpy_client() - - # Run a local registry - running = self.run_registry() - - # Build a stub image - def build_dummy_image(registry): - path = FilePath(self.mktemp()) - path.makedirs() - path.child(b"Dockerfile.in").setContent( - b"FROM busybox\n" - b"CMD /bin/true\n" - ) - builder = DockerImageBuilder( - test=self, source_dir=path, - # We're going to manipulate the various tags on the image - # ourselves in this test. We'll do (the slightly more - # complicated) cleanup so the builder shouldn't (and will - # encounter errors if we let it). - cleanup=False, - ) - building = builder.build() - building.addCallback(lambda image_name: (image_name, registry)) - return building - running.addCallback(build_dummy_image) - - def cleanup_image(image_name): - for image in client.images(): - if image_name in image["RepoTags"]: - client.remove_image(image_name, force=True) - return - - def cleanup_registry(registry): - try: - client.unpause(self.namespacing_prefix + registry.name) - except APIError: - # Already unpaused - pass - - def setup_image((image_name, registry)): - registry_image = self.push_to_registry(image_name, registry) - - # The image shouldn't be downloaded during the run of this test. - # In case something goes wrong and it is downloaded, though, clean - # it up. - self.addCleanup(cleanup_image, image_name) - - # Pause the registry - client.pause(self.namespacing_prefix + registry.name) - - # Cannot stop paused containers to make sure it gets unpaused. - self.addCleanup(cleanup_registry, registry) - - # Create a DockerClient with a very short timeout - docker_client = DockerClient( - namespace=self.namespacing_prefix, long_timeout=1, - ) - # Add an application using the DockerClient, using the tag from the - # local registry - app_name = random_name(self) - d = docker_client.add(app_name, registry_image.full_name) - - # Assert that the timeout triggers. - # - # requests has a TimeoutError but timeout raises a ConnectionError. - # https://github.com/kennethreitz/requests/issues/2620 - # - # XXX DockerClient.add is our API. We could make it fail with a - # more coherent exception type if we wanted. - self.assertFailure(d, ReadTimeout) - d.addCallback(lambda ignored: (registry_image, registry)) - return d - running.addCallback(setup_image) - return running - - def test_pull_timeout(self): - """ - Pulling an image times-out if it takes longer than a provided timeout. - """ - return self._pull_timeout() - - def test_pull_timeout_pull(self): - """ - Image pull timeout does not affect subsequent pulls. - """ - # Note, this is the same image as test_pull_image_if_necessary, but - # they run at different times. Probably room for some refactoring to - # remove the duplication between them. - - # Run all of the code from test_pull_timeout - timing_out = self._pull_timeout() - - def pull_successfully((registry_image, registry)): - client = dockerpy_client() - # Resume the registry - client.unpause(self.namespacing_prefix + registry.name) - - # Create a DockerClient with the default timeout - docker_client = DockerClient(namespace=self.namespacing_prefix) - - # Add an application using the Client, using the tag from the local - # registry - app_name = random_name(self) - adding = docker_client.add(app_name, registry_image.full_name) - - # Assert that the application runs - return adding - timing_out.addCallback(pull_successfully) - return timing_out - - def test_namespacing(self): - """ - Containers are created with a namespace prefixed to their container - name. - """ - docker = dockerpy_client() - name = random_name(self) - client = self.make_client() - self.addCleanup(client.remove, name) - d = client.add(name, u"busybox:latest") - - def added(_): - self.assertTrue( - docker.inspect_container(self.namespacing_prefix + name)) - d.addCallback(added) - return d - - def test_null_environment(self): - """ - A container that does not include any environment variables contains - an empty ``environment`` in the return ``Unit``. - """ - docker_dir = FilePath(self.mktemp()) - docker_dir.makedirs() - docker_dir.child(b"Dockerfile").setContent( - b'FROM scratch\n' - b'MAINTAINER info@clusterhq.com\n' - b'CMD ["/bin/doesnotexist"]' - ) - name = random_name(self) - image = DockerImageBuilder(test=self, source_dir=docker_dir) - d = image.build() - - def image_built(image_name): - client = self.make_client() - self.create_container(client, name, image_name) - self.addCleanup(client.remove, name) - return client.list() - d.addCallback(image_built) - - def got_list(units): - unit = [unit for unit in units if unit.name == name][0] - self.assertIsNone(unit.environment) - d.addCallback(got_list) - return d - - def test_container_name(self): - """ - The container name stored on returned ``Unit`` instances matches the - expected container name. - """ - client = self.make_client() - name = random_name(self) - self.addCleanup(client.remove, name) - d = client.add(name, u"busybox:latest") - d.addCallback(lambda _: client.list()) - - def got_list(units): - unit = [unit for unit in units if unit.name == name][0] - self.assertEqual(unit.container_name, - self.namespacing_prefix + name) - d.addCallback(got_list) - return d - - def test_empty_environment(self): - """ - When a container with no custom environment variables is launched via - ``DockerClient.add`` the environment in the resulting ``Unit`` returned - from ``DockerClient.list`` will ignore the default HOME and PATH - environment variables, leaving the ``Unit`` with an Environment of - None. - """ - name = random_name(self) - d = self.start_container(name) - - def started(client): - deferred_units = client.list() - - def check_units(units): - unit = [unit for unit in units if unit.name == name][0] - self.assertIsNone(unit.environment) - - deferred_units.addCallback(check_units) - d.addCallback(started) - return d - - def test_list_only_custom_environment(self): - """ - When a container containing custom environment variables is launched - and the image used also injects environment variables, only the custom - variables we injected are returned by ``DockerClient.list``, whereas - variables set by the image are discarded. - - All Docker containers have a PATH environment variable. In addition, - the openshift/busybox-http-app image contains an STI_SCRIPTS_URL - environment variable. These are therefore disregarded the variables - disregarded in this test, whereas our custom environment is listed in - the returned Units. - - https://registry.hub.docker.com/u/openshift/busybox-http/dockerfile/ - """ - name = random_name(self) - environment = { - 'my_variable': 'some value', - 'another_variable': '12345' - } - environment = frozenset(environment.items()) - d = self.start_container( - name, - environment=Environment(variables=environment) - ) - - def started(client): - deferred_units = client.list() - - def check_units(units): - unit = [unit for unit in units if unit.name == name][0] - expected = Environment(variables=environment) - self.assertEqual(unit.environment, expected) - - deferred_units.addCallback(check_units) - - d.addCallback(started) - return d - - def test_add_with_volumes(self): - """ - ``DockerClient.add`` accepts a list of ``Volume`` instances which are - mounted within the container. - """ - docker_dir = FilePath(self.mktemp()) - docker_dir.makedirs() - docker_dir.child(b"Dockerfile").setContent( - b'FROM busybox\n' - b'CMD ["/bin/sh", "-c", ' - b'"touch /mnt1/a; touch /mnt2/b"]' - ) - image = DockerImageBuilder(test=self, source_dir=docker_dir) - d = image.build() - - def image_built(image_name): - unit_name = random_name(self) - - path1 = FilePath(self.mktemp()) - path1.makedirs() - path2 = FilePath(self.mktemp()) - path2.makedirs() - - d = self.start_container( - unit_name=unit_name, - image_name=image_name, - volumes=[ - Volume(node_path=path1, container_path=FilePath(b"/mnt1")), - Volume( - node_path=path2, container_path=FilePath(b"/mnt2"))], - expected_states=(u'inactive',), - ) - return d.addCallback(lambda _: (path1, path2)) - d.addCallback(image_built) - - def started((path1, path2)): - expected1 = path1.child(b"a") - expected2 = path2.child(b"b") - for _ in range(100): - if expected1.exists() and expected2.exists(): - return - else: - time.sleep(0.1) - self.fail("Files never created.") - return d.addCallback(started) - - def test_add_with_memory_limit(self): - """ - ``DockerClient.add`` accepts an integer mem_limit parameter which is - passed to Docker when creating a container as the maximum amount of RAM - available to that container. - """ - MEMORY_100MB = 100000000 - name = random_name(self) - d = self.start_container(name, mem_limit=MEMORY_100MB) - - def started(_): - docker = dockerpy_client() - data = docker.inspect_container(self.namespacing_prefix + name) - self.assertEqual(data[u"Config"][u"Memory"], - MEMORY_100MB) - d.addCallback(started) - return d - - def test_add_with_cpu_shares(self): - """ - ``DockerClient.add`` accepts an integer cpu_shares parameter which is - passed to Docker when creating a container as the CPU shares weight - for that container. This is a relative weight for CPU time versus other - containers and does not directly constrain CPU usage, i.e. a CPU share - constrained container can still use 100% CPU if other containers are - idle. Default shares when unspecified is 1024. - """ - name = random_name(self) - d = self.start_container(name, cpu_shares=512) - - def started(_): - docker = dockerpy_client() - data = docker.inspect_container(self.namespacing_prefix + name) - self.assertEqual(data[u"Config"][u"CpuShares"], 512) - d.addCallback(started) - return d - - def test_add_without_cpu_or_mem_limits(self): - """ - ``DockerClient.add`` when creating a container with no mem_limit or - cpu_shares specified will create a container without these resource - limits, returning integer 0 as the values for Memory and CpuShares from - its API when inspecting such a container. - """ - name = random_name(self) - d = self.start_container(name) - - def started(_): - docker = dockerpy_client() - data = docker.inspect_container(self.namespacing_prefix + name) - self.assertEqual(data[u"Config"][u"Memory"], 0) - self.assertEqual(data[u"Config"][u"CpuShares"], 0) - d.addCallback(started) - return d - - def start_restart_policy_container(self, mode, restart_policy): - """ - Start a container for testing restart policies. - - :param unicode mode: Mode of container. One of - - ``"failure"``: The container will always exit with a failure. - - ``"success-then-sleep"``: The container will exit with success - once, then sleep forever. - - ``"failure-then-sucess"``: The container will exit with failure - once, then with failure. - :param IRestartPolicy restart_policy: The restart policy to use for - the container. - - :returns Deferred: A deferred that fires with the number of times the - container was started. - """ - docker_dir = FilePath(__file__).sibling('retry-docker') - name = random_name(self) - data = FilePath(self.mktemp()) - data.makedirs() - count = data.child('count') - count.setContent("0") - marker = data.child('marker') - - image = DockerImageBuilder(test=self, source_dir=docker_dir) - d = image.build() - - def image_built(image_name): - if mode == u"success-then-sleep": - expected_states = (u'active',) - else: - expected_states = (u'inactive',) - - return self.start_container( - name, image_name=image_name, - restart_policy=restart_policy, - environment=Environment(variables={u'mode': mode}), - volumes=[ - Volume(node_path=data, container_path=FilePath(b"/data"))], - expected_states=expected_states) - d.addCallback(image_built) - - if mode == u"success-then-sleep": - # TODO: if the `run` script fails for any reason, - # then this will loop forever. - - d.addCallback(lambda ignored: loop_until(reactor, marker.exists)) - - d.addCallback(lambda ignored: count.getContent()) - return d - - def test_restart_policy_never(self): - """ - An container with a restart policy of never isn't restarted - after it exits. - """ - d = self.start_restart_policy_container( - mode=u"failure", restart_policy=RestartNever()) - - d.addCallback(self.assertEqual, "1") - return d - - @flaky(u'FLOC-2840') - def test_restart_policy_always(self): - """ - An container with a restart policy of always is restarted - after it exits. - """ - d = self.start_restart_policy_container( - mode=u"success-then-sleep", restart_policy=RestartAlways()) - - d.addCallback(self.assertEqual, "2") - return d - - @flaky([u'FLOC-3742', u'FLOC-3746']) - def test_restart_policy_on_failure(self): - """ - An container with a restart policy of on-failure is restarted - after it exits with a non-zero result. - """ - d = self.start_restart_policy_container( - mode=u"failure-then-success", restart_policy=RestartOnFailure()) - - d.addCallback(self.assertEqual, "2") - return d - - @flaky([u'FLOC-3742', u'FLOC-3746']) - def test_restart_policy_on_failure_maximum_count(self): - """ - A container with a restart policy of on-failure and a maximum - retry count is not restarted if it fails as many times than the - specified maximum. - """ - d = self.start_restart_policy_container( - mode=u"failure", - restart_policy=RestartOnFailure(maximum_retry_count=5)) - - # A Docker change e721ed9b5319e8e7c1daf87c34690f8a4e62c9e3 means that - # this value depends on the version of Docker. - d.addCallback(self.assertIn, ("5", "6")) - return d - - def test_command_line(self): - """ - A container with custom command line is run with those arguments. - """ - external_port = find_free_port()[1] - name = random_name(self) - d = self.start_container( - name, image_name=u"busybox", - # Pass in pvector since this likely to be what caller actually - # passes in: - command_line=pvector([u"sh", u"-c", u"""\ -echo -n '#!/bin/sh -echo -n "HTTP/1.1 200 OK\r\n\r\nhi" -' > /tmp/script.sh; -chmod +x /tmp/script.sh; -nc -ll -p 8080 -e /tmp/script.sh -"""]), - ports=[PortMap(internal_port=8080, - external_port=external_port)]) - - d.addCallback( - lambda ignored: self.request_until_response(external_port)) - - def started(response): - d = content(response) - d.addCallback(lambda body: self.assertEqual(b"hi", body)) - return d - d.addCallback(started) - return d - - -class MakeResponseTests(TestCase): - """ - Tests for ``make_response``. - """ - def test_str(self): - """ - ``str(make_response(...))`` returns a string giving the response code. - """ - self.assertEqual( - str(make_response(123, "Something")), - "<Response [123]>", - ) - - def test_apierror_str(self): - """ - A string representation can be constructed of an ``APIError`` - constructed with the response returned by ``make_response``. - """ - self.assertEqual( - str(APIError("", make_response(500, "Simulated server error"))), - "500 Server Error: Simulated server error", - ) - - -class DockerClientTests(AsyncTestCase): - """ - Tests for ``DockerClient`` specifically. - """ - @if_docker_configured - def setUp(self): - super(DockerClientTests, self).setUp() - - def test_default_namespace(self): - """ - The default namespace is `u"flocker--"`. - """ - docker = dockerpy_client() - name = random_name(self) - client = DockerClient() - self.addCleanup(client.remove, name) - d = client.add(name, u"busybox:latest") - d.addCallback(lambda _: self.assertTrue( - docker.inspect_container(u"flocker--" + name))) - return d - - def test_list_removed_containers(self): - """ - ``DockerClient.list`` does not list containers which are removed, - during its operation, from another thread. - """ - patcher = MonkeyPatcher() - - namespace = namespace_for_test(self) - flocker_docker_client = DockerClient(namespace=namespace) - - name1 = random_name(self) - adding_unit1 = flocker_docker_client.add(name1, ANY_IMAGE) - self.addCleanup(flocker_docker_client.remove, name1) - - name2 = random_name(self) - adding_unit2 = flocker_docker_client.add(name2, ANY_IMAGE) - self.addCleanup(flocker_docker_client.remove, name2) - - docker_client = flocker_docker_client._client - docker_client_containers = docker_client.containers - - def simulate_missing_containers(*args, **kwargs): - """ - Remove a container before returning the original list. - """ - containers = docker_client_containers(*args, **kwargs) - container_name1 = flocker_docker_client._to_container_name(name1) - docker_client.remove_container( - container=container_name1, force=True) - return containers - - adding_units = gatherResults([adding_unit1, adding_unit2]) - - def get_list(ignored): - patcher.addPatch( - docker_client, - 'containers', - simulate_missing_containers - ) - patcher.patch() - return flocker_docker_client.list() - - listing_units = adding_units.addCallback(get_list) - - def check_list(units): - patcher.restore() - self.assertEqual( - [name2], sorted([unit.name for unit in units]) - ) - running_assertions = listing_units.addCallback(check_list) - - return running_assertions - - def error_passthrough_test(self, method_name): - """ - If the given method name on the underyling ``Docker`` client has a - non-404 error, that gets passed through to ``Docker.list()``. - - :param str method_name: Method of a docker ``Client``. - :return: ``Deferred`` firing on test success. - """ - name = random_name(self) - client = DockerClient() - self.addCleanup(client.remove, name) - d = client.add(name, u"busybox:latest") - - response = make_response(500, "Simulated error") - - def error(name): - raise APIError("", response) - - def added(_): - # Monekypatch cause triggering non-404 errors from - # inspect_container is hard. - self.patch(client._client, method_name, error) - return client.list() - d.addCallback(added) - return self.assertFailure(d, APIError) - - def test_list_error_inspecting_container(self): - """ - If an error occurs inspecting a container it is passed through. - """ - return self.error_passthrough_test("inspect_container") - - def test_list_error_inspecting_image(self): - """ - If an error occurs inspecting an image it is passed through. - """ - return self.error_passthrough_test("inspect_image") - - -class NamespacedDockerClientTests(GenericDockerClientTests): - """ - Functional tests for ``NamespacedDockerClient``. - """ - @if_docker_configured - def setUp(self): - super(NamespacedDockerClientTests, self).setUp() - self.namespace = namespace_for_test(self) - self.namespacing_prefix = BASE_NAMESPACE + self.namespace + u"--" - - def make_client(self): - return NamespacedDockerClient(self.namespace) - - def create_container(self, client, name, image): - """ - Create (but don't start) a container via the supplied client. - - :param DockerClient client: The Docker API client. - :param unicode name: The container name. - :param unicode image: The image name. - """ - container_name = client._client._to_container_name(name) - client._client._client.create_container( - name=container_name, image=image) - - def test_isolated_namespaces(self): - """ - Containers in one namespace are not visible in another namespace. - """ - client = NamespacedDockerClient(namespace=namespace_for_test(self)) - client2 = NamespacedDockerClient(namespace=namespace_for_test(self)) - name = random_name(self) - - self.addCleanup(client.remove, name) - d = client.add(name, u"busybox:latest") - d.addCallback(lambda _: client2.list()) - d.addCallback(self.assertEqual, set()) - return d diff --git a/flocker/node/test/test_docker.py b/flocker/node/test/test_docker.py deleted file mode 100644 index ee7aebe1e4..0000000000 --- a/flocker/node/test/test_docker.py +++ /dev/null @@ -1,500 +0,0 @@ -# Copyright ClusterHQ Inc. See LICENSE file for details. - -""" -Tests for :module:`flocker.node._docker`. -""" - -from zope.interface.verify import verifyObject - -from pyrsistent import pset, pvector - -from docker.errors import APIError - -from twisted.python.filepath import FilePath - -from ...testtools import ( - AsyncTestCase, TestCase, random_name, make_with_init_tests, -) -from ..testtools import add_with_port_collision_retry - -from .._docker import ( - IDockerClient, FakeDockerClient, AddressInUse, AlreadyExists, PortMap, - Unit, Environment, Volume, -) - -from ...control._model import RestartAlways, RestartNever, RestartOnFailure - -# Just some image we can use to start a container. No particularly behavior -# should be expected from this image except that it exists. -# -# Note we explicitly select the "latest" tag to avoid tripping over a Docker -# 1.8.1 / Docker hub interaction that results in pulls failing. See -# https://github.com/docker/docker/issues/15699 -ANY_IMAGE = u"openshift/busybox-http-app:latest" - - -def make_idockerclient_tests(fixture): - """ - Create a TestCase for IDockerClient. - - :param fixture: A fixture that returns a :class:`IDockerClient` - provider. - """ - class IDockerClientTests(AsyncTestCase): - """ - Tests for :class:`IDockerClientTests`. - - These are functional tests if run against a real Docker daemon. - """ - def test_interface(self): - """The tested object provides :class:`IDockerClient`.""" - client = fixture(self) - self.assertTrue(verifyObject(IDockerClient, client)) - - def test_add_and_remove(self): - """ - An added container can be removed without an error. - """ - client = fixture(self) - name = random_name(self) - d = client.add(name, u"busybox") - d.addCallback(lambda _: client.remove(name)) - return d - - def test_no_double_add(self): - """ - Adding a container with name that already exists results in error. - """ - client = fixture(self) - name = random_name(self) - self.addCleanup(client.remove, name) - d = client.add(name, u"busybox") - - def added(_): - return client.add(name, u"busybox") - d.addCallback(added) - d = self.assertFailure(d, AlreadyExists) - d.addCallback(lambda exc: self.assertEqual(exc.args[0], name)) - return d - - def test_remove_nonexistent_is_ok(self): - """ - Removing a non-existent container does not result in a error. - """ - client = fixture(self) - name = random_name(self) - return client.remove(name) - - def test_double_remove_is_ok(self): - """ - Removing a container twice in a row does not result in error. - """ - client = fixture(self) - name = random_name(self) - d = client.add(name, u"busybox") - d.addCallback(lambda _: client.remove(name)) - d.addCallback(lambda _: client.remove(name)) - return d - - def test_unknown_does_not_exist(self): - """ - A container that was never added does not exist. - """ - client = fixture(self) - name = random_name(self) - d = client.exists(name) - d.addCallback(self.assertFalse) - return d - - def test_added_exists(self): - """ - An added container exists. - """ - client = fixture(self) - name = random_name(self) - self.addCleanup(client.remove, name) - d = client.add(name, u"busybox") - - def added(_): - return client.exists(name) - d.addCallback(added) - d.addCallback(self.assertTrue) - return d - - def test_removed_does_not_exist(self): - """ - A removed container does not exist. - """ - client = fixture(self) - name = random_name(self) - d = client.add(name, ANY_IMAGE) - d.addCallback(lambda _: client.remove(name)) - d.addCallback(lambda _: client.exists(name)) - d.addCallback(self.assertFalse) - return d - - def test_zero_port_randomly_assigned(self): - """ - If an external port number is given as 0, a random available port - number is used. - """ - client = fixture(self) - name = random_name(self) - portmap = PortMap( - internal_port=1234, external_port=0, - ) - self.addCleanup(client.remove, name) - d = client.add(name, ANY_IMAGE, ports=(portmap,)) - d.addCallback(lambda ignored: client.list()) - - def check_port(units): - portmap = list(list(units)[0].ports)[0] - self.assertTrue( - 0 < portmap.external_port < 2 ** 16, - "Unexpected automatic port assignment: {}".format( - portmap.external_port - ), - ) - d.addCallback(check_port) - return d - - def test_port_collision_raises_addressinuse(self): - """ - If the container is configured with an external port number which - is already in use, ``AddressInUse`` is raised. - """ - client = fixture(self) - name = random_name(self) - portmap = PortMap( - internal_port=12345, external_port=0, - ) - self.addCleanup(client.remove, name) - d = client.add(name, ANY_IMAGE, ports=(portmap,)) - d.addCallback(lambda ignored: client.list()) - - def extract_port(units): - return list(list(units)[0].ports)[0].external_port - d.addCallback(extract_port) - - def collide(external_port): - self.external_port = external_port - portmap = PortMap( - internal_port=54321, external_port=external_port, - ) - name = random_name(self) - self.addCleanup(client.remove, name) - return client.add(name, ANY_IMAGE, ports=(portmap,)) - d.addCallback(collide) - d = self.assertFailure(d, AddressInUse) - - def failed(exception): - self.assertEqual( - exception.address, (b"0.0.0.0", self.external_port) - ) - self.assertIsInstance(exception.apierror, APIError) - d.addCallback(failed) - return d - - def test_added_is_listed(self): - """ - An added container is included in the output of ``list()``. - """ - client = fixture(self) - name = random_name(self) - image = ANY_IMAGE - - portmaps = [ - PortMap(internal_port=80, external_port=0), - PortMap(internal_port=5432, external_port=0), - ] - volumes = ( - Volume(node_path=FilePath(self.mktemp()), - container_path=FilePath(b'/var/lib/data')), - ) - environment = ( - (u'CUSTOM_ENV_A', u'a value'), - (u'CUSTOM_ENV_B', u'another value'), - ) - environment = Environment(variables=frozenset(environment)) - self.addCleanup(client.remove, name) - - d = add_with_port_collision_retry( - client, - name, - image_name=image, - ports=portmaps, - volumes=volumes, - environment=environment, - mem_limit=100000000, - cpu_shares=512, - restart_policy=RestartAlways(), - ) - - def added((app, portmaps)): - d = client.list() - d.addCallback(lambda units: (units, portmaps)) - return d - d.addCallback(added) - - def got_list((units, portmaps)): - result = units.pop() - - expected = Unit( - name=name, container_name=name, activation_state=u"active", - container_image=image, ports=frozenset(portmaps), - environment=environment, volumes=frozenset(volumes), - mem_limit=100000000, cpu_shares=512, - restart_policy=RestartAlways(), - ) - - # This test is not concerned with a returned ``Unit``'s - # ``container_name`` and unlike other properties of the - # result, does not expect ``container_name`` to be any - # particular value. Manually setting it below to a fixed - # known value simply allows us to compare an entire Unit - # object instead of individual properties and is therefore - # a convenience measure. - result = result.set("container_name", name) - self.assertEqual(result, expected) - d.addCallback(got_list) - return d - - def test_removed_is_not_listed(self): - """ - A removed container is not included in the output of ``list()``. - """ - client = fixture(self) - name = random_name(self) - - d = client.add(name, ANY_IMAGE) - d.addCallback(lambda _: client.remove(name)) - d.addCallback(lambda _: client.list()) - - def got_list(units): - self.assertNotIn(name, [unit.name for unit in units]) - d.addCallback(got_list) - return d - - def test_container_name(self): - """ - Each container also records the container name twice. - """ - # This is silly behavior. Get rid of it when fixing - # <https://clusterhq.atlassian.net/browse/FLOC-819>. - client = fixture(self) - name = random_name(self) - self.addCleanup(client.remove, name) - d = client.add(name, u"busybox") - d.addCallback(lambda _: client.list()) - - def got_list(units): - unit = [unit for unit in units if unit.name == name][0] - self.assertIsInstance(unit.container_name, unicode) - d.addCallback(got_list) - return d - - def test_command_line(self): - """ - Containers created with a command-line have a command-line included - when listed. - """ - client = fixture(self) - name = random_name(self) - self.addCleanup(client.remove, name) - command_line = [u"nc", u"-l", u"-p", u"1234"] - d = client.add(name, u"busybox", command_line=command_line) - d.addCallback(lambda _: client.list()) - - def got_list(units): - unit = [unit for unit in units if unit.name == name][0] - self.assertEqual(unit.command_line, pvector(command_line)) - d.addCallback(got_list) - return d - - def assert_restart_policy_round_trips(self, restart_policy): - """ - Creating a container with the given restart policy creates a - container that reports that same policy. - - :param IRestartPolicy restart_policy: The restart policy to test. - """ - client = fixture(self) - name = random_name(self) - self.addCleanup(client.remove, name) - d = client.add(name, u"busybox", restart_policy=restart_policy) - d.addCallback(lambda _: client.list()) - - def got_list(units): - unit = [unit for unit in units if unit.name == name][0] - self.assertEqual(unit.restart_policy, restart_policy) - d.addCallback(got_list) - return d - - def test_add_with_restart_never(self): - """ - ``DockerClient.add`` when creating a container with a restart - policy, of never will create a container with this policy. - """ - return self.assert_restart_policy_round_trips(RestartNever()) - - def test_add_with_restart_always(self): - """ - ``DockerClient.add`` when creating a container with a restart - policy, of always will create a container with this policy. - """ - return self.assert_restart_policy_round_trips(RestartAlways()) - - def test_add_with_restart_on_failure(self): - """ - ``DockerClient.add`` when creating a container with a restart - policy, of on failure will create a container with this policy. - """ - return self.assert_restart_policy_round_trips(RestartOnFailure()) - - def test_add_with_restart_on_failure_with_maximum_retry(self): - """ - ``DockerClient.add`` when creating a container with a restart - policy, of on failure with a retry count will create a container - with this policy. - """ - return self.assert_restart_policy_round_trips( - RestartOnFailure(maximum_retry_count=5)) - - return IDockerClientTests - - -class FakeIDockerClientTests( - make_idockerclient_tests( - fixture=lambda test_case: FakeDockerClient(), - ) -): - """ - ``IDockerClient`` tests for ``FakeDockerClient``. - """ - - -class FakeDockerClientImplementationTests(TestCase): - """ - Tests for implementation details of ``FakeDockerClient``. - """ - def test_units_default(self): - """ - ``FakeDockerClient._units`` is an empty dict by default. - """ - self.assertEqual({}, FakeDockerClient()._units) - - def test_units_override(self): - """ - ``FakeDockerClient._units`` can be supplied in the constructor. - """ - units = {u'foo': Unit(name=u'foo', container_name=u'foo', - activation_state=u'active', - container_image=u'flocker/flocker:v1.0.0')} - self.assertEqual(units, FakeDockerClient(units=units)._units) - - -class PortMapInitTests( - make_with_init_tests( - record_type=PortMap, - kwargs=dict( - internal_port=5678, - external_port=910, - ) - ) -): - """ - Tests for ``PortMap.__init__``. - """ - - -class PortMapTests(TestCase): - """ - Tests for ``PortMap``. - - XXX: The equality tests in this case are incomplete. See - https://github.com/hynek/characteristic/issues/4 for a proposed solution to - this. - """ - def test_equal(self): - """ - ``PortMap`` instances with the same internal and external ports compare - equal. - """ - self.assertEqual( - PortMap(internal_port=5678, external_port=910), - PortMap(internal_port=5678, external_port=910), - ) - - def test_not_equal(self): - """ - ``PortMap`` instances with the different internal and external ports do - not compare equal. - """ - self.assertNotEqual( - PortMap(internal_port=5678, external_port=910), - PortMap(internal_port=1516, external_port=1718) - ) - - -class UnitInitTests( - make_with_init_tests( - record_type=Unit, - kwargs=dict( - name=u'site-example.com', - container_name=u'flocker--site-example.com', - activation_state=u'active', - container_image=u'flocker/flocker:v1.0.0', - ports=pset((PortMap(internal_port=80, external_port=8080),)), - environment=Environment(variables={u'foo': u'bar'}), - restart_policy=RestartAlways(), - ), - expected_defaults=dict( - ports=pset(), container_image=None, environment=None, - restart_policy=RestartNever()) - ) -): - """ - Tests for ``Unit.__init__``. - """ - - -class EnvironmentInitTests( - make_with_init_tests( - record_type=Environment, - kwargs=dict( - variables=dict(foo="bar"), - ), - ) -): - """ - Tests for ``Environment.__init__``. - """ - - -class EnvironmentTests(TestCase): - """ - Tests for ``Environment``. - """ - def test_to_dict(self): - """ - ``Environment.to_dict`` returns a dictionary containing the - the environment variables as key/value entries. - """ - variables = {'baz': 'qux', 'foo': 'bar'} - environment = Environment(variables=frozenset(variables.items())) - - self.assertEqual(environment.to_dict(), variables) - - -class VolumeInitTests( - make_with_init_tests( - record_type=Volume, - kwargs=dict( - node_path=FilePath(b"/tmp"), - container_path=FilePath(b"/blah"), - ), - ) -): - """ - Tests for ``Volume.__init__``. - """ diff --git a/flocker/node/testtools.py b/flocker/node/testtools.py index ed9534425a..11fbbd3938 100644 --- a/flocker/node/testtools.py +++ b/flocker/node/testtools.py @@ -4,14 +4,7 @@ Testing utilities for ``flocker.node``. """ -from functools import wraps -import os -import pwd -from unittest import skipIf, SkipTest from uuid import uuid4 -from distutils.version import LooseVersion # pylint: disable=import-error - -import psutil from zope.interface import implementer @@ -21,80 +14,18 @@ from zope.interface.verify import verifyObject -from eliot import Logger, ActionType, MessageType, fields +from eliot import Logger, ActionType from . import ( ILocalState, IDeployer, NodeLocalState, IStateChange, sequentially ) from ..common import loop_until -from ..testtools import AsyncTestCase, find_free_port +from ..testtools import AsyncTestCase from ..control import ( IClusterStateChange, Node, NodeState, Deployment, DeploymentState, PersistentState, ) from ..control._model import ip_to_uuid, Leases -from ._docker import AddressInUse, DockerClient - - -def docker_accessible(): - """ - Attempt to connect to the Docker control socket. - - :return: A ``bytes`` string describing the reason Docker is not - accessible or ``None`` if it appears to be accessible. - """ - try: - client = DockerClient() - client._client.ping() - except Exception as e: - return str(e) - return None - -_docker_reason = docker_accessible() - -if_docker_configured = skipIf( - _docker_reason, - "User {!r} cannot access Docker: {}".format( - pwd.getpwuid(os.geteuid()).pw_name, - _docker_reason, - )) - - -def require_docker_version(minimum_docker_version, message): - """ - Skip the wrapped test if the actual Docker version is less than - ``minimum_docker_version``. - - :param str minimum_docker_version: The minimum version required by the - test. - :param str message: An explanatory message which will be printed when - skipping the test. - """ - minimum_docker_version = LooseVersion( - minimum_docker_version - ) - - # XXX: Can we change this to use skipIf? - def decorator(wrapped): - @wraps(wrapped) - def wrapper(*args, **kwargs): - client = DockerClient() - docker_version = LooseVersion( - client._client.version()['Version'] - ) - if docker_version < minimum_docker_version: - raise SkipTest( - 'Minimum required Docker version: {}. ' - 'Actual Docker version: {}. ' - 'Details: {}'.format( - minimum_docker_version, - docker_version, - message, - ) - ) - return wrapped(*args, **kwargs) - return wrapper - return decorator def wait_for_unit_state(reactor, docker_client, unit_name, @@ -402,94 +333,3 @@ def assert_calculated_changes_for_deployer( cluster_configuration, cluster_state, local_state ) case.assertEqual(expected_changes, changes) - - -ADDRESS_IN_USE = MessageType( - u"flocker:test:address_in_use", - fields(ip=unicode, port=int, name=bytes), -) - - -def _find_process_name(port_number): - """ - Get the name of the process using the given port number. - """ - for connection in psutil.net_connections(): - if connection.laddr[1] == port_number: - return psutil.Process(connection.pid).name() - return None - - -def _retry_on_port_collision(reason, add, cleanup): - """ - Cleanup and re-add a container if it failed to start because of a port - collision. - - :param reason: The exception describing the container startup failure. - :param add: A no-argument callable that can be used to try adding and - starting the container again. - :param cleanup: A no-argument callable that can be used to remove the - container. - """ - # We select a random, available port number on each attempt. If it was in - # use it's because the "available" part of that port number selection logic - # is fairly shaky. It should be good enough that trying again works fairly - # well, though. So do that. - reason.trap(AddressInUse) - ip, port = reason.value.address - used_by = _find_process_name(port) - ADDRESS_IN_USE(ip=ip, port=port, name=used_by).write() - d = cleanup() - d.addCallback(lambda ignored: add()) - return d - - -def add_with_port_collision_retry(client, unit_name, **kw): - """ - Add a container. Try adding it repeatedly if it has ports defined and - container startup fails with ``AddressInUse``. - - If ports in the container are defined with an external port number of ``0`` - a locally free port number will be assigned. On each re-try attempt, these - will be re-assigned to try to avoid the port collision. - - :param DockerClient client: The ``IDockerClient`` to use to try to add the - container. - :param unicode unit_name: The name of the container to add. See the - ``unit_name`` parameter of ``IDockerClient.add``. - :param kw: Additional keyword arguments to pass on to - ``IDockerClient.add``. - - :return: A ``Deferred`` which fires with a two-tuple. The first element - represents the container which has been added and started. The second - element is a ``list`` of ``PortMap`` instances describing the ports - which were ultimately requested. - """ - ultimate_ports = [] - - def add(): - # Generate a replacement for any auto-assigned ports - ultimate_ports[:] = tentative_ports = list( - port.set( - external_port=find_free_port()[1] - ) - if port.external_port == 0 - else port - for port in kw["ports"] - ) - tentative_kw = kw.copy() - tentative_kw["ports"] = tentative_ports - return client.add(unit_name, **tentative_kw) - - def cleanup(): - return client.remove(unit_name) - - if "ports" in kw: - trying = add() - trying.addErrback(_retry_on_port_collision, add, cleanup) - result = trying - else: - result = client.add(unit_name, **kw) - - result.addCallback(lambda app: (app, ultimate_ports)) - return result From 220181e4713f474ef6bafcb257b98286b71e137a Mon Sep 17 00:00:00 2001 From: Richard Wall <richard.wall@clusterhq.com> Date: Tue, 4 Oct 2016 14:36:15 +0100 Subject: [PATCH 2/2] Remove the jenkins jobs for the deleted tests --- build.yaml | 45 --------------------------------------------- 1 file changed, 45 deletions(-) diff --git a/build.yaml b/build.yaml index 2e1d7f2814..a3738db134 100644 --- a/build.yaml +++ b/build.yaml @@ -869,51 +869,6 @@ job_type: timeout: 30 directories_to_delete: *run_trial_directories_to_delete - run_trial_on_AWS_CentOS_7_flocker.node.functional.test_docker: - # FLOC-3903: docker on centos use loop-devmapper - # by default. That makes it much slower than Ubuntu - # with aufs. It leads to timeouts, but seems to do - # a bit better on the medium instance - on_nodes_with_labels: 'aws-centos-7-SELinux-T2Medium' - module: flocker.node.functional.test_docker - with_steps: - - { type: 'shell', cli: *run_trial_cli } - archive_artifacts: *flocker_artifacts - publish_test_results: true - coverage_report: true - clean_repo: true - # Increase the timeout due to FLOC-3903 - timeout: 45 - directories_to_delete: *run_trial_directories_to_delete - - # Split out just to do the CentOS version above, - # for the reasons outlined in that section - run_trial_on_AWS_Ubuntu_Trusty_flocker.node.functional.test_docker: - on_nodes_with_labels: 'aws-ubuntu-trusty-T2Medium' - module: flocker.node.functional.test_docker - with_steps: - - { type: 'shell', cli: *run_trial_cli } - archive_artifacts: *flocker_artifacts - publish_test_results: true - coverage_report: true - clean_repo: true - timeout: 30 - directories_to_delete: *run_trial_directories_to_delete - - # Split out just to do the CentOS version above, - # for the reasons outlined in that section - run_trial_on_AWS_Ubuntu_Xenial_flocker.node.functional.test_docker: - on_nodes_with_labels: 'aws-ubuntu-xenial-T2Medium' - module: flocker.node.functional.test_docker - with_steps: - - { type: 'shell', cli: *run_trial_cli } - archive_artifacts: *flocker_artifacts - publish_test_results: true - coverage_report: true - clean_repo: true - timeout: 30 - directories_to_delete: *run_trial_directories_to_delete - run_trial_for_storage_driver: run_trial_for_ebs_storage_driver_on_CentOS_7: on_nodes_with_labels: 'aws-centos-7-SELinux-T2Medium'