From 43fae6f055ca20143d60e6fa9b6b5d71de13ad8c Mon Sep 17 00:00:00 2001 From: Iryna Shustava Date: Tue, 1 Jun 2021 08:53:06 -0600 Subject: [PATCH] Don't register duplicate services from different k8s namespaces When Consul namespaces are not enabled and we are processing a service that already exists in Consul but with a different k8s namespace meta, we skip service registration to avoid service name collisions. --- connect-inject/endpoints_controller.go | 173 ++++++++++++-------- connect-inject/endpoints_controller_test.go | 106 +++++++++++- 2 files changed, 202 insertions(+), 77 deletions(-) diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 62f72d7ee3..ac2182734a 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -147,77 +147,10 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( for address, healthStatus := range allAddresses { if address.TargetRef != nil && address.TargetRef.Kind == "Pod" { - // Get pod associated with this address. - var pod corev1.Pod - objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace} - if err = r.Client.Get(ctx, objectKey, &pod); err != nil { - r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name) + if err := r.registerServicesAndHealthCheck(ctx, serviceEndpoints, address, healthStatus, endpointAddressMap); err != nil { + r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) } - podHostIP := pod.Status.HostIP - - if hasBeenInjected(pod) { - // Build the endpointAddressMap up for deregistering service instances later. - endpointAddressMap[pod.Status.PodIP] = true - // Create client for Consul agent local to the pod. - client, err := r.remoteConsulClient(podHostIP, r.consulNamespace(pod.Namespace)) - if err != nil { - r.Log.Error(err, "failed to create a new Consul client", "address", podHostIP) - errs = multierror.Append(errs, err) - } - - var managedByEndpointsController bool - if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue { - managedByEndpointsController = true - } - // For pods managed by this controller, create and register the service instance. - if managedByEndpointsController { - // Get information from the pod to create service instance registrations. - serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus) - if err != nil { - r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) - errs = multierror.Append(errs, err) - } - - // Register the service instance with the local agent. - // Note: the order of how we register services is important, - // and the connect-proxy service should come after the "main" service - // because its alias health check depends on the main service existing. - r.Log.Info("registering service with Consul", "name", serviceRegistration.Name, - "id", serviceRegistration.ID, "agentIP", podHostIP) - err = client.Agent().ServiceRegister(serviceRegistration) - if err != nil { - r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name) - errs = multierror.Append(errs, err) - } - - // Register the proxy service instance with the local agent. - r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name) - err = client.Agent().ServiceRegister(proxyServiceRegistration) - if err != nil { - r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name) - errs = multierror.Append(errs, err) - } - } - - // Update the service TTL health check for both legacy services and services managed by endpoints - // controller. The proxy health checks are registered separately by endpoints controller and - // lifecycle sidecar for legacy services. Here, we always update the health check for legacy and - // newer services idempotently since the service health check is not added as part of the service - // registration. - reason := getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace) - serviceName := getServiceName(pod, serviceEndpoints) - r.Log.Info("updating health check status for service", "name", serviceName, "reason", reason, "status", healthStatus) - serviceID := getServiceID(pod, serviceEndpoints) - proxyServiceName := getProxyServiceName(pod, serviceEndpoints) - proxyServiceID := getProxyServiceID(pod, serviceEndpoints) - healthCheckID := getConsulHealthCheckID(pod, serviceID) - err = r.upsertHealthCheck(pod, client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, healthStatus) - if err != nil { - r.Log.Error(err, "failed to update health check status for service", "name", serviceName) - errs = multierror.Append(errs, err) - } - } } } } @@ -247,6 +180,103 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error { ).Complete(r) } +// registerServicesAndHealthCheck create Consul registrations for the service and proxy and register them with Consul. +// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready. +func (r *EndpointsController) registerServicesAndHealthCheck(ctx context.Context, serviceEndpoints corev1.Endpoints, address corev1.EndpointAddress, healthStatus string, endpointAddressMap map[string]bool) error { + // Get pod associated with this address. + var pod corev1.Pod + objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace} + if err := r.Client.Get(ctx, objectKey, &pod); err != nil { + r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name) + return err + } + podHostIP := pod.Status.HostIP + + if hasBeenInjected(pod) { + // Build the endpointAddressMap up for deregistering service instances later. + endpointAddressMap[pod.Status.PodIP] = true + // Create client for Consul agent local to the pod. + client, err := r.remoteConsulClient(podHostIP, r.consulNamespace(pod.Namespace)) + if err != nil { + r.Log.Error(err, "failed to create a new Consul client", "address", podHostIP) + return err + } + + var managedByEndpointsController bool + if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue { + managedByEndpointsController = true + } + // For pods managed by this controller, create and register the service instance. + if managedByEndpointsController { + // Get information from the pod to create service instance registrations. + serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints) + if err != nil { + r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) + return err + } + + // When Consul namespaces are not enabled, we check that the service with the same name but in a different namespace + // is already registered with Consul, and if it is, we skip the registration to avoid service name collisions. + if !r.EnableConsulNamespaces { + services, _, err := client.Catalog().Service(serviceRegistration.Name, "", nil) + if err != nil { + r.Log.Error(err, "failed to get service from the Consul catalog", "name", serviceRegistration.Name) + return err + } + for _, service := range services { + if service.ServiceMeta[MetaKeyKubeNS] != serviceEndpoints.Namespace { + // Log but don't return an error because we don't want to reconcile this endpoints object again. + r.Log.Info("Skipping service registration because a service with the same name "+ + "but a different Kubernetes namespace is already registered with Consul", + "name", serviceRegistration.Name, + MetaKeyKubeNS, serviceEndpoints.Namespace, + "existing-k8s-namespace", service.ServiceMeta[MetaKeyKubeNS]) + return nil + } + } + } + + // Register the service instance with the local agent. + // Note: the order of how we register services is important, + // and the connect-proxy service should come after the "main" service + // because its alias health check depends on the main service existing. + r.Log.Info("registering service with Consul", "name", serviceRegistration.Name, + "id", serviceRegistration.ID, "agentIP", podHostIP) + err = client.Agent().ServiceRegister(serviceRegistration) + if err != nil { + r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name) + return err + } + + // Register the proxy service instance with the local agent. + r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name) + err = client.Agent().ServiceRegister(proxyServiceRegistration) + if err != nil { + r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name) + return err + } + } + + // Update the service TTL health check for both legacy services and services managed by endpoints + // controller. The proxy health checks are registered separately by endpoints controller and + // lifecycle sidecar for legacy services. Here, we always update the health check for legacy and + // newer services idempotently since the service health check is not added as part of the service + // registration. + reason := getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace) + serviceName := getServiceName(pod, serviceEndpoints) + r.Log.Info("updating health check status for service", "name", serviceName, "reason", reason, "status", healthStatus) + serviceID := getServiceID(pod, serviceEndpoints) + healthCheckID := getConsulHealthCheckID(pod, serviceID) + err = r.upsertHealthCheck(pod, client, serviceID, healthCheckID, healthStatus) + if err != nil { + r.Log.Error(err, "failed to update health check status for service", "name", serviceName) + return err + } + } + + return nil +} + // getServiceCheck will return the health check for this pod and service if it exists. func getServiceCheck(client *api.Client, healthCheckID string) (*api.AgentCheck, error) { filter := fmt.Sprintf("CheckID == `%s`", healthCheckID) @@ -300,7 +330,7 @@ func (r *EndpointsController) updateConsulHealthCheckStatus(client *api.Client, // upsertHealthCheck checks if the healthcheck exists for the service, and creates it if it doesn't exist, or updates it // if it does. -func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, status string) error { +func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Client, serviceID, healthCheckID, status string) error { reason := getHealthCheckStatusReason(status, pod.Name, pod.Namespace) // Retrieve the health check that would exist if the service had one registered for this pod. serviceCheck, err := getServiceCheck(client, healthCheckID) @@ -335,7 +365,6 @@ func getServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { serviceName = serviceNameFromAnnotation } return serviceName - } func getServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { @@ -354,7 +383,7 @@ func getProxyServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string // createServiceRegistrations creates the service and proxy service instance registrations with the information from the // Pod. -func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) { +func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) { // If a port is specified, then we determine the value of that port // and register that port for the host service. // The handler will always set the port annotation if one is not provided on the pod. diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index d642ad541f..9b787a7113 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -424,7 +424,7 @@ func TestProcessUpstreams(t *testing.T) { } for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - // Create test consul server + // Create test consul server. consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { c.NodeName = nodeName }) @@ -950,7 +950,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { addr := strings.Split(consul.HTTPAddr, ":") consulPort := addr[1] - // Register service and proxy in consul + // Register service and proxy in consul. for _, svc := range tt.initialConsulSvcs { err = consulClient.Agent().ServiceRegister(svc) require.NoError(t, err) @@ -1405,6 +1405,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "service-updated", Port: 80, Address: "1.2.3.4", + Meta: map[string]string{MetaKeyKubeNS: "default"}, Check: &api.AgentServiceCheck{ CheckID: "default/pod1-service-updated/kubernetes-health-check", Name: "Kubernetes Health Check", @@ -1420,6 +1421,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "service-updated-sidecar-proxy", Port: 20000, Address: "1.2.3.4", + Meta: map[string]string{MetaKeyKubeNS: "default"}, Proxy: &api.AgentServiceConnectProxyConfig{ DestinationServiceName: "service-updated", DestinationServiceID: "pod1-service-updated", @@ -1486,6 +1488,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "service-updated", Port: 80, Address: "1.2.3.4", + Meta: map[string]string{MetaKeyKubeNS: "default"}, Check: &api.AgentServiceCheck{ CheckID: "default/pod1-service-updated/kubernetes-health-check", Name: "Kubernetes Health Check", @@ -1501,6 +1504,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "service-updated-sidecar-proxy", Port: 20000, Address: "1.2.3.4", + Meta: map[string]string{MetaKeyKubeNS: "default"}, Proxy: &api.AgentServiceConnectProxyConfig{ DestinationServiceName: "service-updated", DestinationServiceID: "pod1-service-updated", @@ -1567,6 +1571,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "service-updated", Port: 80, Address: "1.2.3.4", + Meta: map[string]string{MetaKeyKubeNS: "default"}, }, { Kind: api.ServiceKindConnectProxy, @@ -1574,6 +1579,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "service-updated-sidecar-proxy", Port: 20000, Address: "1.2.3.4", + Meta: map[string]string{MetaKeyKubeNS: "default"}, Proxy: &api.AgentServiceConnectProxyConfig{ DestinationServiceName: "service-updated", DestinationServiceID: "pod1-service-updated", @@ -1630,7 +1636,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{MetaKeyManagedBy: managedByValue}, + Meta: map[string]string{MetaKeyManagedBy: managedByValue, MetaKeyKubeNS: "default"}, }, { Kind: api.ServiceKindConnectProxy, @@ -3051,7 +3057,7 @@ func TestServiceInstancesForK8SServiceNameAndNamespace(t *testing.T) { } } -func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t *testing.T) { +func TestCreateServiceRegistrations_withTransparentProxy(t *testing.T) { t.Parallel() const serviceName = "test-service" @@ -3890,7 +3896,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * Log: logrtest.TestLogger{T: t}, } - serviceRegistration, proxyServiceRegistration, err := epCtrl.createServiceRegistrations(*pod, *endpoints, api.HealthPassing) + serviceRegistration, proxyServiceRegistration, err := epCtrl.createServiceRegistrations(*pod, *endpoints) if c.expErr != "" { require.EqualError(t, err, c.expErr) } else { @@ -3905,6 +3911,96 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * } } +func TestRegisterServicesAndHealthCheck_errorsWhenDuplicateServiceFound(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + consulServiceMeta map[string]string + }{ + "different k8s namespace meta": { + consulServiceMeta: map[string]string{MetaKeyKubeNS: "some-other-ns"}, + }, + "no k8s namespace meta": { + consulServiceMeta: nil, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + nodeName := "test-node" + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + + consul.WaitForServiceIntentions(t) + httpAddr := consul.HTTPAddr + clientConfig := &api.Config{ + Address: httpAddr, + } + consulClient, err := api.NewClient(clientConfig) + require.NoError(t, err) + addr := strings.Split(httpAddr, ":") + consulPort := addr[1] + + existingService := &api.AgentServiceRegistration{ + ID: "test-service", + Name: "test-service", + Port: 1234, + Address: "1.2.3.4", + Meta: c.consulServiceMeta, + } + err = consulClient.Agent().ServiceRegister(existingService) + require.NoError(t, err) + pod := createPod("test-pod", "1.1.1.1", true, true) + + endpointsAddress := corev1.EndpointAddress{ + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: pod.Name, + Namespace: pod.Namespace, + }, + } + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{endpointsAddress}, + }, + }, + } + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(ns, pod, endpoints).Build() + + ep := &EndpointsController{ + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: "http", + ConsulClientCfg: clientConfig, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + Client: fakeClient, + } + + err = ep.registerServicesAndHealthCheck(context.Background(), *endpoints, endpointsAddress, api.HealthPassing, make(map[string]bool)) + + // Check that the service is not registered with Consul. + _, _, err = consulClient.Agent().Service("test-pod-test-service", nil) + require.EqualError(t, err, "Unexpected response code: 404 (unknown service ID: test-pod-test-service)") + + _, _, err = consulClient.Agent().Service("test-pod-test-service-sidecar-proxy", nil) + require.EqualError(t, err, "Unexpected response code: 404 (unknown service ID: test-pod-test-service-sidecar-proxy)") + }) + } +} + func createPod(name, ip string, inject bool, managedByEndpointsController bool) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{