diff --git a/admin/acceptance.py b/admin/acceptance.py index a033267584..ae8bb2d8a5 100644 --- a/admin/acceptance.py +++ b/admin/acceptance.py @@ -103,11 +103,6 @@ def get_trial_environment(cluster): 'FLOCKER_ACCEPTANCE_VOLUME_BACKEND': cluster.dataset_backend.name, 'FLOCKER_ACCEPTANCE_API_CERTIFICATES_PATH': cluster.certificates_path.path, - 'FLOCKER_ACCEPTANCE_HOSTNAME_TO_PUBLIC_ADDRESS': json.dumps({ - node.private_address: node.address - for node in cluster.agent_nodes - if node.private_address is not None - }), 'FLOCKER_ACCEPTANCE_DEFAULT_VOLUME_SIZE': bytes( cluster.default_volume_size ), diff --git a/flocker/acceptance/endtoend/test_installer.py b/flocker/acceptance/endtoend/test_installer.py index 60f2bf38ea..92b49e4995 100644 --- a/flocker/acceptance/endtoend/test_installer.py +++ b/flocker/acceptance/endtoend/test_installer.py @@ -326,7 +326,6 @@ def _cleanup_flocker(self): control_node=self.control_node_ip.encode('ascii'), certificates_path=local_certs_path, num_agent_nodes=2, - hostname_to_public_address={}, username='user1', ) d.addCallback( diff --git a/flocker/acceptance/testtools.py b/flocker/acceptance/testtools.py index 3037371d32..84a943ff96 100644 --- a/flocker/acceptance/testtools.py +++ b/flocker/acceptance/testtools.py @@ -18,6 +18,8 @@ from docker.tls import TLSConfig +from ipaddress import ip_address + from twisted.web.http import OK, CREATED from twisted.python.filepath import FilePath from twisted.python.constants import Names, NamedConstant @@ -45,6 +47,7 @@ from ..apiclient import FlockerClient, DatasetState from ..node.script import get_backend, get_api from ..node import dockerpy_client +from ..node.agents.blockdevice import ICloudAPI from .node_scripts import SCRIPTS as NODE_SCRIPTS @@ -827,7 +830,7 @@ def get_file(self, node, path): def connected_cluster( reactor, control_node, certificates_path, num_agent_nodes, - hostname_to_public_address, username='user', + username='user', ): cluster_cert = certificates_path.child(b"cluster.crt") user_cert = certificates_path.child( @@ -870,24 +873,60 @@ def failed_query(failure): failed_query) return d agents_connected = loop_until(reactor, nodes_available) + agents_connected.addCallback(lambda _: _add_nodes(cluster)) + return agents_connected + + +def _add_nodes(cluster): + """ + Configure the ``Node`` objects for a newly created ``Cluster`` whose + nodes are known to be alive. + + :param Cluster cluster: Cluster that still needs nodes set. + :return: ``cluster`` updated with appropriate ``nodes`` set. + """ + # By default we just trust address returned by Flocker + def default_get_public_ip(address): + return address - # Extract node hostnames from API that lists nodes. Currently we - # happen know these in advance, but in FLOC-1631 node identification - # will switch to UUIDs instead. - agents_connected.addCallback(lambda _: cluster.current_nodes()) + try: + backend = get_backend_api(None, cluster.cluster_uuid) + except SkipTest as e: + # Can't load backend, will have to trust Flocker's reported IPs. + print "Can't use backend", e + get_public_ip = default_get_public_ip + else: + if ICloudAPI.providedBy(backend): + node_ips = list(set(ip_address(i) for i in instance.ip_addresses) + for instance in backend.list_live_nodes()) + + def get_public_ip(address): + for ips in node_ips: + if ip_address(address) in ips: + return list( + unicode(ip) for ip in ips + if not any([ip.is_private, ip.is_link_local, + ip.is_loopback]))[0] + raise ValueError( + "Couldn't find address in cloud API reported IPs") + else: + print "Backend doesn't provide ICloudAPI", backend + get_public_ip = default_get_public_ip def node_from_dict(node): - reported_hostname = node["host"] - public_address = hostname_to_public_address.get( - reported_hostname, reported_hostname) + reported_ip = node["host"] + public_address = get_public_ip(reported_ip) + print reported_ip, get_public_ip, public_address return Node( uuid=node[u"uuid"], public_address=public_address.encode("ascii"), - reported_hostname=reported_hostname.encode("ascii"), + reported_hostname=reported_ip.encode("ascii"), ) - agents_connected.addCallback(lambda nodes: cluster.set( - "nodes", map(node_from_dict, nodes))) - return agents_connected + + d = cluster.current_nodes() + d.addCallback( + lambda nodes: cluster.set("nodes", map(node_from_dict, nodes))) + return d def _get_test_cluster(reactor): @@ -915,16 +954,11 @@ def _get_test_cluster(reactor): certificates_path = FilePath( environ["FLOCKER_ACCEPTANCE_API_CERTIFICATES_PATH"]) - hostname_to_public_address_env_var = environ.get( - "FLOCKER_ACCEPTANCE_HOSTNAME_TO_PUBLIC_ADDRESS", "{}") - hostname_to_public_address = json.loads(hostname_to_public_address_env_var) - return connected_cluster( reactor, control_node, certificates_path, num_agent_nodes, - hostname_to_public_address ) diff --git a/flocker/node/agents/blockdevice.py b/flocker/node/agents/blockdevice.py index 02817fda1e..546596eb56 100644 --- a/flocker/node/agents/blockdevice.py +++ b/flocker/node/agents/blockdevice.py @@ -1108,6 +1108,14 @@ def get_device_path(blockdevice_id): """ +class CloudComputeInstance(PClass): + """ + A compute instance in a cloud. + """ + node_id = field(type=unicode) + ip_addresses = pset_field(unicode) + + class ICloudAPI(Interface): """ Additional functionality provided specifically by cloud-based block @@ -1118,6 +1126,13 @@ class ICloudAPI(Interface): This is specifically designed for cloud systems where shut down nodes continue to have volumes attached to them. + + This API is not very well designed, so probably should not be + implemented by third party providers until we do some cleanup. Worth + noting that ``list_live_nodes`` could also be provided by e.g. Swarm + or K8s-specific backend, which suggests ``compute_instance_id`` and + ``list_live_nodes`` should be on a different object than + ``IBlockDeviceAPI``, an object that is loaded by all agents. """ def list_live_nodes(): """ @@ -1126,8 +1141,7 @@ def list_live_nodes(): This is used to figure out which nodes are dead, so that other nodes can do the detach. - :returns: A collection of ``unicode`` compute instance IDs, compatible - with those returned by ``IBlockDeviceAPI.compute_instance_id``. + :returns: A list of ``CloudComputeInstance``. """ def start_node(node_id): @@ -1677,7 +1691,9 @@ def is_existing_block_device(dataset_id, path): pass if ICloudAPI.providedBy(self._underlying_blockdevice_api): - live_instances = self._underlying_blockdevice_api.list_live_nodes() + live_instances = list( + instance.node_id for instance in + self._underlying_blockdevice_api.list_live_nodes()) else: # Can't know accurately who is alive and who is dead: live_instances = None diff --git a/flocker/node/agents/cinder.py b/flocker/node/agents/cinder.py index 4271e00a4c..b0790d810e 100644 --- a/flocker/node/agents/cinder.py +++ b/flocker/node/agents/cinder.py @@ -38,6 +38,7 @@ from .blockdevice import ( IBlockDeviceAPI, BlockDeviceVolume, UnknownVolume, AlreadyAttachedVolume, UnattachedVolume, UnknownInstanceID, get_blockdevice_volume, ICloudAPI, + CloudComputeInstance, ) from ._logging import ( NOVA_CLIENT_EXCEPTION, KEYSTONE_HTTP_ERROR, COMPUTE_INSTANCE_ID_NOT_FOUND, @@ -689,8 +690,15 @@ def get_device_path(self, blockdevice_id): # ICloudAPI: def list_live_nodes(self): - return list(server.id for server in self.nova_server_manager.list() - if server.status == u'ACTIVE') + return [ + CloudComputeInstance( + node_id=server.id, + ip_addresses=map( + unicode, + _extract_nova_server_addresses(server.addresses))) + for server in self.nova_server_manager.list() + if server.status == u'ACTIVE' + ] def start_node(self, node_id): server = self.nova_server_manager.get(node_id) diff --git a/flocker/node/agents/ebs.py b/flocker/node/agents/ebs.py index 7bcee7e0ba..98fac70f32 100644 --- a/flocker/node/agents/ebs.py +++ b/flocker/node/agents/ebs.py @@ -40,7 +40,7 @@ from .blockdevice import ( IBlockDeviceAPI, IProfiledBlockDeviceAPI, BlockDeviceVolume, UnknownVolume, AlreadyAttachedVolume, UnattachedVolume, UnknownInstanceID, - MandatoryProfiles, ICloudAPI, + MandatoryProfiles, ICloudAPI, CloudComputeInstance, ) from flocker.common import poll_until @@ -1405,7 +1405,13 @@ def get_device_path(self, blockdevice_id): def list_live_nodes(self): instances = self.connection.instances.filter( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]) - return list(unicode(instance.id) for instance in instances) + return [ + CloudComputeInstance( + node_id=unicode(instance.id), + ip_addresses=[unicode(instance.public_ip_address), + unicode(instance.private_ip_address)]) + for instance in instances + ] @boto3_log def start_node(self, node_id): diff --git a/flocker/node/agents/test/test_blockdevice.py b/flocker/node/agents/test/test_blockdevice.py index 117793bc63..be441b3cac 100644 --- a/flocker/node/agents/test/test_blockdevice.py +++ b/flocker/node/agents/test/test_blockdevice.py @@ -87,7 +87,7 @@ ICloudAPI, _SyncToThreadedAsyncCloudAPIAdapter, - + CloudComputeInstance, log_list_volumes, CALL_LIST_VOLUMES, ) @@ -99,7 +99,7 @@ _backing_file_name, ) from ....common.algebraic import tagged_union_strategy - +from ....common import get_all_ips from ... import run_state_change, in_parallel, ILocalState, IStateChange, NoOp from ...testtools import ( @@ -755,7 +755,14 @@ def __init__(self, block_api, live_nodes=()): self.live_nodes = live_nodes def list_live_nodes(self): - return [self.compute_instance_id()] + list(self.live_nodes) + result = [CloudComputeInstance( + node_id=self.compute_instance_id(), + ip_addresses=( + unicode(i) for i in get_all_ips() if i != b"127.0.0.1"))] + for i, node in enumerate(self.live_nodes): + result.append(CloudComputeInstance( + node_id=node, ip_addresses=[u"10.1.1.{}".format(i)])) + return result def start_node(self, node_id): return @@ -5495,19 +5502,28 @@ def test_interface(self): def test_current_machine_is_live(self): """ - The machine running the test is reported as alive. + The machine running the test is reported as alive and has expected + IP addresses. """ + local_addresses = set(unicode(i) for i in get_all_ips() + if i != b"127.0.0.1") + local_id = self.api.compute_instance_id() d = self.async_cloud_api.list_live_nodes() - d.addCallback(lambda live: - self.assertIn(self.api.compute_instance_id(), live)) + + def got_compute_instances(instances): + [instance] = (i for i in instances if i.node_id == local_id) + self.assertTrue(instance.ip_addresses.intersection( + local_addresses)) + d.addCallback(got_compute_instances) return d def test_list_live_nodes(self): """ ``list_live_nodes`` returns an iterable of unicode values. """ - live_nodes = self.api.list_live_nodes() - self.assertThat(live_nodes, AllMatch(IsInstance(unicode))) + live_nodes = list(self.api.list_live_nodes()) + self.assertThat(live_nodes, + AllMatch(IsInstance(CloudComputeInstance))) return Tests