From 75b5d6aa03711ee424d806ade8a58b217edb8947 Mon Sep 17 00:00:00 2001 From: Nitya Dhanushkodi Date: Mon, 10 May 2021 12:17:25 -0700 Subject: [PATCH] Support upgrades for connect refactor (#509) Before the connect refactor, service registration in Consul was managed by the lifecycle sidecar, which would re-register the service with Consul every 10s. Now, service registration is managed by Endpoints controller. In order to support upgrades to the refactored Endpoints controller, we need Endpoints controller to NOT register or deregister any services managed by lifecycle sidecar. To do this, the annotation consul.hashicorp.com/connect-inject-managed-by is added to pods managed by endpoints controller, so endpoints controller will ignore older services managed by lifecycle sidecar (legacy services) for service registration/deregistration, and only create and register new services that are supposed to be managed by endpoints controller. To support health checks for legacy services, the Endpoints controller will always update the healthcheck for any Connect service, whether it's managed by Endpoints controller or not. The service registration no longer happens at the same time as its health check registration. The health check is registered separately for all services, legacy or not. --- CHANGELOG.md | 3 + connect-inject/annotations.go | 9 + connect-inject/endpoints_controller.go | 198 ++++++-- .../endpoints_controller_ent_test.go | 176 +++++-- connect-inject/endpoints_controller_test.go | 465 +++++++++++++++--- connect-inject/handler.go | 4 + connect-inject/handler_test.go | 8 + go.sum | 2 - 8 files changed, 727 insertions(+), 138 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4d02c76fd..a3b23c1135 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,9 @@ IMPROVEMENTS: * Setting the label `consul.hashicorp.com/transparent-proxy` to `true/false` on a namespace will define the default behavior for pods in that namespace, which do not also have the annotation set. * The default tproxy behavior will be defined by the value of `-enable-transparent-proxy` flag to the `consul-k8s inject-connect` command. It can be overridden in a namespace by the the label on the namespace or for a pod using the annotation on the pod. +* Connect: support upgrades for services deployed before endpoints controller to + upgrade to a version of consul-k8s with endpoints controller. [[GH-509](https://github.com/hashicorp/consul-k8s/pull/509)] + BUG FIXES: * Connect: Use `runAsNonRoot: false` for connect-init's container when tproxy is enabled. [[GH-493](https://github.com/hashicorp/consul-k8s/pull/493)] * CRDs: Fix a bug where the `config` field in `ProxyDefaults` CR was not synced to Consul because diff --git a/connect-inject/annotations.go b/connect-inject/annotations.go index 32bbc7b75a..d6ddbecc03 100644 --- a/connect-inject/annotations.go +++ b/connect-inject/annotations.go @@ -5,6 +5,12 @@ const ( // a pod after an injection is done. keyInjectStatus = "consul.hashicorp.com/connect-inject-status" + // keyManagedBy is the key of the label that is added to pods managed + // by the Endpoints controller. This is to support upgrading from consul-k8s + // without Endpoints controller to consul-k8s with Endpoints controller + // without disrupting services managed the old way. + keyManagedBy = "consul.hashicorp.com/connect-inject-managed-by" + // annotationInject is the key of the annotation that controls whether // injection is explicitly enabled or disabled for a pod. This should // be set to a truthy or falsy value, as parseable by strconv.ParseBool @@ -104,6 +110,9 @@ const ( // injected is used as the annotation value for annotationInjected. injected = "injected" + + // endpointsController is the value for keyManagedBy. + managedByValue = "consul-k8s-endpoints-controller" ) // Annotations used by Prometheus. diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 33598925b2..1e6616988f 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -31,6 +31,7 @@ const ( MetaKeyPodName = "pod-name" MetaKeyKubeServiceName = "k8s-service-name" MetaKeyKubeNS = "k8s-namespace" + MetaKeyManagedBy = "managed-by" kubernetesSuccessReasonMsg = "Kubernetes health checks passing" envoyPrometheusBindAddr = "envoy_prometheus_bind_addr" @@ -150,39 +151,55 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } - // Get information from the pod to create service instance registrations. - serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus) - if err != nil { - r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) - return ctrl.Result{}, err + var managedByEndpointsController bool + if raw, ok := pod.Labels[keyManagedBy]; ok && raw == managedByValue { + managedByEndpointsController = true } - // Register the service instance with the local agent. - // Note: the order of how we register services is important, - // and the connect-proxy service should come after the "main" service - // because its alias health check depends on the main service existing. - r.Log.Info("registering service with Consul", "name", serviceRegistration.Name) - err = client.Agent().ServiceRegister(serviceRegistration) - if err != nil { - r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name) - return ctrl.Result{}, err - } + // For pods managed by this controller, create and register the service instance. + if managedByEndpointsController { + // Get information from the pod to create service instance registrations. + serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints, healthStatus) + if err != nil { + r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) + return ctrl.Result{}, err + } - // Register the proxy service instance with the local agent. - r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name) - err = client.Agent().ServiceRegister(proxyServiceRegistration) - if err != nil { - r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name) - return ctrl.Result{}, err + // Register the service instance with the local agent. + // Note: the order of how we register services is important, + // and the connect-proxy service should come after the "main" service + // because its alias health check depends on the main service existing. + r.Log.Info("registering service with Consul", "name", serviceRegistration.Name) + err = client.Agent().ServiceRegister(serviceRegistration) + if err != nil { + r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name) + return ctrl.Result{}, err + } + + // Register the proxy service instance with the local agent. + r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name) + err = client.Agent().ServiceRegister(proxyServiceRegistration) + if err != nil { + r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name) + return ctrl.Result{}, err + } } - // Update the TTL health check for the service. - // This is required because ServiceRegister() does not update the TTL if the service already exists. + // Update the service TTL health check for both legacy services and services managed by endpoints + // controller. The proxy health checks are registered separately by endpoints controller and + // lifecycle sidecar for legacy services. Here, we always update the health check for legacy and + // newer services idempotently since the service health check is not added as part of the service + // registration. reason := getHealthCheckStatusReason(healthStatus, pod.Name, pod.Namespace) - r.Log.Info("updating health check status for service", "name", serviceRegistration.Name, "reason", reason, "status", healthStatus) - err = client.Agent().UpdateTTL(getConsulHealthCheckID(pod, serviceRegistration.ID), reason, healthStatus) + serviceName := getServiceName(pod, serviceEndpoints) + r.Log.Info("updating health check status for service", "name", serviceName, "reason", reason, "status", healthStatus) + serviceID := getServiceID(pod, serviceEndpoints) + proxyServiceName := getProxyServiceName(pod, serviceEndpoints) + proxyServiceID := getProxyServiceID(pod, serviceEndpoints) + healthCheckID := getConsulHealthCheckID(pod, serviceID) + err = r.upsertHealthCheck(pod, client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, healthStatus) if err != nil { - r.Log.Error(err, "failed to update health check status for service", "name", serviceRegistration.Name) + r.Log.Error(err, "failed to update health check status for service", "name", serviceName) return ctrl.Result{}, err } } @@ -215,6 +232,111 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error { ).Complete(r) } +// getServiceCheck will return the health check for this pod and service if it exists. +func getServiceCheck(client *api.Client, healthCheckID string) (*api.AgentCheck, error) { + filter := fmt.Sprintf("CheckID == `%s`", healthCheckID) + checks, err := client.Agent().ChecksWithFilter(filter) + if err != nil { + return nil, err + } + // This will be nil (does not exist) or an actual check. + return checks[healthCheckID], nil +} + +// registerConsulHealthCheck registers a TTL health check for the service on this Agent local to the Pod. This will add +// the Pod's readiness status, which will mark the service instance healthy/unhealthy for Consul service mesh +// traffic. +func registerConsulHealthCheck(client *api.Client, consulHealthCheckID, serviceID, status string) error { + // Create a TTL health check in Consul associated with this service and pod. + // The TTL time is 100000h which should ensure that the check never fails due to timeout + // of the TTL check. + err := client.Agent().CheckRegister(&api.AgentCheckRegistration{ + ID: consulHealthCheckID, + Name: "Kubernetes Health Check", + ServiceID: serviceID, + AgentServiceCheck: api.AgentServiceCheck{ + TTL: "100000h", + Status: status, + SuccessBeforePassing: 1, + FailuresBeforeCritical: 1, + }, + }) + if err != nil { + // Full error looks like: + // Unexpected response code: 500 (ServiceID "consulnamespace/svc-id" does not exist) + if strings.Contains(err.Error(), fmt.Sprintf("%s\" does not exist", serviceID)) { + return fmt.Errorf("service %q not found in Consul: unable to register health check", serviceID) + } + return fmt.Errorf("registering health check for service %q: %w", serviceID, err) + } + + return nil +} + +// updateConsulHealthCheckStatus updates the consul health check status. +func (r *EndpointsController) updateConsulHealthCheckStatus(client *api.Client, consulHealthCheckID, status, reason string) error { + r.Log.Info("updating health check", "id", consulHealthCheckID) + err := client.Agent().UpdateTTL(consulHealthCheckID, reason, status) + if err != nil { + return fmt.Errorf("error updating health check: %w", err) + } + return nil +} + +// upsertHealthCheck checks if the healthcheck exists for the service, and creates it if it doesn't exist, or updates it +// if it does. +func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Client, serviceID, proxyServiceID, proxyServiceName, healthCheckID, status string) error { + reason := getHealthCheckStatusReason(status, pod.Name, pod.Namespace) + // Retrieve the health check that would exist if the service had one registered for this pod. + serviceCheck, err := getServiceCheck(client, healthCheckID) + if err != nil { + return fmt.Errorf("unable to get agent health checks: serviceID=%s, checkID=%s, %s", serviceID, healthCheckID, err) + } + if serviceCheck == nil { + // Create a new health check. + err = registerConsulHealthCheck(client, healthCheckID, serviceID, status) + if err != nil { + return err + } + + // Also update it, the reason this is separate is there is no way to set the Output field of the health check + // at creation time, and this is what is displayed on the UI as opposed to the Notes field. + err = r.updateConsulHealthCheckStatus(client, healthCheckID, status, reason) + if err != nil { + return err + } + } else if serviceCheck.Status != status { + err = r.updateConsulHealthCheckStatus(client, healthCheckID, status, reason) + if err != nil { + return err + } + } + return nil +} + +func getServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { + serviceName := serviceEndpoints.Name + if serviceNameFromAnnotation, ok := pod.Annotations[annotationService]; ok && serviceNameFromAnnotation != "" { + serviceName = serviceNameFromAnnotation + } + return serviceName + +} + +func getServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { + return fmt.Sprintf("%s-%s", pod.Name, getServiceName(pod, serviceEndpoints)) +} + +func getProxyServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { + serviceName := getServiceName(pod, serviceEndpoints) + return fmt.Sprintf("%s-sidecar-proxy", serviceName) +} + +func getProxyServiceID(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { + proxyServiceName := getProxyServiceName(pod, serviceEndpoints) + return fmt.Sprintf("%s-%s", pod.Name, proxyServiceName) +} + // createServiceRegistrations creates the service and proxy service instance registrations with the information from the // Pod. func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) (*api.AgentServiceRegistration, *api.AgentServiceRegistration, error) { @@ -235,17 +357,15 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service // Otherwise, the Consul service name should equal the Kubernetes Service name. // The service name in Consul defaults to the Endpoints object name, and is overridden by the pod // annotation consul.hashicorp.com/connect-service.. - serviceName := serviceEndpoints.Name - if serviceNameFromAnnotation, ok := pod.Annotations[annotationService]; ok && serviceNameFromAnnotation != "" { - serviceName = serviceNameFromAnnotation - } + serviceName := getServiceName(pod, serviceEndpoints) - serviceID := fmt.Sprintf("%s-%s", pod.Name, serviceName) + serviceID := getServiceID(pod, serviceEndpoints) meta := map[string]string{ MetaKeyPodName: pod.Name, MetaKeyKubeServiceName: serviceEndpoints.Name, MetaKeyKubeNS: serviceEndpoints.Namespace, + MetaKeyManagedBy: managedByValue, } for k, v := range pod.Annotations { if strings.HasPrefix(k, annotationMeta) && strings.TrimPrefix(k, annotationMeta) != "" { @@ -269,21 +389,13 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service Address: pod.Status.PodIP, Meta: meta, Namespace: r.consulNamespace(pod.Namespace), - Check: &api.AgentServiceCheck{ - CheckID: getConsulHealthCheckID(pod, serviceID), - Name: "Kubernetes Health Check", - TTL: "100000h", - Status: healthStatus, - SuccessBeforePassing: 1, - FailuresBeforeCritical: 1, - }, } if len(tags) > 0 { service.Tags = tags } - proxyServiceName := fmt.Sprintf("%s-sidecar-proxy", serviceName) - proxyServiceID := fmt.Sprintf("%s-%s", pod.Name, proxyServiceName) + proxyServiceName := getProxyServiceName(pod, serviceEndpoints) + proxyServiceID := getProxyServiceID(pod, serviceEndpoints) proxyConfig := &api.AgentServiceConnectProxyConfig{ DestinationServiceName: serviceName, DestinationServiceID: serviceID, @@ -506,8 +618,8 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, // of services instances that have the provided k8sServiceName and k8sServiceNamespace in their metadata. func serviceInstancesForK8SServiceNameAndNamespace(k8sServiceName, k8sServiceNamespace string, client *api.Client) (map[string]*api.AgentService, error) { return client.Agent().ServicesWithFilter( - fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q`, - MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace)) + fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q and Meta[%q] == %q`, + MetaKeyKubeServiceName, k8sServiceName, MetaKeyKubeNS, k8sServiceNamespace, MetaKeyManagedBy, managedByValue)) } // processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream diff --git a/connect-inject/endpoints_controller_ent_test.go b/connect-inject/endpoints_controller_ent_test.go index ee98c3f1a9..b0640b6f81 100644 --- a/connect-inject/endpoints_controller_ent_test.go +++ b/connect-inject/endpoints_controller_ent_test.go @@ -88,8 +88,8 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) { }{ consulSvcName: "service-created", k8sObjects: func() []runtime.Object { - pod1 := createPodWithNamespace("pod1", test.SourceKubeNS, "1.2.3.4", true) - pod2 := createPodWithNamespace("pod2", test.SourceKubeNS, "2.2.3.4", true) + pod1 := createPodWithNamespace("pod1", test.SourceKubeNS, "1.2.3.4", true, true) + pod2 := createPodWithNamespace("pod2", test.SourceKubeNS, "2.2.3.4", true, true) endpointWithTwoAddresses := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-created", @@ -129,7 +129,7 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) { ServiceID: "pod1-service-created", ServiceName: "service-created", ServiceAddress: "1.2.3.4", - ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS, MetaKeyManagedBy: managedByValue}, ServiceTags: []string{}, Namespace: test.ExpConsulNS, }, @@ -137,7 +137,7 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) { ServiceID: "pod2-service-created", ServiceName: "service-created", ServiceAddress: "2.2.3.4", - ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS, MetaKeyManagedBy: managedByValue}, ServiceTags: []string{}, Namespace: test.ExpConsulNS, }, @@ -153,7 +153,7 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod1-service-created", TransparentProxy: &api.TransparentProxyConfig{}, }, - ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS, MetaKeyManagedBy: managedByValue}, ServiceTags: []string{}, Namespace: test.ExpConsulNS, }, @@ -167,7 +167,7 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod2-service-created", TransparentProxy: &api.TransparentProxyConfig{}, }, - ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS, MetaKeyManagedBy: managedByValue}, ServiceTags: []string{}, Namespace: test.ExpConsulNS, }, @@ -200,7 +200,7 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) { // code gets the agent pods via the label component=client, and // makes requests against the agent API, it will actually hit the // test server we have on localhost. - fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false, false) fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} // Add the pods namespace. @@ -298,6 +298,8 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) { checks := checkInfo.Checks require.Contains(t, expectedChecks, checks[0].Name) require.Contains(t, expectedChecks, checks[1].Name) + require.Equal(t, test.ExpConsulNS, checks[0].Namespace) + require.Equal(t, test.ExpConsulNS, checks[1].Namespace) } // Check that the Consul health check was created for the k8s pod. @@ -388,12 +390,91 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { expectedNumSvcInstances int expectedConsulSvcInstances []*api.CatalogService expectedProxySvcInstances []*api.CatalogService + expectedAgentHealthChecks []*api.AgentCheck }{ + { + name: "Legacy service: Health check is added to the correct namespace", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true, false) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + TransparentProxy: &api.TransparentProxyConfig{}, + }, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: fmt.Sprintf("%s/pod1-service-updated/kubernetes-health-check", ts.SourceKubeNS), + ServiceName: "service-updated", + ServiceID: "pod1-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + Namespace: ts.ExpConsulNS, + }, + }, + }, { name: "Endpoints has an updated address (pod IP change).", consulSvcName: "service-updated", k8sObjects: func() []runtime.Object { - pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "4.4.4.4", true) + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "4.4.4.4", true, true) endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -423,6 +504,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "service-updated", Port: 80, Address: "1.2.3.4", + Meta: map[string]string{MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -459,7 +541,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { name: "Different Consul service name: Endpoints has an updated address (pod IP change).", consulSvcName: "different-consul-svc-name", k8sObjects: func() []runtime.Object { - pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "4.4.4.4", true) + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "4.4.4.4", true, true) pod1.Annotations[annotationService] = "different-consul-svc-name" endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -490,6 +572,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "1.2.3.4", + Meta: map[string]string{MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -526,8 +609,8 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { name: "Endpoints has additional address not in Consul.", consulSvcName: "service-updated", k8sObjects: func() []runtime.Object { - pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true) - pod2 := createPodWithNamespace("pod2", ts.SourceKubeNS, "2.2.3.4", true) + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true, true) + pod2 := createPodWithNamespace("pod2", ts.SourceKubeNS, "2.2.3.4", true, true) endpointWithTwoAddresses := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -566,6 +649,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "service-updated", Port: 80, Address: "1.2.3.4", + Meta: map[string]string{MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -612,7 +696,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { name: "Consul has instances that are not in the Endpoints addresses.", consulSvcName: "service-updated", k8sObjects: func() []runtime.Object { - pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true) + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true, true) endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -642,7 +726,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "service-updated", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -656,7 +740,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod1-service-updated", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -664,7 +748,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "service-updated", Port: 80, Address: "2.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -678,7 +762,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod2-service-updated", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, }, @@ -702,7 +786,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { name: "Different Consul service name: Consul has instances that are not in the Endpoints addresses.", consulSvcName: "different-consul-svc-name", k8sObjects: func() []runtime.Object { - pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true) + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true, true) pod1.Annotations[annotationService] = "different-consul-svc-name" endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -733,7 +817,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -747,7 +831,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod1-different-consul-svc-name", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -755,7 +839,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "2.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -769,7 +853,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod2-different-consul-svc-name", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, }, @@ -809,7 +893,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "service-updated", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -823,7 +907,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod1-service-updated", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -831,7 +915,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "service-updated", Port: 80, Address: "2.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -845,7 +929,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod2-service-updated", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, }, @@ -873,7 +957,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -887,7 +971,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod1-different-consul-svc-name", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -895,7 +979,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "2.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -909,7 +993,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod2-different-consul-svc-name", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, }, @@ -925,7 +1009,7 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { // code gets the agent pods via the label component=client, and // makes requests against the agent API, it will actually hit the // test server we have on localhost. - fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false, true) fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} // Add the pods namespace. @@ -1022,6 +1106,23 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID) require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress) } + + // Check that the Consul health check was created for the k8s pod. + if tt.expectedAgentHealthChecks != nil { + for i := range tt.expectedConsulSvcInstances { + filter := fmt.Sprintf("CheckID == `%s`", tt.expectedAgentHealthChecks[i].CheckID) + newChecks, _ := consulClient.Agent().Checks() + for key, value := range newChecks { + fmt.Printf("%s:%v\n", key, value) + } + check, err := consulClient.Agent().ChecksWithFilter(filter) + require.NoError(t, err) + require.EqualValues(t, 1, len(check)) + // Ignoring Namespace because the response from ENT includes it and OSS does not. + var ignoredFields = []string{"Node", "Definition", "Namespace"} + require.True(t, cmp.Equal(check[tt.expectedAgentHealthChecks[i].CheckID], tt.expectedAgentHealthChecks[i], cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) + } + } }) } } @@ -1092,7 +1193,7 @@ func TestReconcileDeleteEndpointWithNamespaces(t *testing.T) { Name: "service-deleted", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -1106,7 +1207,7 @@ func TestReconcileDeleteEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod1-service-deleted", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, }, @@ -1120,7 +1221,7 @@ func TestReconcileDeleteEndpointWithNamespaces(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, { @@ -1134,7 +1235,7 @@ func TestReconcileDeleteEndpointWithNamespaces(t *testing.T) { DestinationServiceID: "pod1-different-consul-svc-name", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS, MetaKeyManagedBy: managedByValue}, Namespace: ts.ExpConsulNS, }, }, @@ -1146,7 +1247,7 @@ func TestReconcileDeleteEndpointWithNamespaces(t *testing.T) { // code gets the agent pods via the label component=client, and // makes requests against the agent API, it will actually hit the // test server we have on localhost. - fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false, true) fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} // Create fake k8s client. @@ -1220,7 +1321,7 @@ func TestReconcileDeleteEndpointWithNamespaces(t *testing.T) { } } -func createPodWithNamespace(name, namespace, ip string, inject bool) *corev1.Pod { +func createPodWithNamespace(name, namespace, ip string, inject bool, managedByEndpointsController bool) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -1244,6 +1345,9 @@ func createPodWithNamespace(name, namespace, ip string, inject bool) *corev1.Pod pod.Labels[keyInjectStatus] = injected pod.Annotations[keyInjectStatus] = injected } + if managedByEndpointsController { + pod.Labels[keyManagedBy] = managedByValue + } return pod } diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index c23bb369b3..6451145e1a 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -7,7 +7,7 @@ import ( "strings" "testing" - "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set" logrtest "github.com/go-logr/logr/testing" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -92,7 +92,7 @@ func TestHasBeenInjected(t *testing.T) { { name: "Pod with injected annotation", pod: func() corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) return *pod1 }, expected: true, @@ -100,7 +100,7 @@ func TestHasBeenInjected(t *testing.T) { { name: "Pod without injected annotation", pod: func() corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", false) + pod1 := createPod("pod1", "1.2.3.4", false, true) return *pod1 }, expected: false, @@ -166,7 +166,7 @@ func TestProcessUpstreamsTLSandACLs(t *testing.T) { DenyK8sNamespacesSet: mapset.NewSetWith(), } - pod := createPod("pod1", "1.2.3.4", true) + pod := createPod("pod1", "1.2.3.4", true, true) pod.Annotations[annotationUpstreams] = "upstream1:1234:dc1" upstreams, err := ep.processUpstreams(*pod) @@ -198,7 +198,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "upstream with datacenter without ProxyDefaults", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "upstream1:1234:dc1" return pod1 }, @@ -208,7 +208,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "upstream with datacenter with ProxyDefaults whose mesh gateway mode is not local or remote", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "upstream1:1234:dc1" return pod1 }, @@ -224,7 +224,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "upstream with datacenter with ProxyDefaults and mesh gateway is in local mode", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "upstream1:1234:dc1" return pod1 }, @@ -247,7 +247,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "upstream with datacenter with ProxyDefaults and mesh gateway in remote mode", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "upstream1:1234:dc1" return pod1 }, @@ -270,7 +270,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "when consul is unavailable, we don't return an error", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "upstream1:1234:dc1" return pod1 }, @@ -295,7 +295,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "single upstream", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "upstream:1234" return pod1 }, @@ -311,7 +311,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "single upstream with namespace", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "upstream.foo:1234" return pod1 }, @@ -328,7 +328,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "multiple upstreams", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "upstream1:1234, upstream2:2234" return pod1 }, @@ -349,7 +349,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "multiple upstreams with consul namespaces and datacenters", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "upstream1:1234, upstream2.bar:2234, upstream3.foo:3234:dc2" return pod1 }, @@ -383,7 +383,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "prepared query upstream", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "prepared_query:queryname:1234" return pod1 }, @@ -399,7 +399,7 @@ func TestProcessUpstreams(t *testing.T) { { name: "prepared query and non-query upstreams", pod: func() *corev1.Pod { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationUpstreams] = "prepared_query:queryname:1234, upstream1:2234, prepared_query:6687bd19-5654-76be-d764:8202" return pod1 }, @@ -513,7 +513,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { name: "Basic endpoints", consulSvcName: "service-created", k8sObjects: func() []runtime.Object { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-created", @@ -545,7 +545,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { ServiceName: "service-created", ServiceAddress: "1.2.3.4", ServicePort: 0, - ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default"}, + ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue}, ServiceTags: []string{}, }, }, @@ -562,7 +562,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { LocalServicePort: 0, TransparentProxy: &api.TransparentProxyConfig{}, }, - ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default"}, + ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue}, ServiceTags: []string{}, }, }, @@ -582,8 +582,8 @@ func TestReconcileCreateEndpoint(t *testing.T) { name: "Endpoints with multiple addresses", consulSvcName: "service-created", k8sObjects: func() []runtime.Object { - pod1 := createPod("pod1", "1.2.3.4", true) - pod2 := createPod("pod2", "2.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod2 := createPod("pod2", "2.2.3.4", true, true) endpointWithTwoAddresses := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-created", @@ -624,7 +624,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { ServiceName: "service-created", ServiceAddress: "1.2.3.4", ServicePort: 0, - ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default"}, + ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue}, ServiceTags: []string{}, }, { @@ -632,7 +632,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { ServiceName: "service-created", ServiceAddress: "2.2.3.4", ServicePort: 0, - ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default"}, + ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue}, ServiceTags: []string{}, }, }, @@ -649,7 +649,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { LocalServicePort: 0, TransparentProxy: &api.TransparentProxyConfig{}, }, - ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default"}, + ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue}, ServiceTags: []string{}, }, { @@ -664,7 +664,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { LocalServicePort: 0, TransparentProxy: &api.TransparentProxyConfig{}, }, - ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default"}, + ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", MetaKeyManagedBy: managedByValue}, ServiceTags: []string{}, }, }, @@ -693,7 +693,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { name: "Every configurable field set: port, different Consul service name, meta, tags, upstreams, metrics", consulSvcName: "different-consul-svc-name", k8sObjects: func() []runtime.Object { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationPort] = "1234" pod1.Annotations[annotationService] = "different-consul-svc-name" pod1.Annotations[fmt.Sprintf("%sname", annotationMeta)] = "abc" @@ -740,6 +740,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", + MetaKeyManagedBy: managedByValue, }, ServiceTags: []string{"abc", "123", "def", "456"}, }, @@ -773,6 +774,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: "default", + MetaKeyManagedBy: managedByValue, }, ServiceTags: []string{"abc", "123", "def", "456"}, }, @@ -796,7 +798,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { // code gets the agent pods via the label component=client, and // makes requests against the agent API, it will actually hit the // test server we have on localhost. - fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false, true) fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} // Add the default namespace. @@ -929,11 +931,321 @@ func TestReconcileUpdateEndpoint(t *testing.T) { expectedProxySvcInstances []*api.CatalogService expectedAgentHealthChecks []*api.AgentCheck }{ + // Legacy services are not managed by endpoints controller, but endpoints controller + // will still add/update the legacy service's health checks. + { + name: "Legacy service: Health check is added when the pod is healthy", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPod("pod1", "1.2.3.4", true, false) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + TransparentProxy: &api.TransparentProxyConfig{}, + }, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + }, + }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-service-updated/kubernetes-health-check", + ServiceName: "service-updated", + ServiceID: "pod1-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + }, + }, + }, + { + name: "Legacy service: Health check is added when the pod is unhealthy", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPod("pod1", "1.2.3.4", true, false) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + NotReadyAddresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + TransparentProxy: &api.TransparentProxyConfig{}, + }, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + }, + }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-service-updated/kubernetes-health-check", + ServiceName: "service-updated", + ServiceID: "pod1-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: "Pod \"default/pod1\" is not ready", + Type: ttl, + }, + }, + }, + { + name: "Legacy service: Service health check is updated when the pod goes from healthy --> unhealthy", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPod("pod1", "1.2.3.4", true, false) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + NotReadyAddresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Check: &api.AgentServiceCheck{ + CheckID: "default/pod1-service-updated/kubernetes-health-check", + Name: "Kubernetes Health Check", + TTL: "100000h", + Status: api.HealthPassing, + SuccessBeforePassing: 1, + FailuresBeforeCritical: 1, + }, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + TransparentProxy: &api.TransparentProxyConfig{}, + }, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + }, + }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-service-updated/kubernetes-health-check", + ServiceName: "service-updated", + ServiceID: "pod1-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: "Pod \"default/pod1\" is not ready", + Type: ttl, + }, + }, + }, + { + name: "Legacy service: Service health check is updated when the pod goes from unhealthy --> healthy", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPod("pod1", "1.2.3.4", true, false) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Check: &api.AgentServiceCheck{ + CheckID: "default/pod1-service-updated/kubernetes-health-check", + Name: "Kubernetes Health Check", + TTL: "100000h", + Status: api.HealthCritical, + SuccessBeforePassing: 1, + FailuresBeforeCritical: 1, + }, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + TransparentProxy: &api.TransparentProxyConfig{}, + }, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + }, + }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-service-updated/kubernetes-health-check", + ServiceName: "service-updated", + ServiceID: "pod1-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + }, + }, + }, { name: "Endpoints has an updated address because health check changes from unhealthy to healthy", consulSvcName: "service-updated", k8sObjects: func() []runtime.Object { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -1014,7 +1326,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { name: "Endpoints has an updated address because health check changes from healthy to unhealthy", consulSvcName: "service-updated", k8sObjects: func() []runtime.Object { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -1095,7 +1407,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { name: "Endpoints has an updated address (pod IP change).", consulSvcName: "service-updated", k8sObjects: func() []runtime.Object { - pod1 := createPod("pod1", "4.4.4.4", true) + pod1 := createPod("pod1", "4.4.4.4", true, true) endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -1157,7 +1469,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { name: "Different Consul service name: Endpoints has an updated address (pod IP change).", consulSvcName: "different-consul-svc-name", k8sObjects: func() []runtime.Object { - pod1 := createPod("pod1", "4.4.4.4", true) + pod1 := createPod("pod1", "4.4.4.4", true, true) pod1.Annotations[annotationService] = "different-consul-svc-name" endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -1188,6 +1500,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "1.2.3.4", + Meta: map[string]string{MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1220,8 +1533,8 @@ func TestReconcileUpdateEndpoint(t *testing.T) { name: "Endpoints has additional address not in Consul.", consulSvcName: "service-updated", k8sObjects: func() []runtime.Object { - pod1 := createPod("pod1", "1.2.3.4", true) - pod2 := createPod("pod2", "2.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod2 := createPod("pod2", "2.2.3.4", true, true) endpointWithTwoAddresses := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -1260,6 +1573,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "service-updated", Port: 80, Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1320,7 +1634,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { name: "Consul has instances that are not in the Endpoints addresses.", consulSvcName: "service-updated", k8sObjects: func() []runtime.Object { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -1350,7 +1664,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "service-updated", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1363,14 +1677,14 @@ func TestReconcileUpdateEndpoint(t *testing.T) { DestinationServiceID: "pod1-service-updated", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { ID: "pod2-service-updated", Name: "service-updated", Port: 80, Address: "2.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1383,7 +1697,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { DestinationServiceID: "pod2-service-updated", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, }, expectedNumSvcInstances: 1, @@ -1404,7 +1718,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { name: "Different Consul service name: Consul has instances that are not in the Endpoints addresses.", consulSvcName: "different-consul-svc-name", k8sObjects: func() []runtime.Object { - pod1 := createPod("pod1", "1.2.3.4", true) + pod1 := createPod("pod1", "1.2.3.4", true, true) pod1.Annotations[annotationService] = "different-consul-svc-name" endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -1435,7 +1749,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1448,14 +1762,14 @@ func TestReconcileUpdateEndpoint(t *testing.T) { DestinationServiceID: "pod1-different-consul-svc-name", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { ID: "pod2-different-consul-svc-name", Name: "different-consul-svc-name", Port: 80, Address: "2.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1468,7 +1782,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { DestinationServiceID: "pod2-different-consul-svc-name", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, }, expectedNumSvcInstances: 1, @@ -1505,7 +1819,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "service-updated", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1518,14 +1832,14 @@ func TestReconcileUpdateEndpoint(t *testing.T) { DestinationServiceID: "pod1-service-updated", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { ID: "pod2-service-updated", Name: "service-updated", Port: 80, Address: "2.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1538,7 +1852,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { DestinationServiceID: "pod2-service-updated", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, }, expectedNumSvcInstances: 0, @@ -1565,7 +1879,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1578,14 +1892,14 @@ func TestReconcileUpdateEndpoint(t *testing.T) { DestinationServiceID: "pod1-different-consul-svc-name", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { ID: "pod2-different-consul-svc-name", Name: "different-consul-svc-name", Port: 80, Address: "2.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1598,7 +1912,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { DestinationServiceID: "pod2-different-consul-svc-name", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, }, expectedNumSvcInstances: 0, @@ -1614,7 +1928,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { // code gets the agent pods via the label component=client, and // makes requests against the agent API, it will actually hit the // test server we have on localhost. - fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false, true) fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} // Add the default namespace. @@ -1729,11 +2043,13 @@ func TestReconcileDeleteEndpoint(t *testing.T) { cases := []struct { name string consulSvcName string + legacyService bool initialConsulSvcs []*api.AgentServiceRegistration }{ { - name: "Consul service name matches K8s service name", + name: "Legacy service: does not delete", consulSvcName: "service-deleted", + legacyService: true, initialConsulSvcs: []*api.AgentServiceRegistration{ { ID: "pod1-service-deleted", @@ -1757,6 +2073,32 @@ func TestReconcileDeleteEndpoint(t *testing.T) { }, }, }, + { + name: "Consul service name matches K8s service name", + consulSvcName: "service-deleted", + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-deleted", + Name: "service-deleted", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-deleted-sidecar-proxy", + Name: "service-deleted-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-deleted", + DestinationServiceID: "pod1-service-deleted", + TransparentProxy: &api.TransparentProxyConfig{}, + }, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, + }, + }, + }, { name: "Consul service name does not match K8s service name", consulSvcName: "different-consul-svc-name", @@ -1766,7 +2108,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) { Name: "different-consul-svc-name", Port: 80, Address: "1.2.3.4", - Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, { Kind: api.ServiceKindConnectProxy, @@ -1779,7 +2121,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) { DestinationServiceID: "pod1-different-consul-svc-name", TransparentProxy: &api.TransparentProxyConfig{}, }, - Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": "default"}, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": "default", MetaKeyManagedBy: managedByValue}, }, }, }, @@ -1790,7 +2132,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) { // code gets the agent pods via the label component=client, and // makes requests against the agent API, it will actually hit the // test server we have on localhost. - fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false, true) fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} // Add the default namespace. @@ -1847,6 +2189,12 @@ func TestReconcileDeleteEndpoint(t *testing.T) { // After reconciliation, Consul should not have any instances of service-deleted serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil) + // If it's not managed by endpoints controller (legacy service), Consul should have service instances + if tt.legacyService { + require.NoError(t, err) + require.NotEmpty(t, serviceInstances) + return + } require.NoError(t, err) require.Empty(t, serviceInstances) proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil) @@ -3012,7 +3360,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * for name, c := range cases { t.Run(name, func(t *testing.T) { - pod := createPod("test-pod-1", "1.2.3.4", true) + pod := createPod("test-pod-1", "1.2.3.4", true, true) if c.annotationEnabled != nil { pod.Annotations[keyTransparentProxy] = strconv.FormatBool(*c.annotationEnabled) } @@ -3083,7 +3431,7 @@ func TestEndpointsController_createServiceRegistrations_withTransparentProxy(t * } } -func createPod(name, ip string, inject bool) *corev1.Pod { +func createPod(name, ip string, inject bool, managedByEndpointsController bool) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -3100,6 +3448,9 @@ func createPod(name, ip string, inject bool) *corev1.Pod { pod.Labels[keyInjectStatus] = injected pod.Annotations[keyInjectStatus] = injected } + if managedByEndpointsController { + pod.Labels[keyManagedBy] = managedByValue + } return pod } diff --git a/connect-inject/handler.go b/connect-inject/handler.go index 7492449a84..9a58a9a6a0 100644 --- a/connect-inject/handler.go +++ b/connect-inject/handler.go @@ -255,6 +255,10 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R } pod.Labels[keyInjectStatus] = injected + // Add the managed-by label since services are now managed by endpoints controller. This is to support upgrading + // from consul-k8s without Endpoints controller to consul-k8s with Endpoints controller. + pod.Labels[keyManagedBy] = managedByValue + // Consul-ENT only: Add the Consul destination namespace as an annotation to the pod. if h.EnableNamespaces { pod.Annotations[annotationConsulNamespace] = h.consulNamespace(req.Namespace) diff --git a/connect-inject/handler_test.go b/connect-inject/handler_test.go index 3aeb80f9bf..058fc401c6 100644 --- a/connect-inject/handler_test.go +++ b/connect-inject/handler_test.go @@ -342,6 +342,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/labels/" + escapeJSONPointer(keyInjectStatus), }, + { + Operation: "add", + Path: "/metadata/labels/" + escapeJSONPointer(keyManagedBy), + }, }, }, @@ -412,6 +416,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/labels/" + escapeJSONPointer(keyInjectStatus), }, + { + Operation: "add", + Path: "/metadata/labels/" + escapeJSONPointer(keyManagedBy), + }, }, }, } diff --git a/go.sum b/go.sum index 1d2b2161f5..07d276069b 100644 --- a/go.sum +++ b/go.sum @@ -318,8 +318,6 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/api v1.4.1-0.20210504212756-347f3d212843 h1:KrwvodQtuOqcAscposKbpRBxNsjRvWb2EE28WBNcH7E= github.com/hashicorp/consul/api v1.4.1-0.20210504212756-347f3d212843/go.mod h1:sDjTOq0yUyv5G4h+BqSea7Fn6BU+XbolEz1952UB+mk= -github.com/hashicorp/consul/api v1.4.1-0.20210505201732-0a6d439dbb2c h1:CxKbZF3rL3HBU88k2IBZQe5wBWkPAxlI0zBPAWLOMfI= -github.com/hashicorp/consul/api v1.4.1-0.20210505201732-0a6d439dbb2c/go.mod h1:sDjTOq0yUyv5G4h+BqSea7Fn6BU+XbolEz1952UB+mk= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/consul/sdk v0.7.0 h1:H6R9d008jDcHPQPAqPNuydAshJ4v5/8URdFnUvK/+sc= github.com/hashicorp/consul/sdk v0.7.0/go.mod h1:fY08Y9z5SvJqevyZNy6WWPXiG3KwBPAvlcdx16zZ0fM=