diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 806e46cc85f4..e17fe41fd708 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -6,7 +6,7 @@ import json import logging import re -from collections import OrderedDict +from collections import Counter, OrderedDict from datetime import timedelta from uuid import uuid4 @@ -82,6 +82,7 @@ Project, ProjectUpdate, ProjectUpdateEvent, + ReceptorAddress, RefreshToken, Role, Schedule, @@ -636,7 +637,7 @@ def validate(self, attrs): exclusions = self.get_validation_exclusions(self.instance) obj = self.instance or self.Meta.model() for k, v in attrs.items(): - if k not in exclusions: + if k not in exclusions and k != 'canonical_address_port': setattr(obj, k, v) obj.full_clean(exclude=exclusions) # full_clean may modify values on the instance; copy those changes @@ -5458,17 +5459,25 @@ def validate(self, attrs): class InstanceLinkSerializer(BaseSerializer): class Meta: model = InstanceLink - fields = ('id', 'url', 'related', 'source', 'target', 'link_state') + fields = ('id', 'related', 'source', 'target', 'target_full_address', 'link_state') source = serializers.SlugRelatedField(slug_field="hostname", queryset=Instance.objects.all()) - target = serializers.SlugRelatedField(slug_field="hostname", queryset=Instance.objects.all()) + + target = serializers.SerializerMethodField() + target_full_address = serializers.SerializerMethodField() def get_related(self, obj): res = super(InstanceLinkSerializer, self).get_related(obj) res['source_instance'] = self.reverse('api:instance_detail', kwargs={'pk': obj.source.id}) - res['target_instance'] = self.reverse('api:instance_detail', kwargs={'pk': obj.target.id}) + res['target_address'] = self.reverse('api:receptor_address_detail', kwargs={'pk': obj.target.id}) return res + def get_target(self, obj): + return obj.target.instance.hostname + + def get_target_full_address(self, obj): + return obj.target.get_full_address() + class InstanceNodeSerializer(BaseSerializer): class Meta: @@ -5476,6 +5485,29 @@ class Meta: fields = ('id', 'hostname', 'node_type', 'node_state', 'enabled') +class ReceptorAddressSerializer(BaseSerializer): + full_address = serializers.SerializerMethodField() + + class Meta: + model = ReceptorAddress + fields = ( + 'id', + 'url', + 'address', + 'port', + 'protocol', + 'websocket_path', + 'is_internal', + 'canonical', + 'instance', + 'peers_from_control_nodes', + 'full_address', + ) + + def get_full_address(self, obj): + return obj.get_full_address() + + class InstanceSerializer(BaseSerializer): show_capabilities = ['edit'] @@ -5484,11 +5516,17 @@ class InstanceSerializer(BaseSerializer): jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True) jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True) health_check_pending = serializers.SerializerMethodField() - peers = serializers.SlugRelatedField(many=True, required=False, slug_field="hostname", queryset=Instance.objects.all()) + peers = serializers.PrimaryKeyRelatedField( + help_text=_('Primary keys of receptor addresses to peer to.'), many=True, required=False, queryset=ReceptorAddress.objects.all() + ) + reverse_peers = serializers.SerializerMethodField() + listener_port = serializers.IntegerField(source='canonical_address_port', required=False, allow_null=True) + peers_from_control_nodes = serializers.BooleanField(source='canonical_address_peers_from_control_nodes', required=False) + protocol = serializers.SerializerMethodField() class Meta: model = Instance - read_only_fields = ('ip_address', 'uuid', 'version') + read_only_fields = ('ip_address', 'uuid', 'version', 'managed', 'reverse_peers') fields = ( 'id', 'hostname', @@ -5519,10 +5557,13 @@ class Meta: 'managed_by_policy', 'node_type', 'node_state', + 'managed', 'ip_address', - 'listener_port', 'peers', + 'reverse_peers', + 'listener_port', 'peers_from_control_nodes', + 'protocol', ) extra_kwargs = { 'node_type': {'initial': Instance.Types.EXECUTION, 'default': Instance.Types.EXECUTION}, @@ -5544,16 +5585,54 @@ class Meta: def get_related(self, obj): res = super(InstanceSerializer, self).get_related(obj) + res['receptor_addresses'] = self.reverse('api:instance_receptor_addresses_list', kwargs={'pk': obj.pk}) res['jobs'] = self.reverse('api:instance_unified_jobs_list', kwargs={'pk': obj.pk}) + res['peers'] = self.reverse('api:instance_peers_list', kwargs={"pk": obj.pk}) res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk}) if obj.node_type in [Instance.Types.EXECUTION, Instance.Types.HOP]: res['install_bundle'] = self.reverse('api:instance_install_bundle', kwargs={'pk': obj.pk}) - res['peers'] = self.reverse('api:instance_peers_list', kwargs={"pk": obj.pk}) if self.context['request'].user.is_superuser or self.context['request'].user.is_system_auditor: if obj.node_type == 'execution': res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk}) return res + def create_or_update(self, validated_data, obj=None, create=True): + # create a managed receptor address if listener port is defined + port = validated_data.pop('listener_port', -1) + peers_from_control_nodes = validated_data.pop('peers_from_control_nodes', -1) + + # delete the receptor address if the port is explicitly set to None + if obj and port == None: + obj.receptor_addresses.filter(address=obj.hostname).delete() + + if create: + instance = super(InstanceSerializer, self).create(validated_data) + else: + instance = super(InstanceSerializer, self).update(obj, validated_data) + instance.refresh_from_db() # instance canonical address lookup is deferred, so needs to be reloaded + + # only create or update if port is defined in validated_data or already exists in the + # canonical address + # this prevents creating a receptor address if peers_from_control_nodes is in + # validated_data but a port is not set + if (port != None and port != -1) or instance.canonical_address_port: + kwargs = {} + if port != -1: + kwargs['port'] = port + if peers_from_control_nodes != -1: + kwargs['peers_from_control_nodes'] = peers_from_control_nodes + if kwargs: + kwargs['canonical'] = True + instance.receptor_addresses.update_or_create(address=instance.hostname, defaults=kwargs) + + return instance + + def create(self, validated_data): + return self.create_or_update(validated_data, create=True) + + def update(self, obj, validated_data): + return self.create_or_update(validated_data, obj, create=False) + def get_summary_fields(self, obj): summary = super().get_summary_fields(obj) @@ -5563,6 +5642,16 @@ def get_summary_fields(self, obj): return summary + def get_reverse_peers(self, obj): + return Instance.objects.prefetch_related('peers').filter(peers__in=obj.receptor_addresses.all()).values_list('id', flat=True) + + def get_protocol(self, obj): + # note: don't create a different query for receptor addresses, as this is prefetched on the View for optimization + for addr in obj.receptor_addresses.all(): + if addr.canonical: + return addr.protocol + return "" + def get_consumed_capacity(self, obj): return obj.consumed_capacity @@ -5576,47 +5665,20 @@ def get_health_check_pending(self, obj): return obj.health_check_pending def validate(self, attrs): - def get_field_from_model_or_attrs(fd): - return attrs.get(fd, self.instance and getattr(self.instance, fd) or None) - - def check_peers_changed(): - ''' - return True if - - 'peers' in attrs - - instance peers matches peers in attrs - ''' - return self.instance and 'peers' in attrs and set(self.instance.peers.all()) != set(attrs['peers']) + # Oddly, using 'source' on a DRF field populates attrs with the source name, so we should rename it back + if 'canonical_address_port' in attrs: + attrs['listener_port'] = attrs.pop('canonical_address_port') + if 'canonical_address_peers_from_control_nodes' in attrs: + attrs['peers_from_control_nodes'] = attrs.pop('canonical_address_peers_from_control_nodes') if not self.instance and not settings.IS_K8S: raise serializers.ValidationError(_("Can only create instances on Kubernetes or OpenShift.")) - node_type = get_field_from_model_or_attrs("node_type") - peers_from_control_nodes = get_field_from_model_or_attrs("peers_from_control_nodes") - listener_port = get_field_from_model_or_attrs("listener_port") - peers = attrs.get('peers', []) - - if peers_from_control_nodes and node_type not in (Instance.Types.EXECUTION, Instance.Types.HOP): - raise serializers.ValidationError(_("peers_from_control_nodes can only be enabled for execution or hop nodes.")) - - if node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]: - if check_peers_changed(): - raise serializers.ValidationError( - _("Setting peers manually for control nodes is not allowed. Enable peers_from_control_nodes on the hop and execution nodes instead.") - ) - - if not listener_port and peers_from_control_nodes: - raise serializers.ValidationError(_("Field listener_port must be a valid integer when peers_from_control_nodes is enabled.")) - - if not listener_port and self.instance and self.instance.peers_from.exists(): - raise serializers.ValidationError(_("Field listener_port must be a valid integer when other nodes peer to it.")) - - for peer in peers: - if peer.listener_port is None: - raise serializers.ValidationError(_("Field listener_port must be set on peer ") + peer.hostname + ".") - - if not settings.IS_K8S: - if check_peers_changed(): - raise serializers.ValidationError(_("Cannot change peers.")) + # cannot enable peers_from_control_nodes if listener_port is not set + if attrs.get('peers_from_control_nodes'): + port = attrs.get('listener_port', -1) # -1 denotes missing, None denotes explicit null + if (port is None) or (port == -1 and self.instance and self.instance.canonical_address is None): + raise serializers.ValidationError(_("Cannot enable peers_from_control_nodes if listener_port is not set.")) return super().validate(attrs) @@ -5636,8 +5698,8 @@ def validate_node_state(self, value): raise serializers.ValidationError(_("Can only change the state on Kubernetes or OpenShift.")) if value != Instance.States.DEPROVISIONING: raise serializers.ValidationError(_("Can only change instances to the 'deprovisioning' state.")) - if self.instance.node_type not in (Instance.Types.EXECUTION, Instance.Types.HOP): - raise serializers.ValidationError(_("Can only deprovision execution or hop nodes.")) + if self.instance.managed: + raise serializers.ValidationError(_("Cannot deprovision managed nodes.")) else: if value and value != Instance.States.INSTALLED: raise serializers.ValidationError(_("Can only create instances in the 'installed' state.")) @@ -5656,18 +5718,48 @@ def validate_hostname(self, value): def validate_listener_port(self, value): """ Cannot change listener port, unless going from none to integer, and vice versa + If instance is managed, cannot change listener port at all """ - if value and self.instance and self.instance.listener_port and self.instance.listener_port != value: - raise serializers.ValidationError(_("Cannot change listener port.")) + if self.instance: + canonical_address_port = self.instance.canonical_address_port + if value and canonical_address_port and canonical_address_port != value: + raise serializers.ValidationError(_("Cannot change listener port.")) + if self.instance.managed and value != canonical_address_port: + raise serializers.ValidationError(_("Cannot change listener port for managed nodes.")) + return value + + def validate_peers(self, value): + # cannot peer to an instance more than once + peers_instances = Counter(p.instance_id for p in value) + if any(count > 1 for count in peers_instances.values()): + raise serializers.ValidationError(_("Cannot peer to the same instance more than once.")) + + if self.instance: + instance_addresses = set(self.instance.receptor_addresses.all()) + setting_peers = set(value) + peers_changed = set(self.instance.peers.all()) != setting_peers + + if not settings.IS_K8S and peers_changed: + raise serializers.ValidationError(_("Cannot change peers.")) + + if self.instance.managed and peers_changed: + raise serializers.ValidationError(_("Setting peers manually for managed nodes is not allowed.")) + + # cannot peer to self + if instance_addresses & setting_peers: + raise serializers.ValidationError(_("Instance cannot peer to its own address.")) + + # cannot peer to an instance that is already peered to this instance + if instance_addresses: + for p in setting_peers: + if set(p.instance.peers.all()) & instance_addresses: + raise serializers.ValidationError(_(f"Instance {p.instance.hostname} is already peered to this instance.")) return value def validate_peers_from_control_nodes(self, value): - """ - Can only enable for K8S based deployments - """ - if value and not settings.IS_K8S: - raise serializers.ValidationError(_("Can only be enabled on Kubernetes or Openshift.")) + if self.instance and self.instance.managed and self.instance.canonical_address_peers_from_control_nodes != value: + raise serializers.ValidationError(_("Cannot change peers_from_control_nodes for managed nodes.")) return value diff --git a/awx/api/templates/instance_install_bundle/group_vars/all.yml b/awx/api/templates/instance_install_bundle/group_vars/all.yml index 861572748c08..7c7c815d67aa 100644 --- a/awx/api/templates/instance_install_bundle/group_vars/all.yml +++ b/awx/api/templates/instance_install_bundle/group_vars/all.yml @@ -17,19 +17,18 @@ custom_worksign_public_keyfile: receptor/work_public_key.pem custom_tls_certfile: receptor/tls/receptor.crt custom_tls_keyfile: receptor/tls/receptor.key custom_ca_certfile: receptor/tls/ca/mesh-CA.crt -receptor_protocol: 'tcp' -{% if instance.listener_port %} +{% if listener_port %} +receptor_protocol: {{ listener_protocol }} receptor_listener: true -receptor_port: {{ instance.listener_port }} +receptor_port: {{ listener_port }} {% else %} receptor_listener: false {% endif %} {% if peers %} receptor_peers: {% for peer in peers %} - - host: {{ peer.host }} - port: {{ peer.port }} - protocol: tcp + - address: {{ peer.address }} + protocol: {{ peer.protocol }} {% endfor %} {% endif %} {% verbatim %} diff --git a/awx/api/templates/instance_install_bundle/requirements.yml b/awx/api/templates/instance_install_bundle/requirements.yml index 69dbf5dcb70d..65df80b51d17 100644 --- a/awx/api/templates/instance_install_bundle/requirements.yml +++ b/awx/api/templates/instance_install_bundle/requirements.yml @@ -1,4 +1,4 @@ --- collections: - name: ansible.receptor - version: 2.0.2 + version: 2.0.3 diff --git a/awx/api/urls/instance.py b/awx/api/urls/instance.py index 0d4df1df4523..84a3904657f5 100644 --- a/awx/api/urls/instance.py +++ b/awx/api/urls/instance.py @@ -10,6 +10,7 @@ InstanceInstanceGroupsList, InstanceHealthCheck, InstancePeersList, + InstanceReceptorAddressesList, ) from awx.api.views.instance_install_bundle import InstanceInstallBundle @@ -21,6 +22,7 @@ re_path(r'^(?P[0-9]+)/instance_groups/$', InstanceInstanceGroupsList.as_view(), name='instance_instance_groups_list'), re_path(r'^(?P[0-9]+)/health_check/$', InstanceHealthCheck.as_view(), name='instance_health_check'), re_path(r'^(?P[0-9]+)/peers/$', InstancePeersList.as_view(), name='instance_peers_list'), + re_path(r'^(?P[0-9]+)/receptor_addresses/$', InstanceReceptorAddressesList.as_view(), name='instance_receptor_addresses_list'), re_path(r'^(?P[0-9]+)/install_bundle/$', InstanceInstallBundle.as_view(), name='instance_install_bundle'), ] diff --git a/awx/api/urls/receptor_address.py b/awx/api/urls/receptor_address.py new file mode 100644 index 000000000000..fe630f3da4c1 --- /dev/null +++ b/awx/api/urls/receptor_address.py @@ -0,0 +1,17 @@ +# Copyright (c) 2017 Ansible, Inc. +# All Rights Reserved. + +from django.urls import re_path + +from awx.api.views import ( + ReceptorAddressesList, + ReceptorAddressDetail, +) + + +urls = [ + re_path(r'^$', ReceptorAddressesList.as_view(), name='receptor_addresses_list'), + re_path(r'^(?P[0-9]+)/$', ReceptorAddressDetail.as_view(), name='receptor_address_detail'), +] + +__all__ = ['urls'] diff --git a/awx/api/urls/urls.py b/awx/api/urls/urls.py index 030ba25edef5..c2218e5ed865 100644 --- a/awx/api/urls/urls.py +++ b/awx/api/urls/urls.py @@ -85,6 +85,7 @@ from .workflow_approval_template import urls as workflow_approval_template_urls from .workflow_approval import urls as workflow_approval_urls from .analytics import urls as analytics_urls +from .receptor_address import urls as receptor_address_urls v2_urls = [ re_path(r'^$', ApiV2RootView.as_view(), name='api_v2_root_view'), @@ -155,6 +156,7 @@ re_path(r'^bulk/host_create/$', BulkHostCreateView.as_view(), name='bulk_host_create'), re_path(r'^bulk/host_delete/$', BulkHostDeleteView.as_view(), name='bulk_host_delete'), re_path(r'^bulk/job_launch/$', BulkJobLaunchView.as_view(), name='bulk_job_launch'), + re_path(r'^receptor_addresses/', include(receptor_address_urls)), ] diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 80fc152bf404..7437c85c664f 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -337,12 +337,20 @@ class InstanceList(ListCreateAPIView): search_fields = ('hostname',) ordering = ('id',) + def get_queryset(self): + qs = super().get_queryset().prefetch_related('receptor_addresses') + return qs + class InstanceDetail(RetrieveUpdateAPIView): name = _("Instance Detail") model = models.Instance serializer_class = serializers.InstanceSerializer + def get_queryset(self): + qs = super().get_queryset().prefetch_related('receptor_addresses') + return qs + def update_raw_data(self, data): # these fields are only valid on creation of an instance, so they unwanted on detail view data.pop('node_type', None) @@ -375,13 +383,37 @@ def get_queryset(self): class InstancePeersList(SubListAPIView): - name = _("Instance Peers") + name = _("Peers") + model = models.ReceptorAddress + serializer_class = serializers.ReceptorAddressSerializer parent_model = models.Instance - model = models.Instance - serializer_class = serializers.InstanceSerializer parent_access = 'read' - search_fields = {'hostname'} relationship = 'peers' + search_fields = ('address',) + + +class InstanceReceptorAddressesList(SubListAPIView): + name = _("Receptor Addresses") + model = models.ReceptorAddress + parent_key = 'instance' + parent_model = models.Instance + serializer_class = serializers.ReceptorAddressSerializer + search_fields = ('address',) + + +class ReceptorAddressesList(ListAPIView): + name = _("Receptor Addresses") + model = models.ReceptorAddress + serializer_class = serializers.ReceptorAddressSerializer + search_fields = ('address',) + + +class ReceptorAddressDetail(RetrieveAPIView): + name = _("Receptor Address Detail") + model = models.ReceptorAddress + serializer_class = serializers.ReceptorAddressSerializer + parent_model = models.Instance + relationship = 'receptor_addresses' class InstanceInstanceGroupsList(InstanceGroupMembershipMixin, SubListCreateAttachDetachAPIView): diff --git a/awx/api/views/instance_install_bundle.py b/awx/api/views/instance_install_bundle.py index 9ae7f7c4606b..6e4d802ed01f 100644 --- a/awx/api/views/instance_install_bundle.py +++ b/awx/api/views/instance_install_bundle.py @@ -124,10 +124,19 @@ def generate_inventory_yml(instance_obj): def generate_group_vars_all_yml(instance_obj): + # get peers peers = [] - for instance in instance_obj.peers.all(): - peers.append(dict(host=instance.hostname, port=instance.listener_port)) - all_yaml = render_to_string("instance_install_bundle/group_vars/all.yml", context=dict(instance=instance_obj, peers=peers)) + for addr in instance_obj.peers.select_related('instance'): + peers.append(dict(address=addr.get_full_address(), protocol=addr.protocol)) + context = dict(instance=instance_obj, peers=peers) + + canonical_addr = instance_obj.canonical_address + if canonical_addr: + context['listener_port'] = canonical_addr.port + protocol = canonical_addr.protocol if canonical_addr.protocol != 'wss' else 'ws' + context['listener_protocol'] = protocol + + all_yaml = render_to_string("instance_install_bundle/group_vars/all.yml", context=context) # convert consecutive newlines with a single newline return re.sub(r'\n+', '\n', all_yaml) diff --git a/awx/api/views/mesh_visualizer.py b/awx/api/views/mesh_visualizer.py index d09dab07327f..e76898972974 100644 --- a/awx/api/views/mesh_visualizer.py +++ b/awx/api/views/mesh_visualizer.py @@ -17,7 +17,7 @@ class MeshVisualizer(APIView): def get(self, request, format=None): data = { 'nodes': InstanceNodeSerializer(Instance.objects.all(), many=True).data, - 'links': InstanceLinkSerializer(InstanceLink.objects.select_related('target', 'source'), many=True).data, + 'links': InstanceLinkSerializer(InstanceLink.objects.select_related('target__instance', 'source'), many=True).data, } return Response(data) diff --git a/awx/api/views/root.py b/awx/api/views/root.py index 3a9a910e1c2d..65f3e0e1a62a 100644 --- a/awx/api/views/root.py +++ b/awx/api/views/root.py @@ -84,6 +84,7 @@ def get(self, request, format=None): data['ping'] = reverse('api:api_v2_ping_view', request=request) data['instances'] = reverse('api:instance_list', request=request) data['instance_groups'] = reverse('api:instance_group_list', request=request) + data['receptor_addresses'] = reverse('api:receptor_addresses_list', request=request) data['config'] = reverse('api:api_v2_config_view', request=request) data['settings'] = reverse('api:setting_category_list', request=request) data['me'] = reverse('api:user_me_list', request=request) diff --git a/awx/main/access.py b/awx/main/access.py index 5b297353568f..c768e74c4cb5 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -57,6 +57,7 @@ Project, ProjectUpdate, ProjectUpdateEvent, + ReceptorAddress, Role, Schedule, SystemJob, @@ -2430,6 +2431,29 @@ def can_delete(self, obj): return False +class ReceptorAddressAccess(BaseAccess): + """ + I can see receptor address records whenever I can access the instance + """ + + model = ReceptorAddress + + def filtered_queryset(self): + return self.model.objects.filter(Q(instance__in=Instance.accessible_pk_qs(self.user, 'read_role'))) + + @check_superuser + def can_add(self, data): + return False + + @check_superuser + def can_change(self, obj, data): + return False + + @check_superuser + def can_delete(self, obj): + return False + + class SystemJobEventAccess(BaseAccess): """ I can only see manage System Jobs events if I'm a super user diff --git a/awx/main/management/commands/add_receptor_address.py b/awx/main/management/commands/add_receptor_address.py new file mode 100644 index 000000000000..7ac7ac8be70b --- /dev/null +++ b/awx/main/management/commands/add_receptor_address.py @@ -0,0 +1,53 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved + +from django.core.management.base import BaseCommand + +from awx.main.models import Instance, ReceptorAddress + + +def add_address(**kwargs): + try: + instance = Instance.objects.get(hostname=kwargs.pop('instance')) + kwargs['instance'] = instance + + if kwargs.get('canonical') and instance.receptor_addresses.filter(canonical=True).exclude(address=kwargs['address']).exists(): + print(f"Instance {instance.hostname} already has a canonical address, skipping") + return False + # if ReceptorAddress already exists with address, just update + # otherwise, create new ReceptorAddress + addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), defaults=kwargs) + print(f"Successfully added receptor address {addr.get_full_address()}") + return True + except Exception as e: + print(f"Error adding receptor address: {e}") + return False + + +class Command(BaseCommand): + """ + Internal controller command. + Register receptor address to an already-registered instance. + """ + + help = "Add receptor address to an instance." + + def add_arguments(self, parser): + parser.add_argument('--instance', dest='instance', required=True, type=str, help="Instance hostname this address is added to") + parser.add_argument('--address', dest='address', required=True, type=str, help="Receptor address") + parser.add_argument('--port', dest='port', type=int, help="Receptor listener port") + parser.add_argument('--websocket_path', dest='websocket_path', type=str, default="", help="Path for websockets") + parser.add_argument('--is_internal', action='store_true', help="If true, address only resolvable within the Kubernetes cluster") + parser.add_argument('--protocol', type=str, default='tcp', choices=['tcp', 'ws', 'wss'], help="Protocol to use for the Receptor listener") + parser.add_argument('--canonical', action='store_true', help="If true, address is the canonical address for the instance") + parser.add_argument('--peers_from_control_nodes', action='store_true', help="If true, control nodes will peer to this address") + + def handle(self, **options): + address_options = { + k: options[k] + for k in ('instance', 'address', 'port', 'websocket_path', 'is_internal', 'protocol', 'peers_from_control_nodes', 'canonical') + if options[k] + } + changed = add_address(**address_options) + if changed: + print("(changed: True)") diff --git a/awx/main/management/commands/list_instances.py b/awx/main/management/commands/list_instances.py index a434e2299e9b..b2bbcfea2910 100644 --- a/awx/main/management/commands/list_instances.py +++ b/awx/main/management/commands/list_instances.py @@ -55,7 +55,7 @@ def handle(self, *args, **options): capacity = f' capacity={x.capacity}' if x.node_type != 'hop' else '' version = f" version={x.version or '?'}" if x.node_type != 'hop' else '' - heartbeat = f' heartbeat="{x.last_seen:%Y-%m-%d %H:%M:%S}"' if x.capacity or x.node_type == 'hop' else '' + heartbeat = f' heartbeat="{x.last_seen:%Y-%m-%d %H:%M:%S}"' if x.last_seen else '' print(f'\t{color}{x.hostname}{capacity} node_type={x.node_type}{version}{heartbeat}{end_color}') print() diff --git a/awx/main/management/commands/provision_instance.py b/awx/main/management/commands/provision_instance.py index e7f8063d61bd..5a60328d960d 100644 --- a/awx/main/management/commands/provision_instance.py +++ b/awx/main/management/commands/provision_instance.py @@ -25,20 +25,17 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument('--hostname', dest='hostname', type=str, help="Hostname used during provisioning") - parser.add_argument('--listener_port', dest='listener_port', type=int, help="Receptor listener port") parser.add_argument('--node_type', type=str, default='hybrid', choices=['control', 'execution', 'hop', 'hybrid'], help="Instance Node type") parser.add_argument('--uuid', type=str, help="Instance UUID") - def _register_hostname(self, hostname, node_type, uuid, listener_port): + def _register_hostname(self, hostname, node_type, uuid): if not hostname: if not settings.AWX_AUTO_DEPROVISION_INSTANCES: raise CommandError('Registering with values from settings only intended for use in K8s installs') from awx.main.management.commands.register_queue import RegisterQueue - (changed, instance) = Instance.objects.register( - ip_address=os.environ.get('MY_POD_IP'), listener_port=listener_port, node_type='control', node_uuid=settings.SYSTEM_UUID - ) + (changed, instance) = Instance.objects.register(ip_address=os.environ.get('MY_POD_IP'), node_type='control', node_uuid=settings.SYSTEM_UUID) RegisterQueue(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, 100, 0, [], is_container_group=False).register() RegisterQueue( settings.DEFAULT_EXECUTION_QUEUE_NAME, @@ -51,16 +48,17 @@ def _register_hostname(self, hostname, node_type, uuid, listener_port): max_concurrent_jobs=settings.DEFAULT_EXECUTION_QUEUE_MAX_CONCURRENT_JOBS, ).register() else: - (changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, node_uuid=uuid, listener_port=listener_port) + (changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, node_uuid=uuid) if changed: print("Successfully registered instance {}".format(hostname)) else: print("Instance already registered {}".format(instance.hostname)) + self.changed = changed @transaction.atomic def handle(self, **options): self.changed = False - self._register_hostname(options.get('hostname'), options.get('node_type'), options.get('uuid'), options.get('listener_port')) + self._register_hostname(options.get('hostname'), options.get('node_type'), options.get('uuid')) if self.changed: print("(changed: True)") diff --git a/awx/main/management/commands/register_peers.py b/awx/main/management/commands/register_peers.py index 078edb08c78f..7142f4b663fd 100644 --- a/awx/main/management/commands/register_peers.py +++ b/awx/main/management/commands/register_peers.py @@ -1,9 +1,7 @@ -import warnings - from django.core.management.base import BaseCommand, CommandError from django.db import transaction -from awx.main.models import Instance, InstanceLink +from awx.main.models import Instance, InstanceLink, ReceptorAddress class Command(BaseCommand): @@ -28,7 +26,9 @@ def add_arguments(self, parser): def handle(self, **options): # provides a mapping of hostname to Instance objects - nodes = Instance.objects.in_bulk(field_name='hostname') + nodes = Instance.objects.all().in_bulk(field_name='hostname') + # provides a mapping of address to ReceptorAddress objects + addresses = ReceptorAddress.objects.all().in_bulk(field_name='address') if options['source'] not in nodes: raise CommandError(f"Host {options['source']} is not a registered instance.") @@ -39,6 +39,14 @@ def handle(self, **options): if options['exact'] is not None and options['disconnect']: raise CommandError("The option --disconnect may not be used with --exact.") + # make sure each target has a receptor address + peers = options['peers'] or [] + disconnect = options['disconnect'] or [] + exact = options['exact'] or [] + for peer in peers + disconnect + exact: + if peer not in addresses: + raise CommandError(f"Peer {peer} does not have a receptor address.") + # No 1-cycles for collection in ('peers', 'disconnect', 'exact'): if options[collection] is not None and options['source'] in options[collection]: @@ -47,9 +55,12 @@ def handle(self, **options): # No 2-cycles if options['peers'] or options['exact'] is not None: peers = set(options['peers'] or options['exact']) - incoming = set(InstanceLink.objects.filter(target=nodes[options['source']]).values_list('source__hostname', flat=True)) + if options['source'] in addresses: + incoming = set(InstanceLink.objects.filter(target=addresses[options['source']]).values_list('source__hostname', flat=True)) + else: + incoming = set() if peers & incoming: - warnings.warn(f"Source node {options['source']} should not link to nodes already peering to it: {peers & incoming}.") + raise CommandError(f"Source node {options['source']} should not link to nodes already peering to it: {peers & incoming}.") if options['peers']: missing_peers = set(options['peers']) - set(nodes) @@ -60,7 +71,7 @@ def handle(self, **options): results = 0 for target in options['peers']: _, created = InstanceLink.objects.update_or_create( - source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED} + source=nodes[options['source']], target=addresses[target], defaults={'link_state': InstanceLink.States.ESTABLISHED} ) if created: results += 1 @@ -70,9 +81,9 @@ def handle(self, **options): if options['disconnect']: results = 0 for target in options['disconnect']: - if target not in nodes: # Be permissive, the node might have already been de-registered. + if target not in addresses: # Be permissive, the node might have already been de-registered. continue - n, _ = InstanceLink.objects.filter(source=nodes[options['source']], target=nodes[target]).delete() + n, _ = InstanceLink.objects.filter(source=nodes[options['source']], target=addresses[target]).delete() results += n print(f"{results} peer links removed from the database.") @@ -81,11 +92,11 @@ def handle(self, **options): additions = 0 with transaction.atomic(): peers = set(options['exact']) - links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True)) - removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=links - peers).delete() + links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__address', flat=True)) + removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__instance__hostname__in=links - peers).delete() for target in peers - links: _, created = InstanceLink.objects.update_or_create( - source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED} + source=nodes[options['source']], target=addresses[target], defaults={'link_state': InstanceLink.States.ESTABLISHED} ) if created: additions += 1 diff --git a/awx/main/management/commands/remove_receptor_address.py b/awx/main/management/commands/remove_receptor_address.py new file mode 100644 index 000000000000..de7426a53ff3 --- /dev/null +++ b/awx/main/management/commands/remove_receptor_address.py @@ -0,0 +1,26 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved + +from django.core.management.base import BaseCommand + +from awx.main.models import ReceptorAddress + + +class Command(BaseCommand): + """ + Internal controller command. + Delete a receptor address. + """ + + help = "Add receptor address to an instance." + + def add_arguments(self, parser): + parser.add_argument('--address', dest='address', type=str, help="Receptor address to remove") + + def handle(self, **options): + deleted = ReceptorAddress.objects.filter(address=options['address']).delete() + if deleted[0]: + print(f"Successfully removed {options['address']}") + print("(changed: True)") + else: + print(f"Did not remove {options['address']}, not found") diff --git a/awx/main/managers.py b/awx/main/managers.py index 747f9d4467d1..c501d7b0d397 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -115,7 +115,14 @@ def me(self): return node[0] raise RuntimeError("No instance found with the current cluster host id") - def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=None, node_type='hybrid', defaults=None): + def register( + self, + node_uuid=None, + hostname=None, + ip_address="", + node_type='hybrid', + defaults=None, + ): if not hostname: hostname = settings.CLUSTER_HOST_ID @@ -161,9 +168,6 @@ def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=N if instance.node_type != node_type: instance.node_type = node_type update_fields.append('node_type') - if instance.listener_port != listener_port: - instance.listener_port = listener_port - update_fields.append('listener_port') if update_fields: instance.save(update_fields=update_fields) return (True, instance) @@ -174,11 +178,13 @@ def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=N create_defaults = { 'node_state': Instance.States.INSTALLED, 'capacity': 0, + 'managed': True, } if defaults is not None: create_defaults.update(defaults) uuid_option = {'uuid': node_uuid if node_uuid is not None else uuid.uuid4()} if node_type == 'execution' and 'version' not in create_defaults: create_defaults['version'] = RECEPTOR_PENDING - instance = self.create(hostname=hostname, ip_address=ip_address, listener_port=listener_port, node_type=node_type, **create_defaults, **uuid_option) + instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults, **uuid_option) + return (True, instance) diff --git a/awx/main/migrations/0189_inbound_hop_nodes.py b/awx/main/migrations/0189_inbound_hop_nodes.py new file mode 100644 index 000000000000..aaaaff9aecda --- /dev/null +++ b/awx/main/migrations/0189_inbound_hop_nodes.py @@ -0,0 +1,150 @@ +# Generated by Django 4.2.6 on 2024-01-19 19:24 + +import django.core.validators +from django.db import migrations, models +import django.db.models.deletion + + +def create_receptor_addresses(apps, schema_editor): + """ + If listener_port was defined on an instance, create a receptor address for it + """ + Instance = apps.get_model('main', 'Instance') + ReceptorAddress = apps.get_model('main', 'ReceptorAddress') + for instance in Instance.objects.exclude(listener_port=None): + ReceptorAddress.objects.create( + instance=instance, + address=instance.hostname, + port=instance.listener_port, + peers_from_control_nodes=instance.peers_from_control_nodes, + protocol='tcp', + is_internal=False, + canonical=True, + ) + + +def link_to_receptor_addresses(apps, schema_editor): + """ + Modify each InstanceLink to point to the newly created + ReceptorAddresses, using the new target field + """ + InstanceLink = apps.get_model('main', 'InstanceLink') + for link in InstanceLink.objects.all(): + link.target = link.target_old.receptor_addresses.get() + link.save() + + +class Migration(migrations.Migration): + dependencies = [ + ('main', '0188_add_bitbucket_dc_webhook'), + ] + + operations = [ + migrations.CreateModel( + name='ReceptorAddress', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('address', models.CharField(help_text='Routable address for this instance.', max_length=255)), + ( + 'port', + models.IntegerField( + default=27199, + help_text='Port for the address.', + validators=[django.core.validators.MinValueValidator(0), django.core.validators.MaxValueValidator(65535)], + ), + ), + ('websocket_path', models.CharField(blank=True, default='', help_text='Websocket path.', max_length=255)), + ( + 'protocol', + models.CharField( + choices=[('tcp', 'TCP'), ('ws', 'WS'), ('wss', 'WSS')], + default='tcp', + help_text="Protocol to use for the Receptor listener, 'tcp', 'wss', or 'ws'.", + max_length=10, + ), + ), + ('is_internal', models.BooleanField(default=False, help_text='If True, only routable within the Kubernetes cluster.')), + ('canonical', models.BooleanField(default=False, help_text='If True, this address is the canonical address for the instance.')), + ( + 'peers_from_control_nodes', + models.BooleanField(default=False, help_text='If True, control plane cluster nodes should automatically peer to it.'), + ), + ], + ), + migrations.RemoveConstraint( + model_name='instancelink', + name='source_and_target_can_not_be_equal', + ), + migrations.RenameField( + model_name='instancelink', + old_name='target', + new_name='target_old', + ), + migrations.AlterUniqueTogether( + name='instancelink', + unique_together=set(), + ), + migrations.AddField( + model_name='instance', + name='managed', + field=models.BooleanField(default=False, editable=False, help_text='If True, this instance is managed by the control plane.'), + ), + migrations.AlterField( + model_name='instancelink', + name='source', + field=models.ForeignKey(help_text='The source instance of this peer link.', on_delete=django.db.models.deletion.CASCADE, to='main.instance'), + ), + migrations.AddField( + model_name='receptoraddress', + name='instance', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='receptor_addresses', to='main.instance'), + ), + migrations.AddField( + model_name='activitystream', + name='receptor_address', + field=models.ManyToManyField(blank=True, to='main.receptoraddress'), + ), + migrations.AddConstraint( + model_name='receptoraddress', + constraint=models.UniqueConstraint(fields=('address',), name='unique_receptor_address', violation_error_message='Receptor address must be unique.'), + ), + migrations.AddField( + model_name='instancelink', + name='target', + field=models.ForeignKey( + help_text='The target receptor address of this peer link.', null=True, on_delete=django.db.models.deletion.CASCADE, to='main.receptoraddress' + ), + ), + migrations.RunPython(create_receptor_addresses), + migrations.RunPython(link_to_receptor_addresses), + migrations.RemoveField( + model_name='instance', + name='peers_from_control_nodes', + ), + migrations.RemoveField( + model_name='instance', + name='listener_port', + ), + migrations.RemoveField( + model_name='instancelink', + name='target_old', + ), + migrations.AlterField( + model_name='instance', + name='peers', + field=models.ManyToManyField(related_name='peers_from', through='main.InstanceLink', to='main.receptoraddress'), + ), + migrations.AlterField( + model_name='instancelink', + name='target', + field=models.ForeignKey( + help_text='The target receptor address of this peer link.', on_delete=django.db.models.deletion.CASCADE, to='main.receptoraddress' + ), + ), + migrations.AddConstraint( + model_name='instancelink', + constraint=models.UniqueConstraint( + fields=('source', 'target'), name='unique_source_target', violation_error_message='Field source and target must be unique together.' + ), + ), + ] diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py index 2363543a9c61..3148f44fb51e 100644 --- a/awx/main/models/__init__.py +++ b/awx/main/models/__init__.py @@ -14,6 +14,7 @@ from awx.main.models.organization import Organization, Profile, Team, UserSessionMembership # noqa from awx.main.models.credential import Credential, CredentialType, CredentialInputSource, ManagedCredentialType, build_safe_env # noqa from awx.main.models.projects import Project, ProjectUpdate # noqa +from awx.main.models.receptor_address import ReceptorAddress # noqa from awx.main.models.inventory import ( # noqa CustomInventoryScript, Group, diff --git a/awx/main/models/activity_stream.py b/awx/main/models/activity_stream.py index 7bce0c4fe0d8..2dccf3158f44 100644 --- a/awx/main/models/activity_stream.py +++ b/awx/main/models/activity_stream.py @@ -77,6 +77,7 @@ class Meta: notification_template = models.ManyToManyField("NotificationTemplate", blank=True) notification = models.ManyToManyField("Notification", blank=True) label = models.ManyToManyField("Label", blank=True) + receptor_address = models.ManyToManyField("ReceptorAddress", blank=True) role = models.ManyToManyField("Role", blank=True) instance = models.ManyToManyField("Instance", blank=True) instance_group = models.ManyToManyField("InstanceGroup", blank=True) diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 4792d4e4d4bb..5c1f5df81078 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -5,7 +5,7 @@ import logging import os -from django.core.validators import MinValueValidator, MaxValueValidator +from django.core.validators import MinValueValidator from django.db import models, connection from django.db.models.signals import post_save, post_delete from django.dispatch import receiver @@ -34,6 +34,7 @@ from awx.main.models.unified_jobs import UnifiedJob from awx.main.utils.common import get_corrected_cpu, get_cpu_effective_capacity, get_corrected_memory, get_mem_effective_capacity from awx.main.models.mixins import RelatedJobsMixin, ResourceMixin +from awx.main.models.receptor_address import ReceptorAddress # ansible-runner from ansible_runner.utils.capacity import get_cpu_count, get_mem_in_bytes @@ -64,8 +65,19 @@ def has_policy_changes(self): class InstanceLink(BaseModel): - source = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='+') - target = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='reverse_peers') + class Meta: + ordering = ("id",) + # add constraint for source and target to be unique together + constraints = [ + models.UniqueConstraint( + fields=["source", "target"], + name="unique_source_target", + violation_error_message=_("Field source and target must be unique together."), + ) + ] + + source = models.ForeignKey('Instance', on_delete=models.CASCADE, help_text=_("The source instance of this peer link.")) + target = models.ForeignKey('ReceptorAddress', on_delete=models.CASCADE, help_text=_("The target receptor address of this peer link.")) class States(models.TextChoices): ADDING = 'adding', _('Adding') @@ -76,11 +88,6 @@ class States(models.TextChoices): choices=States.choices, default=States.ADDING, max_length=16, help_text=_("Indicates the current life cycle stage of this peer link.") ) - class Meta: - unique_together = ('source', 'target') - ordering = ("id",) - constraints = [models.CheckConstraint(check=~models.Q(source=models.F('target')), name='source_and_target_can_not_be_equal')] - class Instance(HasPolicyEditsMixin, BaseModel): """A model representing an AWX instance running against this database.""" @@ -110,6 +117,7 @@ def __str__(self): default="", max_length=50, ) + # Auto-fields, implementation is different from BaseModel created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) @@ -185,16 +193,9 @@ class States(models.TextChoices): node_state = models.CharField( choices=States.choices, default=States.READY, max_length=16, help_text=_("Indicates the current life cycle stage of this instance.") ) - listener_port = models.PositiveIntegerField( - blank=True, - null=True, - default=None, - validators=[MinValueValidator(1024), MaxValueValidator(65535)], - help_text=_("Port that Receptor will listen for incoming connections on."), - ) - peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from') - peers_from_control_nodes = models.BooleanField(default=False, help_text=_("If True, control plane cluster nodes should automatically peer to it.")) + managed = models.BooleanField(help_text=_("If True, this instance is managed by the control plane."), default=False, editable=False) + peers = models.ManyToManyField('ReceptorAddress', through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from') POLICY_FIELDS = frozenset(('managed_by_policy', 'hostname', 'capacity_adjustment')) @@ -241,6 +242,26 @@ def health_check_pending(self): return True return self.health_check_started > self.last_health_check + @property + def canonical_address(self): + return self.receptor_addresses.filter(canonical=True).first() + + @property + def canonical_address_port(self): + # note: don't create a different query for receptor addresses, as this is prefetched on the View for optimization + for addr in self.receptor_addresses.all(): + if addr.canonical: + return addr.port + return None + + @property + def canonical_address_peers_from_control_nodes(self): + # note: don't create a different query for receptor addresses, as this is prefetched on the View for optimization + for addr in self.receptor_addresses.all(): + if addr.canonical: + return addr.peers_from_control_nodes + return False + def get_cleanup_task_kwargs(self, **kwargs): """ Produce options to use for the command: ansible-runner worker cleanup @@ -501,6 +522,35 @@ def schedule_write_receptor_config(broadcast=True): write_receptor_config() # just run locally +@receiver(post_save, sender=ReceptorAddress) +def receptor_address_saved(sender, instance, **kwargs): + from awx.main.signals import disable_activity_stream + + address = instance + + control_instances = set(Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID])) + if address.peers_from_control_nodes: + # if control_instances is not a subset of current peers of address, then + # that means we need to add some InstanceLinks + if not control_instances <= set(address.peers_from.all()): + with disable_activity_stream(): + for control_instance in control_instances: + InstanceLink.objects.update_or_create(source=control_instance, target=address) + schedule_write_receptor_config() + else: + if address.peers_from.exists(): + with disable_activity_stream(): + address.peers_from.remove(*control_instances) + schedule_write_receptor_config() + + +@receiver(post_delete, sender=ReceptorAddress) +def receptor_address_deleted(sender, instance, **kwargs): + address = instance + if address.peers_from_control_nodes: + schedule_write_receptor_config() + + @receiver(post_save, sender=Instance) def on_instance_saved(sender, instance, created=False, raw=False, **kwargs): ''' @@ -511,11 +561,14 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs): 2. a node changes its value of peers_from_control_nodes 3. a new control node comes online and has instances to peer to ''' + from awx.main.signals import disable_activity_stream + if created and settings.IS_K8S and instance.node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]: - inst = Instance.objects.filter(peers_from_control_nodes=True) - if set(instance.peers.all()) != set(inst): - instance.peers.set(inst) - schedule_write_receptor_config(broadcast=False) + peers_addresses = ReceptorAddress.objects.filter(peers_from_control_nodes=True) + if peers_addresses.exists(): + with disable_activity_stream(): + instance.peers.add(*peers_addresses) + schedule_write_receptor_config(broadcast=False) if settings.IS_K8S and instance.node_type in [Instance.Types.HOP, Instance.Types.EXECUTION]: if instance.node_state == Instance.States.DEPROVISIONING: @@ -524,16 +577,6 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs): # wait for jobs on the node to complete, then delete the # node and kick off write_receptor_config connection.on_commit(lambda: remove_deprovisioned_node.apply_async([instance.hostname])) - else: - control_instances = set(Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID])) - if instance.peers_from_control_nodes: - if (control_instances & set(instance.peers_from.all())) != set(control_instances): - instance.peers_from.add(*control_instances) - schedule_write_receptor_config() # keep method separate to make pytest mocking easier - else: - if set(control_instances) & set(instance.peers_from.all()): - instance.peers_from.remove(*control_instances) - schedule_write_receptor_config() if created or instance.has_policy_changes(): schedule_policy_task() @@ -548,8 +591,6 @@ def on_instance_group_deleted(sender, instance, using, **kwargs): @receiver(post_delete, sender=Instance) def on_instance_deleted(sender, instance, using, **kwargs): schedule_policy_task() - if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION, Instance.Types.HOP) and instance.peers_from_control_nodes: - schedule_write_receptor_config() class UnifiedJobTemplateInstanceGroupMembership(models.Model): diff --git a/awx/main/models/receptor_address.py b/awx/main/models/receptor_address.py new file mode 100644 index 000000000000..1fa66bd647f5 --- /dev/null +++ b/awx/main/models/receptor_address.py @@ -0,0 +1,67 @@ +from django.db import models +from django.core.validators import MinValueValidator, MaxValueValidator +from django.utils.translation import gettext_lazy as _ +from awx.api.versioning import reverse + + +class Protocols(models.TextChoices): + TCP = 'tcp', 'TCP' + WS = 'ws', 'WS' + WSS = 'wss', 'WSS' + + +class ReceptorAddress(models.Model): + class Meta: + app_label = 'main' + constraints = [ + models.UniqueConstraint( + fields=["address"], + name="unique_receptor_address", + violation_error_message=_("Receptor address must be unique."), + ) + ] + + address = models.CharField(help_text=_("Routable address for this instance."), max_length=255) + port = models.IntegerField(help_text=_("Port for the address."), default=27199, validators=[MinValueValidator(0), MaxValueValidator(65535)]) + websocket_path = models.CharField(help_text=_("Websocket path."), max_length=255, default="", blank=True) + protocol = models.CharField( + help_text=_("Protocol to use for the Receptor listener, 'tcp', 'wss', or 'ws'."), max_length=10, default=Protocols.TCP, choices=Protocols.choices + ) + is_internal = models.BooleanField(help_text=_("If True, only routable within the Kubernetes cluster."), default=False) + canonical = models.BooleanField(help_text=_("If True, this address is the canonical address for the instance."), default=False) + peers_from_control_nodes = models.BooleanField(help_text=_("If True, control plane cluster nodes should automatically peer to it."), default=False) + instance = models.ForeignKey( + 'Instance', + related_name='receptor_addresses', + on_delete=models.CASCADE, + null=False, + ) + + def __str__(self): + return self.get_full_address() + + def get_full_address(self): + scheme = "" + path = "" + port = "" + if self.protocol == "ws": + scheme = "wss://" + + if self.protocol == "ws" and self.websocket_path: + path = f"/{self.websocket_path}" + + if self.port: + port = f":{self.port}" + + return f"{scheme}{self.address}{port}{path}" + + def get_peer_type(self): + if self.protocol == 'tcp': + return 'tcp-peer' + elif self.protocol in ['ws', 'wss']: + return 'ws-peer' + else: + return None + + def get_absolute_url(self, request=None): + return reverse('api:receptor_address_detail', kwargs={'pk': self.pk}, request=request) diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index 7cd79c85e76a..597c900eeba7 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -27,7 +27,7 @@ ) from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER from awx.main.tasks.signals import signal_state, signal_callback, SignalExit -from awx.main.models import Instance, InstanceLink, UnifiedJob +from awx.main.models import Instance, InstanceLink, UnifiedJob, ReceptorAddress from awx.main.dispatch import get_task_queuename from awx.main.dispatch.publish import task from awx.main.utils.pglock import advisory_lock @@ -676,36 +676,44 @@ def kube_config(self): ) -def should_update_config(instances): +def should_update_config(new_config): ''' checks that the list of instances matches the list of tcp-peers in the config ''' + current_config = read_receptor_config() # this gets receptor conf lock - current_peers = [] for config_entry in current_config: - for key, value in config_entry.items(): - if key.endswith('-peer'): - current_peers.append(value['address']) - intended_peers = [f"{i.hostname}:{i.listener_port}" for i in instances] - logger.debug(f"Peers current {current_peers} intended {intended_peers}") - if set(current_peers) == set(intended_peers): - return False # config file is already update to date + if config_entry not in new_config: + logger.warning(f"{config_entry} should not be in receptor config. Updating.") + return True + for config_entry in new_config: + if config_entry not in current_config: + logger.warning(f"{config_entry} missing from receptor config. Updating.") + return True - return True + return False def generate_config_data(): # returns two values # receptor config - based on current database peers # should_update - If True, receptor_config differs from the receptor conf file on disk - instances = Instance.objects.filter(node_type__in=(Instance.Types.EXECUTION, Instance.Types.HOP), peers_from_control_nodes=True) + addresses = ReceptorAddress.objects.filter(peers_from_control_nodes=True) receptor_config = list(RECEPTOR_CONFIG_STARTER) - for instance in instances: - peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}} - receptor_config.append(peer) - should_update = should_update_config(instances) + for address in addresses: + if address.get_peer_type(): + peer = { + f'{address.get_peer_type()}': { + 'address': f'{address.get_full_address()}', + 'tls': 'tlsclient', + } + } + receptor_config.append(peer) + else: + logger.warning(f"Receptor address {address} has unsupported peer type, skipping.") + should_update = should_update_config(receptor_config) return receptor_config, should_update @@ -747,14 +755,13 @@ def write_receptor_config(): with lock: with open(__RECEPTOR_CONF, 'w') as file: yaml.dump(receptor_config, file, default_flow_style=False) - reload_receptor() @task(queue=get_task_queuename) def remove_deprovisioned_node(hostname): InstanceLink.objects.filter(source__hostname=hostname).update(link_state=InstanceLink.States.REMOVING) - InstanceLink.objects.filter(target__hostname=hostname).update(link_state=InstanceLink.States.REMOVING) + InstanceLink.objects.filter(target__instance__hostname=hostname).update(link_state=InstanceLink.States.REMOVING) node_jobs = UnifiedJob.objects.filter( execution_node=hostname, diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index b32ae7b5e5ef..4614b7b4a4f1 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -495,7 +495,7 @@ def inspect_established_receptor_connections(mesh_status): update_links = [] for link in all_links: if link.link_state != InstanceLink.States.REMOVING: - if link.target.hostname in active_receptor_conns.get(link.source.hostname, {}): + if link.target.instance.hostname in active_receptor_conns.get(link.source.hostname, {}): if link.link_state is not InstanceLink.States.ESTABLISHED: link.link_state = InstanceLink.States.ESTABLISHED update_links.append(link) diff --git a/awx/main/tests/functional/api/test_instance_peers.py b/awx/main/tests/functional/api/test_instance_peers.py index 93af6bf5a2df..1ce6f843bd78 100644 --- a/awx/main/tests/functional/api/test_instance_peers.py +++ b/awx/main/tests/functional/api/test_instance_peers.py @@ -1,19 +1,16 @@ import pytest import yaml -import itertools from unittest import mock -from django.db.utils import IntegrityError - from awx.api.versioning import reverse -from awx.main.models import Instance +from awx.main.models import Instance, ReceptorAddress from awx.api.views.instance_install_bundle import generate_group_vars_all_yml def has_peer(group_vars, peer): peers = group_vars.get('receptor_peers', []) for p in peers: - if f"{p['host']}:{p['port']}" == peer: + if p['address'] == peer: return True return False @@ -24,119 +21,314 @@ class TestPeers: def configure_settings(self, settings): settings.IS_K8S = True - @pytest.mark.parametrize('node_type', ['control', 'hybrid']) - def test_prevent_peering_to_self(self, node_type): + @pytest.mark.parametrize('node_type', ['hop', 'execution']) + def test_peering_to_self(self, node_type, admin_user, patch): """ cannot peer to self """ - control_instance = Instance.objects.create(hostname='abc', node_type=node_type) - with pytest.raises(IntegrityError): - control_instance.peers.add(control_instance) + instance = Instance.objects.create(hostname='abc', node_type=node_type) + addr = ReceptorAddress.objects.create(instance=instance, address='abc', canonical=True) + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': instance.pk}), + data={"hostname": "abc", "node_type": node_type, "peers": [addr.id]}, + user=admin_user, + expect=400, + ) + assert 'Instance cannot peer to its own address.' in str(resp.data) @pytest.mark.parametrize('node_type', ['control', 'hybrid', 'hop', 'execution']) def test_creating_node(self, node_type, admin_user, post): """ can only add hop and execution nodes via API """ - post( + resp = post( url=reverse('api:instance_list'), data={"hostname": "abc", "node_type": node_type}, user=admin_user, expect=400 if node_type in ['control', 'hybrid'] else 201, ) + if resp.status_code == 400: + assert 'Can only create execution or hop nodes.' in str(resp.data) def test_changing_node_type(self, admin_user, patch): """ cannot change node type """ hop = Instance.objects.create(hostname='abc', node_type="hop") - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), data={"node_type": "execution"}, user=admin_user, expect=400, ) + assert 'Cannot change node type.' in str(resp.data) + + @pytest.mark.parametrize( + 'payload_port, payload_peers_from, initial_port, initial_peers_from', + [ + (-1, -1, None, None), + (-1, -1, 27199, False), + (-1, -1, 27199, True), + (None, -1, None, None), + (None, False, None, None), + (-1, False, None, None), + (27199, True, 27199, True), + (27199, False, 27199, False), + (27199, -1, 27199, True), + (27199, -1, 27199, False), + (-1, True, 27199, True), + (-1, False, 27199, False), + ], + ) + def test_no_op(self, payload_port, payload_peers_from, initial_port, initial_peers_from, admin_user, patch): + node = Instance.objects.create(hostname='abc', node_type='hop') + if initial_port is not None: + ReceptorAddress.objects.create(address=node.hostname, port=initial_port, canonical=True, peers_from_control_nodes=initial_peers_from, instance=node) + + assert ReceptorAddress.objects.filter(instance=node).count() == 1 + else: + assert ReceptorAddress.objects.filter(instance=node).count() == 0 + + data = {'enabled': True} # Just to have something to post. + if payload_port != -1: + data['listener_port'] = payload_port + if payload_peers_from != -1: + data['peers_from_control_nodes'] = payload_peers_from - @pytest.mark.parametrize('node_type', ['hop', 'execution']) - def test_listener_port_null(self, node_type, admin_user, post): - """ - listener_port can be None - """ - post( - url=reverse('api:instance_list'), - data={"hostname": "abc", "node_type": node_type, "listener_port": None}, + patch( + url=reverse('api:instance_detail', kwargs={'pk': node.pk}), + data=data, + user=admin_user, + expect=200, + ) + + assert ReceptorAddress.objects.filter(instance=node).count() == (0 if initial_port is None else 1) + if initial_port is not None: + ra = ReceptorAddress.objects.get(instance=node, canonical=True) + assert ra.port == initial_port + assert ra.peers_from_control_nodes == initial_peers_from + + @pytest.mark.parametrize( + 'payload_port, payload_peers_from', + [ + (27199, True), + (27199, False), + (27199, -1), + ], + ) + def test_creates_canonical_address(self, payload_port, payload_peers_from, admin_user, patch): + node = Instance.objects.create(hostname='abc', node_type='hop') + assert ReceptorAddress.objects.filter(instance=node).count() == 0 + + data = {'enabled': True} # Just to have something to post. + if payload_port != -1: + data['listener_port'] = payload_port + if payload_peers_from != -1: + data['peers_from_control_nodes'] = payload_peers_from + + patch( + url=reverse('api:instance_detail', kwargs={'pk': node.pk}), + data=data, + user=admin_user, + expect=200, + ) + + assert ReceptorAddress.objects.filter(instance=node).count() == 1 + ra = ReceptorAddress.objects.get(instance=node, canonical=True) + assert ra.port == payload_port + assert ra.peers_from_control_nodes == (payload_peers_from if payload_peers_from != -1 else False) + + @pytest.mark.parametrize( + 'payload_port, payload_peers_from, initial_port, initial_peers_from', + [ + (None, False, 27199, True), + (None, -1, 27199, True), + (None, False, 27199, False), + (None, -1, 27199, False), + ], + ) + def test_deletes_canonical_address(self, payload_port, payload_peers_from, initial_port, initial_peers_from, admin_user, patch): + node = Instance.objects.create(hostname='abc', node_type='hop') + ReceptorAddress.objects.create(address=node.hostname, port=initial_port, canonical=True, peers_from_control_nodes=initial_peers_from, instance=node) + + assert ReceptorAddress.objects.filter(instance=node).count() == 1 + + data = {'enabled': True} # Just to have something to post. + if payload_port != -1: + data['listener_port'] = payload_port + if payload_peers_from != -1: + data['peers_from_control_nodes'] = payload_peers_from + + patch( + url=reverse('api:instance_detail', kwargs={'pk': node.pk}), + data=data, user=admin_user, - expect=201, + expect=200, + ) + + assert ReceptorAddress.objects.filter(instance=node).count() == 0 + + @pytest.mark.parametrize( + 'payload_port, payload_peers_from, initial_port, initial_peers_from', + [ + (27199, True, 27199, False), + (27199, False, 27199, True), + (-1, True, 27199, False), + (-1, False, 27199, True), + ], + ) + def test_updates_canonical_address(self, payload_port, payload_peers_from, initial_port, initial_peers_from, admin_user, patch): + node = Instance.objects.create(hostname='abc', node_type='hop') + ReceptorAddress.objects.create(address=node.hostname, port=initial_port, canonical=True, peers_from_control_nodes=initial_peers_from, instance=node) + + assert ReceptorAddress.objects.filter(instance=node).count() == 1 + + data = {'enabled': True} # Just to have something to post. + if payload_port != -1: + data['listener_port'] = payload_port + if payload_peers_from != -1: + data['peers_from_control_nodes'] = payload_peers_from + + patch( + url=reverse('api:instance_detail', kwargs={'pk': node.pk}), + data=data, + user=admin_user, + expect=200, + ) + + assert ReceptorAddress.objects.filter(instance=node).count() == 1 + ra = ReceptorAddress.objects.get(instance=node, canonical=True) + assert ra.port == initial_port # At the present time, changing ports is not allowed + assert ra.peers_from_control_nodes == payload_peers_from + + @pytest.mark.parametrize( + 'payload_port, payload_peers_from, initial_port, initial_peers_from, error_msg', + [ + (-1, True, None, None, "Cannot enable peers_from_control_nodes"), + (None, True, None, None, "Cannot enable peers_from_control_nodes"), + (None, True, 21799, True, "Cannot enable peers_from_control_nodes"), + (None, True, 21799, False, "Cannot enable peers_from_control_nodes"), + (21800, -1, 21799, True, "Cannot change listener port"), + (21800, True, 21799, True, "Cannot change listener port"), + (21800, False, 21799, True, "Cannot change listener port"), + (21800, -1, 21799, False, "Cannot change listener port"), + (21800, True, 21799, False, "Cannot change listener port"), + (21800, False, 21799, False, "Cannot change listener port"), + ], + ) + def test_canonical_address_validation_error(self, payload_port, payload_peers_from, initial_port, initial_peers_from, error_msg, admin_user, patch): + node = Instance.objects.create(hostname='abc', node_type='hop') + if initial_port is not None: + ReceptorAddress.objects.create(address=node.hostname, port=initial_port, canonical=True, peers_from_control_nodes=initial_peers_from, instance=node) + + assert ReceptorAddress.objects.filter(instance=node).count() == 1 + else: + assert ReceptorAddress.objects.filter(instance=node).count() == 0 + + data = {'enabled': True} # Just to have something to post. + if payload_port != -1: + data['listener_port'] = payload_port + if payload_peers_from != -1: + data['peers_from_control_nodes'] = payload_peers_from + + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': node.pk}), + data=data, + user=admin_user, + expect=400, ) - @pytest.mark.parametrize('node_type, allowed', [('control', False), ('hybrid', False), ('hop', True), ('execution', True)]) - def test_peers_from_control_nodes_allowed(self, node_type, allowed, post, admin_user): + assert error_msg in str(resp.data) + + def test_changing_managed_listener_port(self, admin_user, patch): """ - only hop and execution nodes can have peers_from_control_nodes set to True + if instance is managed, cannot change listener port at all """ - post( - url=reverse('api:instance_list'), - data={"hostname": "abc", "peers_from_control_nodes": True, "node_type": node_type, "listener_port": 6789}, + hop = Instance.objects.create(hostname='abc', node_type="hop", managed=True) + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), + data={"listener_port": 5678}, + user=admin_user, + expect=400, # cannot set port + ) + assert 'Cannot change listener port for managed nodes.' in str(resp.data) + ReceptorAddress.objects.create(instance=hop, address='hop', port=27199, canonical=True) + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), + data={"listener_port": None}, user=admin_user, - expect=201 if allowed else 400, + expect=400, # cannot unset port ) + assert 'Cannot change listener port for managed nodes.' in str(resp.data) - def test_listener_port_is_required(self, admin_user, post): + def test_bidirectional_peering(self, admin_user, patch): """ - if adding instance to peers list, that instance must have listener_port set + cannot peer to node that is already to peered to it + if A -> B, then disallow B -> A """ - Instance.objects.create(hostname='abc', node_type="hop", listener_port=None) - post( - url=reverse('api:instance_list'), - data={"hostname": "ex", "peers_from_control_nodes": False, "node_type": "execution", "listener_port": None, "peers": ["abc"]}, + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1', canonical=True) + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2', canonical=True) + hop1.peers.add(hop2addr) + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}), + data={"peers": [hop1addr.id]}, user=admin_user, expect=400, ) + assert 'Instance hop1 is already peered to this instance.' in str(resp.data) - def test_peers_from_control_nodes_listener_port_enabled(self, admin_user, post): + def test_multiple_peers_same_instance(self, admin_user, patch): """ - if peers_from_control_nodes is True, listener_port must an integer - Assert that all other combinations are allowed + cannot peer to more than one address of the same instance """ - for index, item in enumerate(itertools.product(['hop', 'execution'], [True, False], [None, 6789])): - node_type, peers_from, listener_port = item - # only disallowed case is when peers_from is True and listener port is None - disallowed = peers_from and not listener_port - post( - url=reverse('api:instance_list'), - data={"hostname": f"abc{index}", "peers_from_control_nodes": peers_from, "node_type": node_type, "listener_port": listener_port}, - user=admin_user, - expect=400 if disallowed else 201, - ) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr1 = ReceptorAddress.objects.create(instance=hop1, address='hop1', canonical=True) + hop1addr2 = ReceptorAddress.objects.create(instance=hop1, address='hop1alternate') + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}), + data={"peers": [hop1addr1.id, hop1addr2.id]}, + user=admin_user, + expect=400, + ) + assert 'Cannot peer to the same instance more than once.' in str(resp.data) @pytest.mark.parametrize('node_type', ['control', 'hybrid']) - def test_disallow_modifying_peers_control_nodes(self, node_type, admin_user, patch): + def test_changing_peers_control_nodes(self, node_type, admin_user, patch): """ for control nodes, peers field should not be modified directly via patch. """ - control = Instance.objects.create(hostname='abc', node_type=node_type) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', peers_from_control_nodes=True, listener_port=6789) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', peers_from_control_nodes=False, listener_port=6789) - assert [hop1] == list(control.peers.all()) # only hop1 should be peered - patch( + control = Instance.objects.create(hostname='abc', node_type=node_type, managed=True) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1', peers_from_control_nodes=True, canonical=True) + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2', canonical=True) + assert [hop1addr] == list(control.peers.all()) # only hop1addr should be peered + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), - data={"peers": ["hop2"]}, + data={"peers": [hop2addr.id]}, user=admin_user, - expect=400, # cannot add peers directly + expect=400, # cannot add peers manually ) + assert 'Setting peers manually for managed nodes is not allowed.' in str(resp.data) + patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), - data={"peers": ["hop1"]}, + data={"peers": [hop1addr.id]}, user=admin_user, expect=200, # patching with current peers list should be okay ) - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), data={"peers": []}, user=admin_user, expect=400, # cannot remove peers directly ) + assert 'Setting peers manually for managed nodes is not allowed.' in str(resp.data) + patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), data={}, @@ -148,23 +340,25 @@ def test_disallow_modifying_peers_control_nodes(self, node_type, admin_user, pat url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}), data={"peers_from_control_nodes": True}, user=admin_user, - expect=200, # patching without data should be fine too + expect=200, ) - assert {hop1, hop2} == set(control.peers.all()) # hop1 and hop2 should now be peered from control node + assert {hop1addr, hop2addr} == set(control.peers.all()) # hop1 and hop2 should now be peered from control node - def test_disallow_changing_hostname(self, admin_user, patch): + def test_changing_hostname(self, admin_user, patch): """ cannot change hostname """ hop = Instance.objects.create(hostname='hop', node_type='hop') - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), data={"hostname": "hop2"}, user=admin_user, expect=400, ) - def test_disallow_changing_node_state(self, admin_user, patch): + assert 'Cannot change hostname.' in str(resp.data) + + def test_changing_node_state(self, admin_user, patch): """ only allow setting to deprovisioning """ @@ -175,12 +369,54 @@ def test_disallow_changing_node_state(self, admin_user, patch): user=admin_user, expect=200, ) - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), data={"node_state": "ready"}, user=admin_user, expect=400, ) + assert "Can only change instances to the 'deprovisioning' state." in str(resp.data) + + def test_changing_managed_node_state(self, admin_user, patch): + """ + cannot change node state of managed node + """ + hop = Instance.objects.create(hostname='hop', node_type='hop', managed=True) + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), + data={"node_state": "deprovisioning"}, + user=admin_user, + expect=400, + ) + + assert 'Cannot deprovision managed nodes.' in str(resp.data) + + def test_changing_managed_peers_from_control_nodes(self, admin_user, patch): + """ + cannot change peers_from_control_nodes of managed node + """ + hop = Instance.objects.create(hostname='hop', node_type='hop', managed=True) + ReceptorAddress.objects.create(instance=hop, address='hop', peers_from_control_nodes=True, canonical=True) + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), + data={"peers_from_control_nodes": False}, + user=admin_user, + expect=400, + ) + + assert 'Cannot change peers_from_control_nodes for managed nodes.' in str(resp.data) + + hop.peers_from_control_nodes = False + hop.save() + + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), + data={"peers_from_control_nodes": False}, + user=admin_user, + expect=400, + ) + + assert 'Cannot change peers_from_control_nodes for managed nodes.' in str(resp.data) @pytest.mark.parametrize('node_type', ['control', 'hybrid']) def test_control_node_automatically_peers(self, node_type): @@ -191,9 +427,10 @@ def test_control_node_automatically_peers(self, node_type): peer to hop should be removed if hop is deleted """ - hop = Instance.objects.create(hostname='hop', node_type='hop', peers_from_control_nodes=True, listener_port=6789) + hop = Instance.objects.create(hostname='hop', node_type='hop') + hopaddr = ReceptorAddress.objects.create(instance=hop, address='hop', peers_from_control_nodes=True, canonical=True) control = Instance.objects.create(hostname='abc', node_type=node_type) - assert hop in control.peers.all() + assert hopaddr in control.peers.all() hop.delete() assert not control.peers.exists() @@ -203,26 +440,50 @@ def test_control_node_retains_other_peers(self, node_type): if a new node comes online, other peer relationships should remain intact """ - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False) - hop1.peers.add(hop2) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2', canonical=True) + hop1.peers.add(hop2addr) # a control node is added - Instance.objects.create(hostname='control', node_type=node_type, listener_port=None) + Instance.objects.create(hostname='control', node_type=node_type) assert hop1.peers.exists() - def test_group_vars(self, get, admin_user): + def test_reverse_peers(self, admin_user, get): + """ + if hop1 peers to hop2, hop1 should + be in hop2's reverse_peers list + """ + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2', canonical=True) + hop1.peers.add(hop2addr) + + resp = get( + url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}), + user=admin_user, + expect=200, + ) + + assert hop1.pk in resp.data['reverse_peers'] + + def test_group_vars(self): """ control > hop1 > hop2 < execution """ - control = Instance.objects.create(hostname='control', node_type='control', listener_port=None) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False) - execution = Instance.objects.create(hostname='execution', node_type='execution', listener_port=6789) + control = Instance.objects.create(hostname='control', node_type='control') + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + ReceptorAddress.objects.create(instance=hop1, address='hop1', peers_from_control_nodes=True, port=6789, canonical=True) + + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2', peers_from_control_nodes=False, port=6789, canonical=True) + + execution = Instance.objects.create(hostname='execution', node_type='execution') + ReceptorAddress.objects.create(instance=execution, address='execution', peers_from_control_nodes=False, port=6789, canonical=True) - execution.peers.add(hop2) - hop1.peers.add(hop2) + execution.peers.add(hop2addr) + hop1.peers.add(hop2addr) control_vars = yaml.safe_load(generate_group_vars_all_yml(control)) hop1_vars = yaml.safe_load(generate_group_vars_all_yml(hop1)) @@ -265,13 +526,15 @@ def test_write_receptor_config_called(self): control = Instance.objects.create(hostname='control1', node_type='control') write_method.assert_not_called() - # new hop node with peers_from_control_nodes False (no) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=False) + # new address with peers_from_control_nodes False (no) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1', peers_from_control_nodes=False, canonical=True) hop1.delete() write_method.assert_not_called() - # new hop node with peers_from_control_nodes True (yes) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True) + # new address with peers_from_control_nodes True (yes) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1', peers_from_control_nodes=True, canonical=True) write_method.assert_called() write_method.reset_mock() @@ -280,20 +543,21 @@ def test_write_receptor_config_called(self): write_method.assert_called() write_method.reset_mock() - # new hop node with peers_from_control_nodes False and peered to another hop node (no) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False) - hop2.peers.add(hop1) + # new address with peers_from_control_nodes False and peered to another hop node (no) + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + ReceptorAddress.objects.create(instance=hop2, address='hop2', peers_from_control_nodes=False, canonical=True) + hop2.peers.add(hop1addr) hop2.delete() write_method.assert_not_called() # changing peers_from_control_nodes to False (yes) - hop1.peers_from_control_nodes = False - hop1.save() + hop1addr.peers_from_control_nodes = False + hop1addr.save() write_method.assert_called() write_method.reset_mock() - # deleting hop node that has peers_from_control_nodes to False (no) - hop1.delete() + # deleting address that has peers_from_control_nodes to False (no) + hop1.delete() # cascade deletes to hop1addr write_method.assert_not_called() # deleting control nodes (no) @@ -315,8 +579,8 @@ def test_write_receptor_config_data(self): # not peered, so config file should not be updated for i in range(3): - Instance.objects.create(hostname=f"exNo-{i}", node_type='execution', listener_port=6789, peers_from_control_nodes=False) - + inst = Instance.objects.create(hostname=f"exNo-{i}", node_type='execution') + ReceptorAddress.objects.create(instance=inst, address=f"exNo-{i}", port=6789, peers_from_control_nodes=False, canonical=True) _, should_update = generate_config_data() assert not should_update @@ -324,11 +588,13 @@ def test_write_receptor_config_data(self): expected_peers = [] for i in range(3): expected_peers.append(f"hop-{i}:6789") - Instance.objects.create(hostname=f"hop-{i}", node_type='hop', listener_port=6789, peers_from_control_nodes=True) + inst = Instance.objects.create(hostname=f"hop-{i}", node_type='hop') + ReceptorAddress.objects.create(instance=inst, address=f"hop-{i}", port=6789, peers_from_control_nodes=True, canonical=True) for i in range(3): expected_peers.append(f"exYes-{i}:6789") - Instance.objects.create(hostname=f"exYes-{i}", node_type='execution', listener_port=6789, peers_from_control_nodes=True) + inst = Instance.objects.create(hostname=f"exYes-{i}", node_type='execution') + ReceptorAddress.objects.create(instance=inst, address=f"exYes-{i}", port=6789, peers_from_control_nodes=True, canonical=True) new_config, should_update = generate_config_data() assert should_update diff --git a/awx/main/tests/functional/test_linkstate.py b/awx/main/tests/functional/test_linkstate.py new file mode 100644 index 000000000000..478883870a67 --- /dev/null +++ b/awx/main/tests/functional/test_linkstate.py @@ -0,0 +1,30 @@ +import pytest + +from awx.main.models import Instance, ReceptorAddress, InstanceLink +from awx.main.tasks.system import inspect_established_receptor_connections + + +@pytest.mark.django_db +class TestLinkState: + @pytest.fixture(autouse=True) + def configure_settings(self, settings): + settings.IS_K8S = True + + def test_inspect_established_receptor_connections(self): + ''' + Change link state from ADDING to ESTABLISHED + if the receptor status KnownConnectionCosts field + has an entry for the source and target node. + ''' + hop1 = Instance.objects.create(hostname='hop1') + hop2 = Instance.objects.create(hostname='hop2') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2', port=5678) + InstanceLink.objects.create(source=hop1, target=hop2addr, link_state=InstanceLink.States.ADDING) + + # calling with empty KnownConnectionCosts should not change the link state + inspect_established_receptor_connections({"KnownConnectionCosts": {}}) + assert InstanceLink.objects.get(source=hop1, target=hop2addr).link_state == InstanceLink.States.ADDING + + mesh_state = {"KnownConnectionCosts": {"hop1": {"hop2": 1}}} + inspect_established_receptor_connections(mesh_state) + assert InstanceLink.objects.get(source=hop1, target=hop2addr).link_state == InstanceLink.States.ESTABLISHED diff --git a/awx/main/tests/functional/test_migrations.py b/awx/main/tests/functional/test_migrations.py index cd0889c20851..ab877f603fd7 100644 --- a/awx/main/tests/functional/test_migrations.py +++ b/awx/main/tests/functional/test_migrations.py @@ -42,3 +42,29 @@ def test_happy_path(self, migrator): final_state = migrator.apply_tested_migration(final_migration) Instance = final_state.apps.get_model('main', 'Instance') assert Instance.objects.filter(hostname='foobar').count() == 1 + + def test_receptor_address(self, migrator): + old_state = migrator.apply_initial_migration(('main', '0188_add_bitbucket_dc_webhook')) + Instance = old_state.apps.get_model('main', 'Instance') + for i in range(3): + Instance.objects.create(hostname=f'foobar{i}', node_type='hop') + foo = Instance.objects.create(hostname='foo', node_type='execution', listener_port=1234) + bar = Instance.objects.create(hostname='bar', node_type='execution', listener_port=None) + bar.peers.add(foo) + + new_state = migrator.apply_tested_migration( + ('main', '0189_inbound_hop_nodes'), + ) + Instance = new_state.apps.get_model('main', 'Instance') + ReceptorAddress = new_state.apps.get_model('main', 'ReceptorAddress') + + # We can now test how our migration worked, new field is there: + assert ReceptorAddress.objects.filter(address='foo', port=1234).count() == 1 + assert not ReceptorAddress.objects.filter(address='bar').exists() + + bar = Instance.objects.get(hostname='bar') + fooaddr = ReceptorAddress.objects.get(address='foo') + + bar_peers = bar.peers.all() + assert len(bar_peers) == 1 + assert fooaddr in bar_peers diff --git a/awx/main/tests/unit/models/test_receptor_address.py b/awx/main/tests/unit/models/test_receptor_address.py new file mode 100644 index 000000000000..f18e1a9018a1 --- /dev/null +++ b/awx/main/tests/unit/models/test_receptor_address.py @@ -0,0 +1,32 @@ +from awx.main.models import ReceptorAddress +import pytest + +ReceptorAddress() + + +@pytest.mark.parametrize( + 'address, protocol, port, websocket_path, expected', + [ + ('foo', 'tcp', 27199, '', 'foo:27199'), + ('bar', 'ws', 6789, '', 'wss://bar:6789'), + ('mal', 'ws', 6789, 'path', 'wss://mal:6789/path'), + ('example.com', 'ws', 443, 'path', 'wss://example.com:443/path'), + ], +) +def test_get_full_address(address, protocol, port, websocket_path, expected): + receptor_address = ReceptorAddress(address=address, protocol=protocol, port=port, websocket_path=websocket_path) + assert receptor_address.get_full_address() == expected + + +@pytest.mark.parametrize( + 'protocol, expected', + [ + ('tcp', 'tcp-peer'), + ('ws', 'ws-peer'), + ('wss', 'ws-peer'), + ('foo', None), + ], +) +def test_get_peer_type(protocol, expected): + receptor_address = ReceptorAddress(protocol=protocol) + assert receptor_address.get_peer_type() == expected diff --git a/awx/ui/src/api/index.js b/awx/ui/src/api/index.js index 93631c2137fd..9c78db6e3657 100644 --- a/awx/ui/src/api/index.js +++ b/awx/ui/src/api/index.js @@ -29,6 +29,7 @@ import Notifications from './models/Notifications'; import Organizations from './models/Organizations'; import ProjectUpdates from './models/ProjectUpdates'; import Projects from './models/Projects'; +import ReceptorAddresses from './models/Receptor'; import Roles from './models/Roles'; import Root from './models/Root'; import Schedules from './models/Schedules'; @@ -79,6 +80,7 @@ const NotificationsAPI = new Notifications(); const OrganizationsAPI = new Organizations(); const ProjectUpdatesAPI = new ProjectUpdates(); const ProjectsAPI = new Projects(); +const ReceptorAPI = new ReceptorAddresses(); const RolesAPI = new Roles(); const RootAPI = new Root(); const SchedulesAPI = new Schedules(); @@ -130,6 +132,7 @@ export { OrganizationsAPI, ProjectUpdatesAPI, ProjectsAPI, + ReceptorAPI, RolesAPI, RootAPI, SchedulesAPI, diff --git a/awx/ui/src/api/models/Instances.js b/awx/ui/src/api/models/Instances.js index 388bb2eb4e17..7730a31df818 100644 --- a/awx/ui/src/api/models/Instances.js +++ b/awx/ui/src/api/models/Instances.js @@ -8,6 +8,7 @@ class Instances extends Base { this.readHealthCheckDetail = this.readHealthCheckDetail.bind(this); this.healthCheck = this.healthCheck.bind(this); this.readInstanceGroup = this.readInstanceGroup.bind(this); + this.readReceptorAddresses = this.readReceptorAddresses.bind(this); this.deprovisionInstance = this.deprovisionInstance.bind(this); } @@ -27,6 +28,17 @@ class Instances extends Base { return this.http.get(`${this.baseUrl}${instanceId}/instance_groups/`); } + readReceptorAddresses(instanceId) { + return this.http.get(`${this.baseUrl}${instanceId}/receptor_addresses/`); + } + + updateReceptorAddresses(instanceId, data) { + return this.http.post( + `${this.baseUrl}${instanceId}/receptor_addresses/`, + data + ); + } + deprovisionInstance(instanceId) { return this.http.patch(`${this.baseUrl}${instanceId}/`, { node_state: 'deprovisioning', diff --git a/awx/ui/src/api/models/Receptor.js b/awx/ui/src/api/models/Receptor.js new file mode 100644 index 000000000000..fd63d4cf74a6 --- /dev/null +++ b/awx/ui/src/api/models/Receptor.js @@ -0,0 +1,14 @@ +import Base from '../Base'; + +class ReceptorAddresses extends Base { + constructor(http) { + super(http); + this.baseUrl = 'api/v2/receptor_addresses/'; + } + + updateReceptorAddresses(instanceId, data) { + return this.http.post(`${this.baseUrl}`, data); + } +} + +export default ReceptorAddresses; diff --git a/awx/ui/src/screens/Instances/Instance.js b/awx/ui/src/screens/Instances/Instance.js index 6d0d1e800469..59350d9c519e 100644 --- a/awx/ui/src/screens/Instances/Instance.js +++ b/awx/ui/src/screens/Instances/Instance.js @@ -12,6 +12,7 @@ import { SettingsAPI } from 'api'; import ContentLoading from 'components/ContentLoading'; import InstanceDetail from './InstanceDetail'; import InstancePeerList from './InstancePeers'; +import InstanceListenerAddressList from './InstanceListenerAddressList'; function Instance({ setBreadcrumb }) { const { me } = useConfig(); @@ -54,7 +55,12 @@ function Instance({ setBreadcrumb }) { }, [request]); if (isK8s) { - tabsArray.push({ name: t`Peers`, link: `${match.url}/peers`, id: 1 }); + tabsArray.push({ + name: t`Listener Addresses`, + link: `${match.url}/listener_addresses`, + id: 1, + }); + tabsArray.push({ name: t`Peers`, link: `${match.url}/peers`, id: 2 }); } if (isLoading) { return ; @@ -72,6 +78,14 @@ function Instance({ setBreadcrumb }) { + {isK8s && ( + + + + )} {isK8s && ( diff --git a/awx/ui/src/screens/Instances/InstanceAdd/InstanceAdd.js b/awx/ui/src/screens/Instances/InstanceAdd/InstanceAdd.js index 1c0e86400dcb..e9b33e133893 100644 --- a/awx/ui/src/screens/Instances/InstanceAdd/InstanceAdd.js +++ b/awx/ui/src/screens/Instances/InstanceAdd/InstanceAdd.js @@ -9,6 +9,10 @@ function InstanceAdd() { const [formError, setFormError] = useState(); const handleSubmit = async (values) => { try { + if (values.listener_port === undefined) { + values.listener_port = null; + } + const { data: { id }, } = await InstancesAPI.create(values); diff --git a/awx/ui/src/screens/Instances/InstanceAdd/InstanceAdd.test.js b/awx/ui/src/screens/Instances/InstanceAdd/InstanceAdd.test.js index 1c4d8d1d1c68..ac667c37adfd 100644 --- a/awx/ui/src/screens/Instances/InstanceAdd/InstanceAdd.test.js +++ b/awx/ui/src/screens/Instances/InstanceAdd/InstanceAdd.test.js @@ -36,6 +36,7 @@ describe('', () => { }); }); expect(InstancesAPI.create).toHaveBeenCalledWith({ + listener_port: null, // injected if listener_port is not set node_type: 'hop', }); expect(history.location.pathname).toBe('/instances/13/details'); diff --git a/awx/ui/src/screens/Instances/InstanceDetail/InstanceDetail.js b/awx/ui/src/screens/Instances/InstanceDetail/InstanceDetail.js index 8e60ff5d6828..e1882d3a7563 100644 --- a/awx/ui/src/screens/Instances/InstanceDetail/InstanceDetail.js +++ b/awx/ui/src/screens/Instances/InstanceDetail/InstanceDetail.js @@ -183,6 +183,7 @@ function InstanceDetail({ setBreadcrumb, isK8s }) { } const isHopNode = instance.node_type === 'hop'; const isExecutionNode = instance.node_type === 'execution'; + const isManaged = instance.managed; return ( <> @@ -208,7 +209,7 @@ function InstanceDetail({ setBreadcrumb, isK8s }) { - {(isExecutionNode || isHopNode) && ( + {(isExecutionNode || isHopNode || !isManaged) && ( <> {instance.related?.install_bundle && ( @@ -338,19 +341,21 @@ function InstanceDetail({ setBreadcrumb, isK8s }) { )} - {config?.me?.is_superuser && isK8s && (isExecutionNode || isHopNode) && ( - - )} {config?.me?.is_superuser && isK8s && - (isExecutionNode || isHopNode) && ( + (isExecutionNode || isHopNode || !isManaged) && ( + + )} + {config?.me?.is_superuser && + isK8s && + (isExecutionNode || isHopNode || !isManaged) && (