diff --git a/.circleci/config.yml b/.circleci/config.yml index 66af22049c..5374eda76f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,8 +6,8 @@ executors: - image: docker.mirror.hashicorp.services/circleci/golang:1.14 environment: TEST_RESULTS: /tmp/test-results # path to where test results are saved - CONSUL_VERSION: 1.9.0-rc1 # Consul's OSS version to use in tests - CONSUL_ENT_VERSION: 1.9.0+ent-rc1 # Consul's enterprise version to use in tests + CONSUL_VERSION: 1.9.4 # Consul's OSS version to use in tests + CONSUL_ENT_VERSION: 1.9.4+ent # Consul's enterprise version to use in tests jobs: go-fmt-and-vet: diff --git a/connect-inject/cleanup_resource.go b/connect-inject/cleanup_resource.go index 9de5d2ac52..e555ce306e 100644 --- a/connect-inject/cleanup_resource.go +++ b/connect-inject/cleanup_resource.go @@ -107,7 +107,7 @@ func (c *CleanupResource) reconcile() { } podList, err := c.KubernetesClient.CoreV1().Pods(corev1.NamespaceAll).List(c.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) + metav1.ListOptions{LabelSelector: annotationStatus}) if err != nil { c.Log.Error("unable to get pods", "error", err) return @@ -223,11 +223,11 @@ func (c *CleanupResource) Informer() cache.SharedIndexInformer { &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).List(c.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) + metav1.ListOptions{LabelSelector: annotationStatus}) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).Watch(c.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) + metav1.ListOptions{LabelSelector: annotationStatus}) }, }, &corev1.Pod{}, diff --git a/connect-inject/cleanup_resource_ent_test.go b/connect-inject/cleanup_resource_ent_test.go index ac33f7ec51..7e3ae56452 100644 --- a/connect-inject/cleanup_resource_ent_test.go +++ b/connect-inject/cleanup_resource_ent_test.go @@ -292,7 +292,7 @@ var ( Name: "foo-abc123", Namespace: "default", Labels: map[string]string{ - labelInject: injected, + annotationStatus: injected, }, Annotations: map[string]string{ annotationStatus: injected, @@ -309,7 +309,7 @@ var ( Name: "foo-abc123", Namespace: "default", Labels: map[string]string{ - labelInject: injected, + annotationStatus: injected, }, Annotations: map[string]string{ annotationStatus: injected, diff --git a/connect-inject/cleanup_resource_test.go b/connect-inject/cleanup_resource_test.go index 5f15d16f3c..6ccc254172 100644 --- a/connect-inject/cleanup_resource_test.go +++ b/connect-inject/cleanup_resource_test.go @@ -324,7 +324,7 @@ var ( Name: "foo-abc123", Namespace: "default", Labels: map[string]string{ - labelInject: injected, + annotationStatus: injected, }, Annotations: map[string]string{ annotationStatus: injected, diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 5615696d6f..4c1dd99650 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -5,7 +5,7 @@ import ( "fmt" "strings" - mapset "github.com/deckarep/golang-set" + "github.com/deckarep/golang-set" "github.com/go-logr/logr" "github.com/hashicorp/consul-k8s/consul" "github.com/hashicorp/consul/api" @@ -25,10 +25,12 @@ import ( ) const ( - MetaKeyPodName = "pod-name" - MetaKeyKubeServiceName = "k8s-service-name" - MetaKeyKubeNS = "k8s-namespace" - envoyPrometheusBindAddr = "envoy_prometheus_bind_addr" + MetaKeyPodName = "pod-name" + MetaKeyKubeServiceName = "k8s-service-name" + MetaKeyKubeNS = "k8s-namespace" + kubernetesSuccessReasonMsg = "Kubernetes health checks passing" + podPendingReasonMsg = "Pod is pending" + envoyPrometheusBindAddr = "envoy_prometheus_bind_addr" ) type EndpointsController struct { @@ -135,6 +137,18 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( r.Log.Error(err, "failed to register proxy service with Consul", "consul-proxy-service-name", proxyServiceRegistration.Name) return ctrl.Result{}, err } + + // Update the TTL health check for the service. + // This is required because ServiceRegister() does not update the TTL if the service already exists. + r.Log.Info("updating ttl health check", "service", serviceRegistration.Name) + status, reason, err := getReadyStatusAndReason(pod) + if err != nil { + return ctrl.Result{}, err + } + err = client.Agent().UpdateTTL(getConsulHealthCheckID(pod, serviceRegistration.ID), reason, status) + if err != nil { + return ctrl.Result{}, err + } } } } @@ -212,6 +226,14 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service tags = append(tags, strings.Split(raw, ",")...) } + // We do not set the Notes field with the 'reason' on creation because it does not set the Output field which + // gets read by Consul and you'll end up with both Notes and Output set. + // Notes (reason) will updated by UpdateTTL() as soon as this function returns. + status, _, err := getReadyStatusAndReason(pod) + if err != nil { + return nil, nil, err + } + service := &api.AgentServiceRegistration{ ID: serviceID, Name: serviceName, @@ -219,6 +241,14 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service Address: pod.Status.PodIP, Meta: meta, Namespace: "", // TODO: namespace support + Check: &api.AgentServiceCheck{ + CheckID: getConsulHealthCheckID(pod, serviceID), + Name: "Kubernetes Health Check", + TTL: "100000h", + Status: status, + SuccessBeforePassing: 1, + FailuresBeforeCritical: 1, + }, } if len(tags) > 0 { service.Tags = tags @@ -292,6 +322,39 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service return service, proxyService, nil } +// getConsulHealthCheckID deterministically generates a health check ID that will be unique to the Agent +// where the health check is registered and deregistered. +func getConsulHealthCheckID(pod corev1.Pod, serviceID string) string { + return fmt.Sprintf("%s/%s/kubernetes-health-check", pod.Namespace, serviceID) +} + +// getReadyStatusAndReason returns the formatted status string to pass to Consul based on the +// ready state of the pod along with the reason message which will be passed into the Notes +// field of the Consul health check. +func getReadyStatusAndReason(pod corev1.Pod) (string, string, error) { + // A pod might be pending if the init containers have run but the non-init + // containers haven't reached running state. In this case we set a failing health + // check so the pod doesn't receive traffic before it's ready. + if pod.Status.Phase == corev1.PodPending { + return api.HealthCritical, podPendingReasonMsg, nil + } + + for _, cond := range pod.Status.Conditions { + var consulStatus, reason string + if cond.Type == corev1.PodReady { + if cond.Status != corev1.ConditionTrue { + consulStatus = api.HealthCritical + reason = cond.Message + } else { + consulStatus = api.HealthPassing + reason = kubernetesSuccessReasonMsg + } + return consulStatus, reason, nil + } + } + return "", "", fmt.Errorf("no ready status for pod: %s", pod.Name) +} + // deregisterServiceOnAllAgents queries all agents for service instances that have the metadata // "k8s-service-name"=k8sSvcName and "k8s-namespace"=k8sSvcNamespace. The k8s service name may or may not match the // consul service name, but the k8s service name will always match the metadata on the Consul service diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index 1ab3d5fa2f..73c2aa02c1 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -8,6 +8,8 @@ import ( "github.com/deckarep/golang-set" logrtest "github.com/go-logr/logr/testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/consul-k8s/subcommand/common" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" @@ -21,6 +23,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" ) +const ( + testFailureMessage = "Kubernetes pod readiness probe failed" + ttl = "ttl" +) + func TestShouldIgnore(t *testing.T) { t.Parallel() cases := []struct { @@ -363,7 +370,7 @@ func TestProcessUpstreams(t *testing.T) { require.NoError(t, err) defer consul.Stop() - consul.WaitForLeader(t) + consul.WaitForSerfCheck(t) httpAddr := consul.HTTPAddr if tt.consulUnavailable { httpAddr = "hostname.does.not.exist:8500" @@ -414,6 +421,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { expectedNumSvcInstances int expectedConsulSvcInstances []*api.CatalogService expectedProxySvcInstances []*api.CatalogService + expectedAgentHealthChecks []*api.AgentCheck }{ { name: "Empty endpoints", @@ -436,6 +444,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { expectedNumSvcInstances: 0, expectedConsulSvcInstances: []*api.CatalogService{}, expectedProxySvcInstances: []*api.CatalogService{}, + expectedAgentHealthChecks: nil, }, { name: "Basic endpoints", @@ -493,6 +502,17 @@ func TestReconcileCreateEndpoint(t *testing.T) { ServiceTags: []string{}, }, }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-service-created/kubernetes-health-check", + ServiceName: "service-created", + ServiceID: "pod1-service-created", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, + }, }, { name: "Endpoints with multiple addresses", @@ -582,6 +602,26 @@ func TestReconcileCreateEndpoint(t *testing.T) { ServiceTags: []string{}, }, }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-service-created/kubernetes-health-check", + ServiceName: "service-created", + ServiceID: "pod1-service-created", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, + { + CheckID: "default/pod2-service-created/kubernetes-health-check", + ServiceName: "service-created", + ServiceID: "pod2-service-created", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, + }, }, { name: "Every configurable field set: port, different Consul service name, meta, tags, upstreams, metrics", @@ -670,6 +710,17 @@ func TestReconcileCreateEndpoint(t *testing.T) { ServiceTags: []string{"abc", "123", "def", "456"}, }, }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-different-consul-svc-name/kubernetes-health-check", + ServiceName: "different-consul-svc-name", + ServiceID: "pod1-different-consul-svc-name", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, + }, }, } for _, tt := range cases { @@ -691,7 +742,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { }) require.NoError(t, err) defer consul.Stop() - consul.WaitForLeader(t) + consul.WaitForSerfCheck(t) cfg := &api.Config{ Address: consul.HTTPAddr, @@ -765,6 +816,19 @@ func TestReconcileCreateEndpoint(t *testing.T) { require.Contains(t, expectedChecks, checks[0].Name) require.Contains(t, expectedChecks, checks[1].Name) } + + // Check that the Consul health check was created for the k8s pod. + if tt.expectedAgentHealthChecks != nil { + for i, _ := range tt.expectedConsulSvcInstances { + filter := fmt.Sprintf("CheckID == `%s`", tt.expectedAgentHealthChecks[i].CheckID) + check, err := consulClient.Agent().ChecksWithFilter(filter) + require.NoError(t, err) + require.EqualValues(t, len(check), 1) + // Ignoring Namespace because the response from ENT includes it and OSS does not. + var ignoredFields = []string{"Node", "Definition", "Namespace"} + require.True(t, cmp.Equal(check[tt.expectedAgentHealthChecks[i].CheckID], tt.expectedAgentHealthChecks[i], cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) + } + } }) } } @@ -773,6 +837,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { // - Tests updates via the register codepath: // - When an address in an Endpoint is updated, that the corresponding service instance in Consul is updated. // - When an address is added to an Endpoint, an additional service instance in Consul is registered. +// - When an address in an Endpoint is updated - via health check change - the corresponding service instance is updated. // - Tests updates via the deregister codepath: // - When an address is removed from an Endpoint, the corresponding service instance in Consul is deregistered. // - When an address is removed from an Endpoint *and there are no addresses left in the Endpoint*, the @@ -792,7 +857,172 @@ func TestReconcileUpdateEndpoint(t *testing.T) { expectedNumSvcInstances int expectedConsulSvcInstances []*api.CatalogService expectedProxySvcInstances []*api.CatalogService + expectedAgentHealthChecks []*api.AgentCheck }{ + { + name: "Endpoints has an updated address because health check changes from unhealthy to healthy", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPod("pod1", "1.2.3.4", true) + pod1.Status.Conditions = []corev1.PodCondition{{ + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }} + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Check: &api.AgentServiceCheck{ + CheckID: "default/pod1-service-updated/kubernetes-health-check", + Name: "Kubernetes Health Check", + TTL: "100000h", + Status: "passing", + SuccessBeforePassing: 1, + FailuresBeforeCritical: 1, + }, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + }, + }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-service-updated/kubernetes-health-check", + ServiceName: "service-updated", + ServiceID: "pod1-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + }, + }, + }, + { + name: "Endpoints has an updated address because health check changes from healthy to unhealthy", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPod("pod1", "1.2.3.4", true) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Check: &api.AgentServiceCheck{ + CheckID: "default/pod1-service-updated/kubernetes-health-check", + Name: "Kubernetes Health Check", + TTL: "100000h", + Status: "passing", + SuccessBeforePassing: 1, + FailuresBeforeCritical: 1, + }, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + }, + }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-service-updated/kubernetes-health-check", + ServiceName: "service-updated", + ServiceID: "pod1-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, + }, + }, { name: "Endpoints has an updated address (pod IP change).", consulSvcName: "service-updated", @@ -994,6 +1224,26 @@ func TestReconcileUpdateEndpoint(t *testing.T) { ServiceAddress: "2.2.3.4", }, }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-service-updated/kubernetes-health-check", + ServiceName: "service-updated", + ServiceID: "pod1-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, + { + CheckID: "default/pod2-service-updated/kubernetes-health-check", + ServiceName: "service-updated", + ServiceID: "pod2-service-updated", + Name: "Kubernetes Health Check", + Status: api.HealthCritical, + Output: testFailureMessage, + Type: ttl, + }, + }, }, { name: "Consul has instances that are not in the Endpoints addresses.", @@ -1277,6 +1527,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { expectedProxySvcInstances: []*api.CatalogService{}, }, } + // Each test is run with ACLs+TLS (secure) enabled and disabled. for _, secure := range []bool{true, false} { for _, tt := range cases { t.Run(fmt.Sprintf("%s - secure: %v", tt.name, secure), func(t *testing.T) { @@ -1308,12 +1559,15 @@ func TestReconcileUpdateEndpoint(t *testing.T) { require.NoError(t, err) defer consul.Stop() consul.WaitForSerfCheck(t) + addr := strings.Split(consul.HTTPAddr, ":") + consulPort := addr[1] cfg := &api.Config{ Scheme: "http", Address: consul.HTTPAddr, } if secure { + consulPort = strings.Split(consul.HTTPSAddr, ":")[1] cfg.Address = consul.HTTPSAddr cfg.Scheme = "https" cfg.TLSConfig = api.TLSConfig{ @@ -1323,8 +1577,6 @@ func TestReconcileUpdateEndpoint(t *testing.T) { } consulClient, err := api.NewClient(cfg) require.NoError(t, err) - addr := strings.Split(cfg.Address, ":") - consulPort := addr[1] // Register service and proxy in consul for _, svc := range tt.initialConsulSvcs { @@ -1371,6 +1623,18 @@ func TestReconcileUpdateEndpoint(t *testing.T) { require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID) require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress) } + // Check that the Consul health check was created for the k8s pod. + if tt.expectedAgentHealthChecks != nil { + for i, _ := range tt.expectedConsulSvcInstances { + filter := fmt.Sprintf("CheckID == `%s`", tt.expectedAgentHealthChecks[i].CheckID) + check, err := consulClient.Agent().ChecksWithFilter(filter) + require.NoError(t, err) + require.EqualValues(t, len(check), 1) + // Ignoring Namespace because the response from ENT includes it and OSS does not. + var ignoredFields = []string{"Node", "Definition", "Namespace"} + require.True(t, cmp.Equal(check[tt.expectedAgentHealthChecks[i].CheckID], tt.expectedAgentHealthChecks[i], cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) + } + } }) } } @@ -1456,7 +1720,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) { require.NoError(t, err) defer consul.Stop() - consul.WaitForLeader(t) + consul.WaitForSerfCheck(t) cfg := &api.Config{ Address: consul.HTTPAddr, } @@ -2196,7 +2460,7 @@ func TestServiceInstancesForK8SServiceNameAndNamespace(t *testing.T) { require.NoError(t, err) defer consul.Stop() - consul.WaitForLeader(t) + consul.WaitForSerfCheck(t) consulClient, err := api.NewClient(&api.Config{ Address: consul.HTTPAddr, }) @@ -2230,14 +2494,19 @@ func createPod(name, ip string, inject bool) *corev1.Pod { Status: corev1.PodStatus{ PodIP: ip, HostIP: "127.0.0.1", + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{{ + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + Message: testFailureMessage, + }}, }, } if inject { - pod.Labels[labelInject] = injected + pod.Labels[annotationStatus] = injected pod.Annotations[annotationStatus] = injected } return pod - } func toStringPtr(input string) *string { diff --git a/connect-inject/handler.go b/connect-inject/handler.go index 1f917a0339..f82bf5fdae 100644 --- a/connect-inject/handler.go +++ b/connect-inject/handler.go @@ -239,7 +239,7 @@ func (h *Handler) Handle(_ context.Context, req admission.Request) admission.Res if pod.Labels == nil { pod.Labels = make(map[string]string) } - pod.Labels[labelInject] = injected + pod.Labels[annotationStatus] = injected // Consul-ENT only: Add the Consul destination namespace as an annotation to the pod. if h.EnableNamespaces { diff --git a/connect-inject/handler_test.go b/connect-inject/handler_test.go index 98c73f7b91..8b104b11aa 100644 --- a/connect-inject/handler_test.go +++ b/connect-inject/handler_test.go @@ -334,7 +334,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/labels/" + escapeJSONPointer(labelInject), + Path: "/metadata/labels/" + escapeJSONPointer(annotationStatus), }, }, }, @@ -406,7 +406,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/labels/" + escapeJSONPointer(labelInject), + Path: "/metadata/labels/" + escapeJSONPointer(annotationStatus), }, }, }, diff --git a/connect-inject/health_check_resource.go b/connect-inject/health_check_resource.go deleted file mode 100644 index b908064c2f..0000000000 --- a/connect-inject/health_check_resource.go +++ /dev/null @@ -1,340 +0,0 @@ -package connectinject - -import ( - "errors" - "fmt" - "strings" - "sync" - "time" - - "github.com/hashicorp/consul-k8s/consul" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-hclog" - "golang.org/x/net/context" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -const ( - // labelInject is the label which is applied by the connect-inject webhook to all pods. - // This is the key the controller will use on the label filter for its lister, watcher and reconciler. - labelInject = "consul.hashicorp.com/connect-inject-status" - - // kubernetesSuccessReasonMsg will be passed for passing health check's Reason to Consul. - kubernetesSuccessReasonMsg = "Kubernetes health checks passing" - - podPendingReasonMsg = "Pod is pending" -) - -// ServiceNotFoundErr is returned when a Consul service instance is not registered. -var ServiceNotFoundErr = errors.New("service is not registered in Consul") - -type HealthCheckResource struct { - Log hclog.Logger - KubernetesClientset kubernetes.Interface - - // ConsulScheme is the scheme to use when making API calls to Consul, - // i.e. "http" or "https". - ConsulScheme string - // ConsulPort is the port to make HTTP API calls to Consul agents on. - ConsulPort string - // ReconcilePeriod is the period by which reconcile gets called. - // default to 1 minute. - ReconcilePeriod time.Duration - - Ctx context.Context - lock sync.Mutex -} - -// Run is the long-running runloop for periodically running Reconcile. -// It initially reconciles at startup and is then invoked after every -// ReconcilePeriod expires. -func (h *HealthCheckResource) Run(stopCh <-chan struct{}) { - reconcileTimer := time.NewTimer(h.ReconcilePeriod) - defer reconcileTimer.Stop() - - for { - h.reconcile() - reconcileTimer.Reset(h.ReconcilePeriod) - - select { - case <-stopCh: - h.Log.Info("received stop signal, shutting down") - return - - case <-reconcileTimer.C: - // Fall through and continue the loop. - } - } -} - -// Delete is not implemented because it is handled by the preStop phase whereby all services -// related to the pod are deregistered which also deregisters health checks. -func (h *HealthCheckResource) Delete(string, interface{}) error { - return nil -} - -// Informer starts a sharedindex informer which watches and lists corev1.Pod objects -// which meet the filter of labelInject. -func (h *HealthCheckResource) Informer() cache.SharedIndexInformer { - return cache.NewSharedIndexInformer( - // ListWatch takes a List and Watch function which we filter based on label which was injected. - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return h.KubernetesClientset.CoreV1().Pods(metav1.NamespaceAll).List(h.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return h.KubernetesClientset.CoreV1().Pods(metav1.NamespaceAll).Watch(h.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) - }, - }, - &corev1.Pod{}, // the target type (Pod) - 0, // no resync (period of 0) - cache.Indexers{}, - ) -} - -// Upsert processes a create or update event. -// Two primary use cases are handled, new pods will get a new consul TTL health check -// registered against their respective agent and service, and updates to pods will have -// this TTL health check updated to reflect the pod's readiness status. -func (h *HealthCheckResource) Upsert(_ string, raw interface{}) error { - pod, ok := raw.(*corev1.Pod) - if !ok { - return fmt.Errorf("failed to cast to a pod object") - } - err := h.reconcilePod(pod) - if err != nil { - h.Log.Error("unable to update pod", "err", err) - return err - } - return nil -} - -// reconcile iterates through all Pods with the appropriate label and compares the -// current health check status against that which is stored in Consul and updates -// the consul health check accordingly. If the health check doesn't yet exist it will create it. -func (h *HealthCheckResource) reconcile() { - h.lock.Lock() - defer h.lock.Unlock() - h.Log.Debug("starting reconcile") - // First grab the list of Pods which have the label labelInject. - podList, err := h.KubernetesClientset.CoreV1().Pods(corev1.NamespaceAll).List(h.Ctx, - metav1.ListOptions{LabelSelector: labelInject}) - if err != nil { - h.Log.Error("unable to get pods", "err", err) - return - } - // Reconcile the state of each pod in the podList. - for _, pod := range podList.Items { - err = h.reconcilePod(&pod) - if err != nil { - h.Log.Error("unable to update pod", "err", err) - } - } - h.Log.Debug("finished reconcile") -} - -// reconcilePod will reconcile a pod. This is the common work for both Upsert and Reconcile. -func (h *HealthCheckResource) reconcilePod(pod *corev1.Pod) error { - h.Log.Debug("processing pod", "name", pod.Name) - if !h.shouldProcess(pod) { - // Skip pods that are not running or have not been properly injected. - return nil - } - // Fetch the identifiers we will use to interact with the Consul agent for this pod. - serviceID := h.getConsulServiceID(pod) - healthCheckID := h.getConsulHealthCheckID(pod) - status, reason, err := h.getReadyStatusAndReason(pod) - if err != nil { - return fmt.Errorf("unable to get pod status: %s", err) - } - // Get a client connection to the correct agent. - client, err := h.getConsulClient(pod) - if err != nil { - return fmt.Errorf("unable to get Consul client connection for %s: %s", pod.Name, err) - } - // Retrieve the health check that would exist if the service had one registered for this pod. - serviceCheck, err := h.getServiceCheck(client, healthCheckID) - if err != nil { - return fmt.Errorf("unable to get agent health checks: serviceID=%s, checkID=%s, %s", serviceID, healthCheckID, err) - } - if serviceCheck == nil { - // Create a new health check. - h.Log.Debug("registering new health check", "name", pod.Name, "id", healthCheckID) - err = h.registerConsulHealthCheck(client, healthCheckID, serviceID, status) - if errors.Is(err, ServiceNotFoundErr) { - h.Log.Warn("skipping registration because service not registered with Consul - this may be because the pod is shutting down", "serviceID", serviceID) - return nil - } else if err != nil { - return fmt.Errorf("unable to register health check: %s", err) - } - h.Log.Debug("updating health check status", "name", pod.Name, "status", status, "reason", reason) - // Also update it, the reason this is separate is there is no way to set the Output field of the health check - // at creation time, and this is what is displayed on the UI as opposed to the Notes field. - err = h.updateConsulHealthCheckStatus(client, healthCheckID, status, reason) - if err != nil { - return fmt.Errorf("error updating health check: %s", err) - } - } else if serviceCheck.Status != status { - // Update the healthCheck. - h.Log.Debug("updating health check status", "name", pod.Name, "status", status, "reason", reason) - err = h.updateConsulHealthCheckStatus(client, healthCheckID, status, reason) - if err != nil { - return fmt.Errorf("error updating health check: %s", err) - } - } - return nil -} - -// updateConsulHealthCheckStatus updates the consul health check status. -func (h *HealthCheckResource) updateConsulHealthCheckStatus(client *api.Client, consulHealthCheckID, status, reason string) error { - h.Log.Debug("updating health check", "id", consulHealthCheckID) - return client.Agent().UpdateTTL(consulHealthCheckID, reason, status) -} - -// registerConsulHealthCheck registers a TTL health check for the service on this Agent. -// The Agent is local to the Pod which has a kubernetes health check. -// This has the effect of marking the service instance healthy/unhealthy for Consul service mesh traffic. -func (h *HealthCheckResource) registerConsulHealthCheck(client *api.Client, consulHealthCheckID, serviceID, status string) error { - h.Log.Debug("registering Consul health check", "id", consulHealthCheckID, "serviceID", serviceID) - - // Create a TTL health check in Consul associated with this service and pod. - // The TTL time is 100000h which should ensure that the check never fails due to timeout - // of the TTL check. - err := client.Agent().CheckRegister(&api.AgentCheckRegistration{ - ID: consulHealthCheckID, - Name: "Kubernetes Health Check", - ServiceID: serviceID, - AgentServiceCheck: api.AgentServiceCheck{ - TTL: "100000h", - Status: status, - SuccessBeforePassing: 1, - FailuresBeforeCritical: 1, - }, - }) - if err != nil { - // Full error looks like: - // Unexpected response code: 500 (ServiceID "consulnamespace/svc-id" does not exist) - if strings.Contains(err.Error(), fmt.Sprintf("%s\" does not exist", serviceID)) { - return ServiceNotFoundErr - } - return fmt.Errorf("registering health check for service %q: %s", serviceID, err) - } - return nil -} - -// getServiceCheck will return the health check for this pod and service if it exists. -func (h *HealthCheckResource) getServiceCheck(client *api.Client, healthCheckID string) (*api.AgentCheck, error) { - filter := fmt.Sprintf("CheckID == `%s`", healthCheckID) - checks, err := client.Agent().ChecksWithFilter(filter) - if err != nil { - return nil, fmt.Errorf("getting check %q: %s", healthCheckID, err) - } - // This will be nil (does not exist) or an actual check. - return checks[healthCheckID], nil -} - -// getReadyStatusAndReason returns the formatted status string to pass to Consul based on the -// ready state of the pod along with the reason message which will be passed into the Notes -// field of the Consul health check. -func (h *HealthCheckResource) getReadyStatusAndReason(pod *corev1.Pod) (string, string, error) { - // A pod might be pending if the init containers have run but the non-init - // containers haven't reached running state. In this case we set a failing health - // check so the pod doesn't receive traffic before it's ready. - if pod.Status.Phase == corev1.PodPending { - return api.HealthCritical, podPendingReasonMsg, nil - } - - for _, cond := range pod.Status.Conditions { - var consulStatus, reason string - if cond.Type == corev1.PodReady { - if cond.Status != corev1.ConditionTrue { - consulStatus = api.HealthCritical - reason = cond.Message - } else { - consulStatus = api.HealthPassing - reason = kubernetesSuccessReasonMsg - } - return consulStatus, reason, nil - } - } - return "", "", fmt.Errorf("no ready status for pod: %s", pod.Name) -} - -// getConsulClient returns an *api.Client that points at the consul agent local to the pod. -func (h *HealthCheckResource) getConsulClient(pod *corev1.Pod) (*api.Client, error) { - newAddr := fmt.Sprintf("%s://%s:%s", h.ConsulScheme, pod.Status.HostIP, h.ConsulPort) - localConfig := api.DefaultConfig() - localConfig.Address = newAddr - if pod.Annotations[annotationConsulNamespace] != "" { - localConfig.Namespace = pod.Annotations[annotationConsulNamespace] - } - localClient, err := consul.NewClient(localConfig) - if err != nil { - h.Log.Error("unable to get Consul API Client", "addr", newAddr, "err", err) - return nil, err - } - h.Log.Debug("setting consul client to the following agent", "addr", newAddr) - return localClient, err -} - -// shouldProcess is a simple filter which determines if Upsert or Reconcile should attempt to process the pod. -// This is done without making any client api calls so it is fast. -func (h *HealthCheckResource) shouldProcess(pod *corev1.Pod) bool { - if pod.Annotations[annotationStatus] != injected { - return false - } - - // If the pod has been terminated, we don't want to try and modify its - // health check status because the preStop hook will have deregistered - // this pod and so we'll get errors making API calls to set the status - // of a check for a service that doesn't exist. - // We detect a terminated pod by looking to see if all the containers - // have their state set as "terminated". Kubernetes will only send - // an update to this reconciler when all containers have stopped so if - // the conditions below are satisfied we're guaranteed that the preStop - // hook has run. - if pod.Status.Phase == corev1.PodRunning && len(pod.Status.ContainerStatuses) > 0 { - allTerminated := true - for _, c := range pod.Status.ContainerStatuses { - if c.State.Terminated == nil { - allTerminated = false - break - } - } - if allTerminated { - return false - } - // Otherwise we fall through to checking if the service has been - // registered yet. - } - - // We process any pod that has had its injection init container completed because - // this means the service instance has been registered with Consul and so we can - // and should set its health check status. If we don't set the health check - // immediately after registration, the pod will start to receive traffic, - // even if its non-init containers haven't yet reached the running state. - for _, c := range pod.Status.InitContainerStatuses { - if c.Name == InjectInitContainerName { - return c.State.Terminated != nil && c.State.Terminated.Reason == "Completed" - } - } - return false -} - -// getConsulHealthCheckID deterministically generates a health check ID that will be unique to the Agent -// where the health check is registered and deregistered. -func (h *HealthCheckResource) getConsulHealthCheckID(pod *corev1.Pod) string { - return fmt.Sprintf("%s/%s/kubernetes-health-check", pod.Namespace, h.getConsulServiceID(pod)) -} - -// getConsulServiceID returns the serviceID of the connect service. -func (h *HealthCheckResource) getConsulServiceID(pod *corev1.Pod) string { - return fmt.Sprintf("%s-%s", pod.Name, pod.Annotations[annotationService]) -} diff --git a/connect-inject/health_check_resource_ent_test.go b/connect-inject/health_check_resource_ent_test.go deleted file mode 100644 index 65a5e5e645..0000000000 --- a/connect-inject/health_check_resource_ent_test.go +++ /dev/null @@ -1,288 +0,0 @@ -// +build enterprise - -package connectinject - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/hashicorp/consul/api" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -const ( - testNamespace = "testnamespace" - testNamespacedHealthCheckID = "testnamespace/test-pod-test-service/kubernetes-health-check" - - testAlternateNamespace = "testalternatenamespace" - testAlternateNamespacedHealthCheckID = "testalternatenamespace/test-pod-test-service/kubernetes-health-check" -) - -var ignoredFieldsEnterprise = []string{"Node", "Definition", "ServiceID", "ServiceName"} - -var testPodWithNamespace = corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testNamespace, - Name: testPodName, - }, - Spec: corev1.PodSpec{}, -} - -// Test that when consulNamespaces are enabled, the health check is registered in the right namespace. -func TestReconcilePodWithNamespace(t *testing.T) { - t.Parallel() - cases := []struct { - Name string - PreCreateHealthCheck bool - InitialState string - Pod *corev1.Pod - Expected *api.AgentCheck - }{ - { - Name: "reconcilePod will create check and set passed", - PreCreateHealthCheck: false, - InitialState: "", // only used when precreating a health check - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testNamespacedHealthCheckID, - Status: api.HealthPassing, - Notes: "", - Output: kubernetesSuccessReasonMsg, - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - { - Name: "reconcilePod will create check and set failed", - PreCreateHealthCheck: false, - InitialState: "", // only used when precreating a health check - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - Message: testFailureMessage, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testNamespacedHealthCheckID, - Status: api.HealthCritical, - Notes: "", - Output: testFailureMessage, - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - { - Name: "precreate a passing pod and change to failed", - PreCreateHealthCheck: true, - InitialState: api.HealthPassing, - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - Message: testFailureMessage, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testNamespacedHealthCheckID, - Status: api.HealthCritical, - Output: testFailureMessage, - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - { - Name: "precreate failed pod and change to passing", - PreCreateHealthCheck: true, - InitialState: api.HealthCritical, - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testNamespacedHealthCheckID, - Status: api.HealthPassing, - Output: testCheckNotesPassing, - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - { - Name: "precreate failed check, no pod changes results in no health check changes", - PreCreateHealthCheck: true, - InitialState: api.HealthCritical, - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testNamespacedHealthCheckID, - Status: api.HealthCritical, - Output: "", // when there is no change in status, Consul doesnt set the Output field - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - { - Name: "precreate failed pod and change to passing, k8s/consul namespaces different", - PreCreateHealthCheck: true, - InitialState: api.HealthCritical, - Pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: testAlternateNamespace, - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - annotationConsulNamespace: testNamespace, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - Expected: &api.AgentCheck{ - CheckID: testAlternateNamespacedHealthCheckID, - Status: api.HealthPassing, - Output: testCheckNotesPassing, - Type: ttl, - Name: name, - Namespace: testNamespace, - }, - }, - } - for _, tt := range cases { - t.Run(tt.Name, func(t *testing.T) { - require := require.New(t) - // Get a server, client, and handler. - server, client, resource := testServerAgentResourceAndControllerWithConsulNS(t, tt.Pod, testNamespace) - defer server.Stop() - // Create the namespace in Consul. - _, _, err := client.Namespaces().Create(&api.Namespace{Name: testNamespace}, nil) - require.NoError(err) - - // Register the service with Consul. - err = client.Agent().ServiceRegister(&api.AgentServiceRegistration{ - ID: testServiceNameReg, - Name: testServiceNameAnnotation, - Namespace: testNamespace, - }) - require.NoError(err) - if tt.PreCreateHealthCheck { - // Register the health check if this is not an object create path. - registerHealthCheck(t, client, tt.InitialState) - } - // Upsert and Reconcile both use reconcilePod to reconcile a pod. - err = resource.reconcilePod(tt.Pod) - require.NoError(err) - // Get the agent checks if they were registered. - actual := getConsulAgentChecks(t, client, tt.Expected.CheckID) - require.True(cmp.Equal(actual, tt.Expected, cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFieldsEnterprise...))) - }) - } -} diff --git a/connect-inject/health_check_resource_test.go b/connect-inject/health_check_resource_test.go deleted file mode 100644 index 97cdd41aba..0000000000 --- a/connect-inject/health_check_resource_test.go +++ /dev/null @@ -1,813 +0,0 @@ -package connectinject - -import ( - "context" - "fmt" - "net/url" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/freeport" - "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/consul/sdk/testutil/retry" - "github.com/hashicorp/go-hclog" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" -) - -const ( - testPodName = "test-pod" - testServiceNameAnnotation = "test-service" - testServiceNameReg = "test-pod-test-service" - testHealthCheckID = "default/test-pod-test-service/kubernetes-health-check" - testFailureMessage = "Kubernetes pod readiness probe failed" - testCheckNotesPassing = "Kubernetes health checks passing" - ttl = "ttl" - name = "Kubernetes Health Check" -) - -// Used by gocmp. -var ignoredFields = []string{"Node", "Namespace", "Definition", "ServiceID", "ServiceName"} - -var testPodSpec = corev1.PodSpec{ - Containers: []corev1.Container{ - corev1.Container{ - Name: testPodName, - }, - }, -} - -var completedInjectInitContainer = []corev1.ContainerStatus{ - { - Name: InjectInitContainerName, - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - Reason: "Completed", - }, - }, - Ready: true, - }, -} - -func registerHealthCheck(t *testing.T, client *api.Client, initialState string) { - require := require.New(t) - err := client.Agent().CheckRegister(&api.AgentCheckRegistration{ - Name: "Kubernetes Health Check", - ID: testHealthCheckID, - ServiceID: testServiceNameReg, - Notes: "", - AgentServiceCheck: api.AgentServiceCheck{ - TTL: "100000h", - Status: initialState, - Notes: "", - }, - }) - require.NoError(err) -} - -// We expect to already be pointed at the correct agent. -func getConsulAgentChecks(t *testing.T, client *api.Client, healthCheckID string) *api.AgentCheck { - require := require.New(t) - filter := fmt.Sprintf("CheckID == `%s`", healthCheckID) - checks, err := client.Agent().ChecksWithFilter(filter) - require.NoError(err) - return checks[healthCheckID] -} - -func TestReconcilePod(t *testing.T) { - t.Parallel() - cases := []struct { - Name string - PreCreateHealthCheck bool - InitialState string - Pod *corev1.Pod - Expected *api.AgentCheck - Err string - }{ - { - "inject init container has completed but containers not yet running", - false, - api.HealthPassing, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - Phase: corev1.PodPending, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthCritical, - Notes: "", - Output: "Pod is pending", - Type: ttl, - Name: name, - }, - "", - }, - { - "reconcilePod will create check and set passed", - false, - api.HealthPassing, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthPassing, - Notes: "", - Output: kubernetesSuccessReasonMsg, - Type: ttl, - Name: name, - }, - "", - }, - { - "reconcilePod will create check and set failed", - false, - api.HealthPassing, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - Message: testFailureMessage, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthCritical, - Notes: "", - Output: testFailureMessage, - Type: ttl, - Name: name, - }, - "", - }, - { - "precreate a passing pod and change to failed", - true, - api.HealthPassing, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - Message: testFailureMessage, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthCritical, - Output: testFailureMessage, - Type: ttl, - Name: name, - }, - "", - }, - { - "precreate failed pod and change to passing", - true, - api.HealthCritical, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthPassing, - Output: testCheckNotesPassing, - Type: ttl, - Name: name, - }, - "", - }, - { - "precreate failed check, no pod changes results in no healthcheck changes", - true, - api.HealthCritical, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }}, - }, - }, - &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthCritical, - Output: "", // when there is no change in status, Consul doesnt set the Output field - Type: ttl, - Name: name, - }, - "", - }, - { - "PodRunning no annotations will be ignored for processing", - false, - "", - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - }, - nil, - "", - }, - { - "PodRunning no Ready Status will be ignored for processing", - false, - "", - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - }, - }, - nil, - "", - }, - } - for _, tt := range cases { - t.Run(tt.Name, func(t *testing.T) { - var err error - require := require.New(t) - // Get a server, client, and handler. - server, client, resource := testServerAgentResourceAndController(t, tt.Pod) - defer server.Stop() - server.WaitForLeader(t) - // Register the service with Consul. - server.AddService(t, testServiceNameReg, api.HealthPassing, nil) - if tt.PreCreateHealthCheck { - // Register the health check if this is not an object create path. - registerHealthCheck(t, client, tt.InitialState) - } - // Upsert and Reconcile both use reconcilePod to reconcile a pod. - err = resource.reconcilePod(tt.Pod) - // If we're expecting any error from reconcilePod. - if tt.Err != "" { - // used in the cases where we're expecting an error from - // the controller/handler, in which case do not check agent - // checks as they're not relevant/created. - require.Error(err, tt.Err) - return - } - require.NoError(err) - // Get the agent checks if they were registered. - actual := getConsulAgentChecks(t, client, testHealthCheckID) - - cmpOpts := cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...) - require.True(cmp.Equal(actual, tt.Expected, cmpOpts), - cmp.Diff(actual, tt.Expected, cmpOpts)) - require.True(cmp.Equal(actual, tt.Expected, cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) - }) - } -} - -// Test that when we call upsert and the service hasn't been registered -// in Consul yet, we don't return an error. -func TestUpsert_PodWithNoService(t *testing.T) { - t.Parallel() - require := require.New(t) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - } - server, _, resource := testServerAgentResourceAndController(t, pod) - defer server.Stop() - err := resource.Upsert("", pod) - require.Nil(err) -} - -func TestReconcile_IgnorePodsWithoutInjectLabel(t *testing.T) { - t.Parallel() - require := require.New(t) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }}, - }, - } - server, client, resource := testServerAgentResourceAndController(t, pod) - defer server.Stop() - // Start the reconciler, it should not create a health check. - resource.reconcile() - actual := getConsulAgentChecks(t, client, testHealthCheckID) - require.Nil(actual) -} - -// Test pod statuses that the reconciler should ignore. -// These test cases are based on actual observed startup and termination phases. -func TestReconcile_IgnoreStatuses(t *testing.T) { - t.Parallel() - cases := map[string]corev1.PodStatus{ - "not scheduled": { - Phase: corev1.PodPending, - }, - "scheduled and pending": { - Phase: corev1.PodPending, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodScheduled, - Status: corev1.ConditionTrue, - }, - }, - }, - "inject init container initializing": { - Phase: corev1.PodPending, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodScheduled, - Status: corev1.ConditionTrue, - }, - { - Type: corev1.PodInitialized, - Status: corev1.ConditionFalse, - }, - { - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }, - { - Type: corev1.ContainersReady, - Status: corev1.ConditionFalse, - }, - }, - InitContainerStatuses: []corev1.ContainerStatus{ - { - Name: InjectInitContainerName, - State: corev1.ContainerState{ - Waiting: &corev1.ContainerStateWaiting{ - Reason: "Initializing", - }, - }, - Ready: false, - }, - }, - ContainerStatuses: unreadyAppContainers, - }, - "inject init container running (but not terminated)": { - Phase: corev1.PodPending, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodScheduled, - Status: corev1.ConditionTrue, - }, - { - Type: corev1.PodInitialized, - Status: corev1.ConditionFalse, - }, - { - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }, - { - Type: corev1.ContainersReady, - Status: corev1.ConditionFalse, - }, - }, - InitContainerStatuses: []corev1.ContainerStatus{ - { - Name: InjectInitContainerName, - State: corev1.ContainerState{ - Waiting: nil, - Running: &corev1.ContainerStateRunning{StartedAt: metav1.Now()}, - }, - Ready: false, - }, - }, - ContainerStatuses: unreadyAppContainers, - }, - "pod is terminating": { - Phase: corev1.PodRunning, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodScheduled, - Status: corev1.ConditionTrue, - }, - { - Type: corev1.PodInitialized, - Status: corev1.ConditionTrue, - }, - { - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - }, - { - Type: corev1.ContainersReady, - Status: corev1.ConditionFalse, - }, - }, - InitContainerStatuses: []corev1.ContainerStatus{ - { - Name: InjectInitContainerName, - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{StartedAt: metav1.Now()}, - }, - Ready: true, - }, - }, - ContainerStatuses: []corev1.ContainerStatus{ - { - Name: "envoy-sidecar", - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - ExitCode: 0, - Reason: "Completed", - StartedAt: metav1.Time{}, - FinishedAt: metav1.Time{}, - }, - }, - Ready: false, - }, - { - Name: "consul-sidecar", - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - ExitCode: 2, - Reason: "Error", - StartedAt: metav1.Time{}, - FinishedAt: metav1.Time{}, - }, - }, - Ready: false, - }, - { - Name: "app", - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - ExitCode: 137, - Reason: "Error", - StartedAt: metav1.Time{}, - FinishedAt: metav1.Time{}, - }, - }, - Ready: false, - }, - }, - }, - } - for name, podStatus := range cases { - t.Run(name, func(t *testing.T) { - require := require.New(t) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: podStatus, - } - server, _, resource := testServerAgentResourceAndController(t, pod) - defer server.Stop() - - // We would expect an error if the reconciler actually tried to - // register a health check because the underlying service hasn't - // been created. - require.NoError(resource.reconcilePod(pod)) - }) - } -} - -// Test that stopch works for Reconciler. -func TestReconcilerShutdown(t *testing.T) { - t.Parallel() - require := require.New(t) - k8sclientset := fake.NewSimpleClientset() - healthResource := HealthCheckResource{ - Log: hclog.Default().Named("healthCheckResource"), - KubernetesClientset: k8sclientset, - ReconcilePeriod: 5 * time.Second, - } - - reconcilerRunningCtx := make(chan struct{}) - reconcilerShutdownSuccess := make(chan bool) - go func() { - // Starting the reconciler. - healthResource.Run(reconcilerRunningCtx) - close(reconcilerShutdownSuccess) - }() - // Trigger shutdown of the reconciler. - close(reconcilerRunningCtx) - - select { - case <-reconcilerShutdownSuccess: - // The function is expected to exit gracefully so no assertion needed. - return - case <-time.After(time.Second * 1): - // Fail if the stopCh was not caught. - require.Fail("timeout waiting for reconciler to shutdown") - } -} - -// Test that if the agent is unavailable reconcile will fail on the pod -// and once the agent becomes available reconcile will correctly -// update the checks after its loop timer passes. -func TestReconcileRun(t *testing.T) { - t.Parallel() - var err error - require := require.New(t) - - // Start the clientset with a Pod that is failed. - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: testPodName, - Namespace: "default", - Labels: map[string]string{labelInject: "true"}, - Annotations: map[string]string{ - annotationStatus: injected, - annotationService: testServiceNameAnnotation, - }, - }, - Spec: testPodSpec, - Status: corev1.PodStatus{ - HostIP: "127.0.0.1", - Phase: corev1.PodRunning, - InitContainerStatuses: completedInjectInitContainer, - Conditions: []corev1.PodCondition{{ - Type: corev1.PodReady, - Status: corev1.ConditionFalse, - Message: testFailureMessage, - }}, - }, - } - k8sclientset := fake.NewSimpleClientset(pod) - randomPorts := freeport.MustTake(6) - schema := "http://" - serverAddress := fmt.Sprintf("%s%s:%d", schema, "127.0.0.1", randomPorts[1]) - - // Setup consul client connection. - clientConfig := &api.Config{Address: serverAddress} - require.NoError(err) - client, err := api.NewClient(clientConfig) - require.NoError(err) - consulURL, err := url.Parse(serverAddress) - require.NoError(err) - - healthResource := HealthCheckResource{ - Log: hclog.Default().Named("healthCheckResource"), - KubernetesClientset: k8sclientset, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - ReconcilePeriod: 100 * time.Millisecond, - } - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - - // Start the reconciler. - go func() { - healthResource.Run(ctx.Done()) - }() - // Let reconcile run at least once. - time.Sleep(time.Millisecond * 300) - - var srv *testutil.TestServer - srv, err = testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { - c.Ports = &testutil.TestPortConfig{ - DNS: randomPorts[0], - HTTP: randomPorts[1], - HTTPS: randomPorts[2], - SerfLan: randomPorts[3], - SerfWan: randomPorts[4], - Server: randomPorts[5], - } - }) - require.NoError(err) - // Validate that there is no health check created by reconciler. - check := getConsulAgentChecks(t, client, testHealthCheckID) - require.Nil(check) - // Add the service - only now will a health check have a service to register against. - srv.AddService(t, testServiceNameReg, api.HealthPassing, nil) - - // Retry so we can cover time period when reconciler is already running vs - // when it will run next based on the loop. - timer := &retry.Timer{Timeout: 5 * time.Second, Wait: 1 * time.Second} - var actual *api.AgentCheck - retry.RunWith(timer, t, func(r *retry.R) { - actual = getConsulAgentChecks(t, client, testHealthCheckID) - // The assertion is not on actual != nil, but below - // against an expected check. - if actual == nil || actual.Output == "" { - r.Error("check = nil") - } - }) - - expectedCheck := &api.AgentCheck{ - CheckID: testHealthCheckID, - Status: api.HealthCritical, - Output: testFailureMessage, - Type: ttl, - Name: name, - } - // Validate the checks are set. - require.True(cmp.Equal(actual, expectedCheck, cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) -} - -func testServerAgentResourceAndController(t *testing.T, pod *corev1.Pod) (*testutil.TestServer, *api.Client, *HealthCheckResource) { - return testServerAgentResourceAndControllerWithConsulNS(t, pod, "") -} - -func testServerAgentResourceAndControllerWithConsulNS(t *testing.T, pod *corev1.Pod, consulNS string) (*testutil.TestServer, *api.Client, *HealthCheckResource) { - require := require.New(t) - // Setup server & client. - s, err := testutil.NewTestServerConfigT(t, nil) - require.NoError(err) - - clientConfig := &api.Config{Address: s.HTTPAddr, Namespace: consulNS} - client, err := api.NewClient(clientConfig) - require.NoError(err) - - schema := "http://" - consulURL, err := url.Parse(schema + s.HTTPAddr) - require.NoError(err) - - healthResource := HealthCheckResource{ - Log: hclog.Default().Named("healthCheckResource"), - KubernetesClientset: fake.NewSimpleClientset(pod), - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - ReconcilePeriod: 0, - } - return s, client, &healthResource -} - -// unreadyAppContainers are the container statuses of an example connect pod's -// non-init containers when init containers are still running. -var unreadyAppContainers = []corev1.ContainerStatus{ - { - Name: "envoy-sidecar", - State: corev1.ContainerState{ - Waiting: &corev1.ContainerStateWaiting{ - Reason: "PodInitializing", - }, - }, - Ready: false, - }, - { - Name: "consul-sidecar", - State: corev1.ContainerState{ - Waiting: &corev1.ContainerStateWaiting{ - Reason: "PodInitializing", - }, - }, - Ready: false, - }, - { - Name: "app", - State: corev1.ContainerState{ - Waiting: &corev1.ContainerStateWaiting{ - Reason: "PodInitializing", - }, - }, - Ready: false, - }, -} diff --git a/subcommand/inject-connect/command.go b/subcommand/inject-connect/command.go index 9610fce4ac..2ba64c8d77 100644 --- a/subcommand/inject-connect/command.go +++ b/subcommand/inject-connect/command.go @@ -64,10 +64,6 @@ type Command struct { flagK8SNSMirroringPrefix string // Prefix added to Consul namespaces created when mirroring flagCrossNamespaceACLPolicy string // The name of the ACL policy to add to every created namespace if ACLs are enabled - // Flags to enable connect-inject health checks. - flagEnableHealthChecks bool // Start the health check controller. - flagHealthChecksReconcilePeriod time.Duration // Period for health check reconcile. - // Flags for cleanup controller. flagEnableCleanupController bool // Start the cleanup controller. flagCleanupControllerReconcilePeriod time.Duration // Period for cleanup controller reconcile. @@ -150,9 +146,6 @@ func (c *Command) init() { "K8s namespaces to explicitly allow. May be specified multiple times.") c.flagSet.Var((*flags.AppendSliceValue)(&c.flagDenyK8sNamespacesList), "deny-k8s-namespace", "K8s namespaces to explicitly deny. Takes precedence over allow. May be specified multiple times.") - c.flagSet.BoolVar(&c.flagEnableHealthChecks, "enable-health-checks-controller", false, - "Enables health checks controller.") - c.flagSet.DurationVar(&c.flagHealthChecksReconcilePeriod, "health-checks-reconcile-period", 1*time.Minute, "Reconcile period for health checks controller.") c.flagSet.BoolVar(&c.flagEnableCleanupController, "enable-cleanup-controller", true, "Enables cleanup controller that cleans up stale Consul service instances.") c.flagSet.DurationVar(&c.flagCleanupControllerReconcilePeriod, "cleanup-controller-reconcile-period", 5*time.Minute, "Reconcile period for cleanup controller.") @@ -462,20 +455,11 @@ func (c *Command) Run(args []string) int { Log: logger.Named("handler"), }}) - // todo: Add tests in case it's not refactored to not have any signal handling - // (In the future, we plan to only have the manager and rely on it to do signal handling for us). - go func() { - // Pass existing context's done channel so that the controller - // will stop when this context is canceled. - // This could be due to an interrupt signal or if any other component did not start - // successfully. In those cases, we want to make sure that this controller is no longer - // running. - if err := mgr.Start(ctx); err != nil { - setupLog.Error(err, "problem running manager") - // Use an existing channel for ctrl exists in case manager failed to start properly. - ctrlExitCh <- fmt.Errorf("endpoints controller exited unexpectedly") - } - }() + if err := mgr.Start(ctx); err != nil { + setupLog.Error(err, "problem running manager") + // Use an existing channel for ctrl exists in case manager failed to start properly. + ctrlExitCh <- fmt.Errorf("endpoints controller exited unexpectedly") + } } if c.flagEnableCleanupController { @@ -501,34 +485,6 @@ func (c *Command) Run(args []string) int { }() } - if c.flagEnableHealthChecks { - healthResource := connectinject.HealthCheckResource{ - Log: logger.Named("healthCheckResource"), - KubernetesClientset: c.clientset, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - Ctx: ctx, - ReconcilePeriod: c.flagHealthChecksReconcilePeriod, - } - - healthChecksCtrl := &controller.Controller{ - Log: logger.Named("healthCheckController"), - Resource: &healthResource, - } - - // Start the health check controller, reconcile is started at the same time - // and new events will queue in the informer. - go func() { - healthChecksCtrl.Run(ctx.Done()) - // If ctl.Run() exits before ctx is cancelled, then our health checks - // controller isn't running. In that case we need to shutdown since - // this is unrecoverable. - if ctx.Err() == nil { - ctrlExitCh <- fmt.Errorf("health checks controller exited unexpectedly") - } - }() - } - // Block until we get a signal or something errors. select { case sig := <-c.sigCh: diff --git a/subcommand/inject-connect/command_test.go b/subcommand/inject-connect/command_test.go index dc9c0a0e3c..c775a713d2 100644 --- a/subcommand/inject-connect/command_test.go +++ b/subcommand/inject-connect/command_test.go @@ -155,14 +155,12 @@ func TestRun_FlagValidation(t *testing.T) { }, { flags: []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", - "-enable-health-checks-controller=true", "-http-addr=http://0.0.0.0:9999", "-listen", "999999"}, expErr: "missing port in address: 999999", }, { flags: []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", - "-enable-health-checks-controller=true", "-http-addr=http://0.0.0.0:9999", "-listen", ":foobar"}, expErr: "unable to parse port string: strconv.Atoi: parsing \"foobar\": invalid syntax", @@ -208,8 +206,7 @@ func TestRun_ValidationConsulHTTPAddr(t *testing.T) { UI: ui, clientset: k8sClient, } - flags := []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", - "-enable-health-checks-controller=true"} + flags := []string{"-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0"} os.Setenv(api.HTTPAddrEnvName, "%") code := cmd.Run(flags) @@ -219,10 +216,10 @@ func TestRun_ValidationConsulHTTPAddr(t *testing.T) { require.Contains(t, ui.ErrorWriter.String(), "error parsing consul address \"http://%\": parse \"http://%\": invalid URL escape \"%") } -// Test that when healthchecks are enabled that SIGINT/SIGTERM exits the +// Test that when cleanup controller is enabled that SIGINT/SIGTERM exits the // command cleanly. func TestRun_CommandExitsCleanlyAfterSignal(t *testing.T) { - // TODO: fix this skip + // TODO: This test will be removed when the cleanupController is removed. t.Skip("This test will be rewritten when the manager handles all signal handling") t.Run("SIGINT", testSignalHandling(syscall.SIGINT)) t.Run("SIGTERM", testSignalHandling(syscall.SIGTERM)) @@ -245,7 +242,6 @@ func testSignalHandling(sig os.Signal) func(*testing.T) { // Start the command asynchronously and then we'll send an interrupt. exitChan := runCommandAsynchronously(&cmd, []string{ "-consul-k8s-image", "hashicorp/consul-k8s", "-consul-image", "foo", "-envoy-image", "envoy:1.16.0", - "-enable-health-checks-controller=true", "-listen", fmt.Sprintf(":%d", ports[0]), }) diff --git a/subcommand/server-acl-init/command.go b/subcommand/server-acl-init/command.go index d389a26d4f..373c2bb29d 100644 --- a/subcommand/server-acl-init/command.go +++ b/subcommand/server-acl-init/command.go @@ -80,9 +80,6 @@ type Command struct { // Flag to support a custom bootstrap token flagBootstrapTokenFile string - // Flag to indicate that the health checks controller is enabled. - flagEnableHealthChecks bool - flagEnableCleanupController bool flagLogLevel string @@ -133,7 +130,7 @@ func (c *Command) init() { c.flags.BoolVar(&c.flagCreateInjectToken, "create-inject-auth-method", false, "Toggle for creating a connect inject auth method. Deprecated: use -create-inject-token instead.") c.flags.BoolVar(&c.flagCreateInjectToken, "create-inject-token", false, - "Toggle for creating a connect inject auth method and an ACL token. The ACL token will only be created if either of the -enable-namespaces or -enable-health-checks flags is set.") + "Toggle for creating a connect inject auth method and an ACL token.") c.flags.StringVar(&c.flagInjectAuthMethodHost, "inject-auth-method-host", "", "Kubernetes Host config parameter for the auth method."+ "If not provided, the default cluster Kubernetes service will be used.") @@ -197,9 +194,6 @@ func (c *Command) init() { "Path to file containing ACL token for creating policies and tokens. This token must have 'acl:write' permissions."+ "When provided, servers will not be bootstrapped and their policies and tokens will not be updated.") - c.flags.BoolVar(&c.flagEnableHealthChecks, "enable-health-checks", false, - "Toggle for adding ACL rules for the health check controller to the connect ACL token. Requires -create-inject-token to be also be set.") - c.flags.BoolVar(&c.flagEnableCleanupController, "enable-cleanup-controller", true, "Toggle for adding ACL rules for the cleanup controller to the connect ACL token. Requires -create-inject-token to be also be set.") @@ -484,27 +478,24 @@ func (c *Command) Run(args []string) int { return 1 } - // If health checks or namespaces are enabled, - // then the connect injector needs an ACL token. - if c.flagEnableNamespaces || c.flagEnableHealthChecks || c.flagEnableCleanupController { - injectRules, err := c.injectRules() - if err != nil { - c.log.Error("Error templating inject rules", "err", err) - return 1 - } + // The endpoints controller needs an ACL token always. + injectRules, err := c.injectRules() + if err != nil { + c.log.Error("Error templating inject rules", "err", err) + return 1 + } - // If namespaces are enabled, the policy and token need to be global - // to be allowed to create namespaces. - if c.flagEnableNamespaces { - err = c.createGlobalACL("connect-inject", injectRules, consulDC, consulClient) - } else { - err = c.createLocalACL("connect-inject", injectRules, consulDC, consulClient) - } + // If namespaces are enabled, the policy and token need to be global + // to be allowed to create namespaces. + if c.flagEnableNamespaces { + err = c.createGlobalACL("connect-inject", injectRules, consulDC, consulClient) + } else { + err = c.createLocalACL("connect-inject", injectRules, consulDC, consulClient) + } - if err != nil { - c.log.Error(err.Error()) - return 1 - } + if err != nil { + c.log.Error(err.Error()) + return 1 } } diff --git a/subcommand/server-acl-init/command_ent_test.go b/subcommand/server-acl-init/command_ent_test.go index c01d1ba2bd..57270644b6 100644 --- a/subcommand/server-acl-init/command_ent_test.go +++ b/subcommand/server-acl-init/command_ent_test.go @@ -679,8 +679,8 @@ func TestRun_TokensWithNamespacesEnabled(t *testing.T) { SecretNames: []string{resourcePrefix + "-connect-inject-acl-token"}, LocalToken: false, }, - "inject token with health checks and namespaces": { - TokenFlags: []string{"-create-inject-token", "-enable-namespaces", "-enable-health-checks"}, + "inject token and namespaces": { + TokenFlags: []string{"-create-inject-token", "-enable-namespaces"}, PolicyNames: []string{"connect-inject-token"}, PolicyDCs: nil, SecretNames: []string{resourcePrefix + "-connect-inject-acl-token"}, diff --git a/subcommand/server-acl-init/command_test.go b/subcommand/server-acl-init/command_test.go index f06c7d547a..464b268aaf 100644 --- a/subcommand/server-acl-init/command_test.go +++ b/subcommand/server-acl-init/command_test.go @@ -238,8 +238,8 @@ func TestRun_TokensPrimaryDC(t *testing.T) { LocalToken: false, }, { - TestName: "Health Checks ACL token", - TokenFlags: []string{"-create-inject-token", "-enable-health-checks"}, + TestName: "Endpoints Controller ACL token", + TokenFlags: []string{"-create-inject-token"}, PolicyNames: []string{"connect-inject-token"}, PolicyDCs: []string{"dc1"}, SecretNames: []string{resourcePrefix + "-connect-inject-acl-token"}, @@ -401,8 +401,8 @@ func TestRun_TokensReplicatedDC(t *testing.T) { LocalToken: true, }, { - TestName: "Health Checks ACL token", - TokenFlags: []string{"-create-inject-token", "-enable-health-checks"}, + TestName: "Endpoints controller ACL token", + TokenFlags: []string{"-create-inject-token"}, PolicyNames: []string{"connect-inject-token-dc2"}, PolicyDCs: []string{"dc2"}, SecretNames: []string{resourcePrefix + "-connect-inject-acl-token"}, @@ -493,6 +493,12 @@ func TestRun_TokensWithProvidedBootstrapToken(t *testing.T) { PolicyNames: []string{"client-token"}, SecretNames: []string{resourcePrefix + "-client-acl-token"}, }, + { + TestName: "Endpoints controller ACL token", + TokenFlags: []string{"-create-inject-token"}, + PolicyNames: []string{"connect-inject-token"}, + SecretNames: []string{resourcePrefix + "-connect-inject-acl-token"}, + }, { TestName: "Sync token", TokenFlags: []string{"-create-sync-token"}, @@ -547,12 +553,6 @@ func TestRun_TokensWithProvidedBootstrapToken(t *testing.T) { PolicyNames: []string{"acl-replication-token"}, SecretNames: []string{resourcePrefix + "-acl-replication-acl-token"}, }, - { - TestName: "Health Checks ACL token", - TokenFlags: []string{"-create-inject-token", "-enable-health-checks"}, - PolicyNames: []string{"connect-inject-token"}, - SecretNames: []string{resourcePrefix + "-connect-inject-acl-token"}, - }, { TestName: "Cleanup controller ACL token", TokenFlags: []string{"-create-inject-token", "-enable-cleanup-controller"}, diff --git a/subcommand/server-acl-init/rules.go b/subcommand/server-acl-init/rules.go index 4ea36f6a59..07e6594a35 100644 --- a/subcommand/server-acl-init/rules.go +++ b/subcommand/server-acl-init/rules.go @@ -15,7 +15,6 @@ type rulesData struct { InjectEnableNSMirroring bool InjectNSMirroringPrefix string SyncConsulNodeName string - EnableHealthChecks bool EnableCleanupController bool } @@ -207,14 +206,13 @@ namespace "{{ .SyncConsulDestNS }}" { func (c *Command) injectRules() (string, error) { // The Connect injector needs permissions to create namespaces when namespaces are enabled. - // If health checks are enabled it must also create/update service checks. + // It must also create/update service health checks via the endpoints controller. // If the cleanup controller is enabled, it must be able to delete service // instances from every client. injectRulesTpl := ` {{- if .EnableNamespaces }} operator = "write" {{- end }} -{{- if (or .EnableHealthChecks .EnableCleanupController) }} node_prefix "" { policy = "write" } @@ -226,7 +224,6 @@ namespace_prefix "" { } {{- if .EnableNamespaces }} } -{{- end }} {{- end }}` return c.renderRules(injectRulesTpl) } @@ -293,7 +290,6 @@ func (c *Command) rulesData() rulesData { InjectEnableNSMirroring: c.flagEnableInjectK8SNSMirroring, InjectNSMirroringPrefix: c.flagInjectK8SNSMirroringPrefix, SyncConsulNodeName: c.flagSyncConsulNodeName, - EnableHealthChecks: c.flagEnableHealthChecks, EnableCleanupController: c.flagEnableCleanupController, } } diff --git a/subcommand/server-acl-init/rules_test.go b/subcommand/server-acl-init/rules_test.go index 434c324ff2..ddfa51cc9a 100644 --- a/subcommand/server-acl-init/rules_test.go +++ b/subcommand/server-acl-init/rules_test.go @@ -494,30 +494,15 @@ namespace_prefix "prefix-" { } } -// There are three true/false settings so 8 permutations to test. +// Test the inject rules through the 4 permutations of Namespaces/controller enabled or disabled. func TestInjectRules(t *testing.T) { cases := []struct { EnableNamespaces bool - EnableHealthChecks bool EnableCleanupController bool Expected string }{ { EnableNamespaces: false, - EnableHealthChecks: false, - EnableCleanupController: false, - Expected: "", - }, - { - EnableNamespaces: true, - EnableHealthChecks: false, - EnableCleanupController: false, - Expected: ` -operator = "write"`, - }, - { - EnableNamespaces: false, - EnableHealthChecks: true, EnableCleanupController: false, Expected: ` node_prefix "" { @@ -529,7 +514,6 @@ node_prefix "" { }, { EnableNamespaces: false, - EnableHealthChecks: false, EnableCleanupController: true, Expected: ` node_prefix "" { @@ -541,7 +525,6 @@ node_prefix "" { }, { EnableNamespaces: true, - EnableHealthChecks: true, EnableCleanupController: false, Expected: ` operator = "write" @@ -556,22 +539,6 @@ namespace_prefix "" { }, { EnableNamespaces: true, - EnableHealthChecks: false, - EnableCleanupController: true, - Expected: ` -operator = "write" -node_prefix "" { - policy = "write" -} -namespace_prefix "" { - service_prefix "" { - policy = "write" - } -}`, - }, - { - EnableNamespaces: true, - EnableHealthChecks: true, EnableCleanupController: true, Expected: ` operator = "write" @@ -584,29 +551,16 @@ namespace_prefix "" { } }`, }, - { - EnableNamespaces: false, - EnableHealthChecks: true, - EnableCleanupController: true, - Expected: ` -node_prefix "" { - policy = "write" -} - service_prefix "" { - policy = "write" - }`, - }, } for _, tt := range cases { - caseName := fmt.Sprintf("ns=%t health=%t cleanup=%t", - tt.EnableNamespaces, tt.EnableHealthChecks, tt.EnableCleanupController) + caseName := fmt.Sprintf("ns=%t cleanup=%t", + tt.EnableNamespaces, tt.EnableCleanupController) t.Run(caseName, func(t *testing.T) { require := require.New(t) cmd := Command{ flagEnableNamespaces: tt.EnableNamespaces, - flagEnableHealthChecks: tt.EnableHealthChecks, flagEnableCleanupController: tt.EnableCleanupController, }