diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ecbe9ed66..08aaf16bbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ IMPROVEMENTS: * Add a `global.extraLabels` stanza to allow setting global Kubernetes labels for all components deployed by the `consul-k8s` Helm chart. [[GH-1778](https://github.com/hashicorp/consul-k8s/pull/1778)] * Control-Plane * Add support for the annotation `consul.hashicorp.com/use-proxy-health-check`. [[GH-1824](https://github.com/hashicorp/consul-k8s/pull/1824)] + * Add health check for synced services based on the status of the Kubernetes readiness probe on synced pod. [[GH-1821](https://github.com/hashicorp/consul-k8s/pull/1821)] BUG FIXES: * Control Plane diff --git a/acceptance/tests/sync/sync_catalog_test.go b/acceptance/tests/sync/sync_catalog_test.go index 942843d53f..b43ef66099 100644 --- a/acceptance/tests/sync/sync_catalog_test.go +++ b/acceptance/tests/sync/sync_catalog_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul-k8s/acceptance/framework/helpers" "github.com/hashicorp/consul-k8s/acceptance/framework/k8s" "github.com/hashicorp/consul-k8s/acceptance/framework/logger" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/stretchr/testify/require" ) @@ -84,8 +85,13 @@ func TestSyncCatalog(t *testing.T) { service, _, err := consulClient.Catalog().Service(syncedServiceName, "", nil) require.NoError(t, err) - require.Equal(t, 1, len(service)) + require.Len(t, service, 1) require.Equal(t, []string{"k8s"}, service[0].ServiceTags) + filter := fmt.Sprintf("ServiceID == %q", service[0].ServiceID) + healthChecks, _, err := consulClient.Health().Checks(syncedServiceName, &api.QueryOptions{Filter: filter}) + require.NoError(t, err) + require.Len(t, healthChecks, 1) + require.Equal(t, api.HealthPassing, healthChecks[0].Status) }) } } diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index 239e2f5db9..a8ba6d20bd 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -13,7 +13,7 @@ import ( "github.com/hashicorp/consul-k8s/control-plane/namespaces" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" - apiv1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -33,6 +33,12 @@ const ( ConsulK8SRefKind = "external-k8s-ref-kind" ConsulK8SRefValue = "external-k8s-ref-name" ConsulK8SNodeName = "external-k8s-node-name" + + // consulKubernetesCheckType is the type of health check in Consul for Kubernetes readiness status. + consulKubernetesCheckType = "kubernetes-readiness" + // consulKubernetesCheckName is the name of health check in Consul for Kubernetes readiness status. + consulKubernetesCheckName = "Kubernetes Readiness Check" + kubernetesSuccessReasonMsg = "Kubernetes health checks passing" ) type NodePortSyncType string @@ -131,11 +137,11 @@ type ServiceResource struct { // serviceMap holds services we should sync to Consul. Keys are the // in the form /. - serviceMap map[string]*apiv1.Service + serviceMap map[string]*corev1.Service // endpointsMap uses the same keys as serviceMap but maps to the endpoints // of each service. - endpointsMap map[string]*apiv1.Endpoints + endpointsMap map[string]*corev1.Endpoints // consulMap holds the services in Consul that we've registered from kube. // It's populated via Consul's API and lets us diff what is actually in @@ -157,7 +163,7 @@ func (t *ServiceResource) Informer() cache.SharedIndexInformer { return t.Client.CoreV1().Services(metav1.NamespaceAll).Watch(t.Ctx, options) }, }, - &apiv1.Service{}, + &corev1.Service{}, 0, cache.Indexers{}, ) @@ -166,7 +172,7 @@ func (t *ServiceResource) Informer() cache.SharedIndexInformer { // Upsert implements the controller.Resource interface. func (t *ServiceResource) Upsert(key string, raw interface{}) error { // We expect a Service. If it isn't a service then just ignore it. - service, ok := raw.(*apiv1.Service) + service, ok := raw.(*corev1.Service) if !ok { t.Log.Warn("upsert got invalid type", "raw", raw) return nil @@ -176,7 +182,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { defer t.serviceLock.Unlock() if t.serviceMap == nil { - t.serviceMap = make(map[string]*apiv1.Service) + t.serviceMap = make(map[string]*corev1.Service) } if !t.shouldSync(service) { @@ -205,7 +211,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { "err", err) } else { if t.endpointsMap == nil { - t.endpointsMap = make(map[string]*apiv1.Endpoints) + t.endpointsMap = make(map[string]*corev1.Endpoints) } t.endpointsMap[key] = endpoints t.Log.Debug("[ServiceResource.Upsert] adding service's endpoints to endpointsMap", "key", key, "service", service, "endpoints", endpoints) @@ -254,7 +260,7 @@ func (t *ServiceResource) Run(ch <-chan struct{}) { } // shouldSync returns true if resyncing should be enabled for the given service. -func (t *ServiceResource) shouldSync(svc *apiv1.Service) bool { +func (t *ServiceResource) shouldSync(svc *corev1.Service) bool { // Namespace logic // If in deny list, don't sync if t.DenyK8sNamespacesSet.Contains(svc.Namespace) { @@ -269,7 +275,7 @@ func (t *ServiceResource) shouldSync(svc *apiv1.Service) bool { } // Ignore ClusterIP services if ClusterIP sync is disabled - if svc.Spec.Type == apiv1.ServiceTypeClusterIP && !t.ClusterIPSync { + if svc.Spec.Type == corev1.ServiceTypeClusterIP && !t.ClusterIPSync { t.Log.Debug("[shouldSync] ignoring clusterip service", "svc.Namespace", svc.Namespace, "service", svc) return false } @@ -310,9 +316,9 @@ func (t *ServiceResource) shouldTrackEndpoints(key string) bool { return false } - return svc.Spec.Type == apiv1.ServiceTypeNodePort || - svc.Spec.Type == apiv1.ServiceTypeClusterIP || - (t.LoadBalancerEndpointsSync && svc.Spec.Type == apiv1.ServiceTypeLoadBalancer) + return svc.Spec.Type == corev1.ServiceTypeNodePort || + svc.Spec.Type == corev1.ServiceTypeClusterIP || + (t.LoadBalancerEndpointsSync && svc.Spec.Type == corev1.ServiceTypeLoadBalancer) } // generateRegistrations generates the necessary Consul registrations for @@ -380,7 +386,7 @@ func (t *ServiceResource) generateRegistrations(key string) { var overridePortNumber int if len(svc.Spec.Ports) > 0 { var port int - isNodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort + isNodePort := svc.Spec.Type == corev1.ServiceTypeNodePort // If a specific port is specified, then use that port value portAnnotation, ok := svc.Annotations[annotationServicePort] @@ -479,7 +485,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // each LoadBalancer entry. We only support entries that have an IP // address assigned (not hostnames). // If LoadBalancerEndpointsSync is true sync LB endpoints instead of loadbalancer ingress. - case apiv1.ServiceTypeLoadBalancer: + case corev1.ServiceTypeLoadBalancer: if t.LoadBalancerEndpointsSync { t.registerServiceInstance(baseNode, baseService, key, overridePortName, overridePortNumber, false) } else { @@ -512,7 +518,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // endpoint of the service, which corresponds to the nodes the service's // pods are running on. This way we don't register _every_ K8S // node as part of the service. - case apiv1.ServiceTypeNodePort: + case corev1.ServiceTypeNodePort: if t.endpointsMap == nil { return } @@ -538,11 +544,11 @@ func (t *ServiceResource) generateRegistrations(key string) { } // Set the expected node address type - var expectedType apiv1.NodeAddressType + var expectedType corev1.NodeAddressType if t.NodePortSync == InternalOnly { - expectedType = apiv1.NodeInternalIP + expectedType = corev1.NodeInternalIP } else { - expectedType = apiv1.NodeExternalIP + expectedType = corev1.NodeExternalIP } // Find the ip address for the node and @@ -571,7 +577,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // use an InternalIP if t.NodePortSync == ExternalFirst && !found { for _, address := range node.Status.Addresses { - if address.Type == apiv1.NodeInternalIP { + if address.Type == corev1.NodeInternalIP { r := baseNode rs := baseService r.Service = &rs @@ -593,7 +599,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // For ClusterIP services, we register a service instance // for each endpoint. - case apiv1.ServiceTypeClusterIP: + case corev1.ServiceTypeClusterIP: t.registerServiceInstance(baseNode, baseService, key, overridePortName, overridePortNumber, true) } } @@ -674,6 +680,16 @@ func (t *ServiceResource) registerServiceInstance( r.Service.Meta[ConsulK8SNodeName] = *subsetAddr.NodeName } + r.Check = &consulapi.AgentCheck{ + CheckID: consulHealthCheckID(endpoints.Namespace, serviceID(r.Service.Service, addr)), + Name: consulKubernetesCheckName, + Namespace: baseService.Namespace, + Type: consulKubernetesCheckType, + Status: consulapi.HealthPassing, + ServiceID: serviceID(r.Service.Service, addr), + Output: kubernetesSuccessReasonMsg, + } + t.consulMap[key] = append(t.consulMap[key], &r) } } @@ -723,7 +739,7 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer { Watch(t.Ctx, options) }, }, - &apiv1.Endpoints{}, + &corev1.Endpoints{}, 0, cache.Indexers{}, ) @@ -731,7 +747,7 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer { func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error { svc := t.Service - endpoints, ok := raw.(*apiv1.Endpoints) + endpoints, ok := raw.(*corev1.Endpoints) if !ok { svc.Log.Warn("upsert got invalid type", "raw", raw) return nil @@ -747,7 +763,7 @@ func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error { // We are tracking this service so let's keep track of the endpoints if svc.endpointsMap == nil { - svc.endpointsMap = make(map[string]*apiv1.Endpoints) + svc.endpointsMap = make(map[string]*corev1.Endpoints) } svc.endpointsMap[key] = endpoints @@ -788,3 +804,8 @@ func (t *ServiceResource) addPrefixAndK8SNamespace(name, namespace string) strin return name } + +// consulHealthCheckID deterministically generates a health check ID based on service ID and Kubernetes namespace. +func consulHealthCheckID(k8sNS string, serviceID string) string { + return fmt.Sprintf("%s/%s", k8sNS, serviceID) +} diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index 28335dea27..9ba94123ef 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -6,6 +6,7 @@ import ( mapset "github.com/deckarep/golang-set" "github.com/hashicorp/consul-k8s/control-plane/helper/controller" + consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" @@ -1005,6 +1006,43 @@ func TestServiceResource_clusterIP(t *testing.T) { }) } +// Test that the proper registrations with health checks are generated for a ClusterIP type. +func TestServiceResource_clusterIP_healthCheck(t *testing.T) { + t.Parallel() + client := fake.NewSimpleClientset() + syncer := newTestSyncer() + serviceResource := defaultServiceResource(client, syncer) + serviceResource.ClusterIPSync = true + + // Start the controller + closer := controller.TestControllerRun(&serviceResource) + defer closer() + + // Insert the service + svc := clusterIPService("foo", metav1.NamespaceDefault) + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) + require.NoError(t, err) + + // Insert the endpoints + createEndpoints(t, client, "foo", metav1.NamespaceDefault) + + // Verify what we got + retry.Run(t, func(r *retry.R) { + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(r, actual, 2) + require.Equal(r, consulKubernetesCheckName, actual[0].Check.Name) + require.Equal(r, consulapi.HealthPassing, actual[0].Check.Status) + require.Equal(r, kubernetesSuccessReasonMsg, actual[0].Check.Output) + require.Equal(r, consulKubernetesCheckType, actual[0].Check.Type) + require.Equal(r, consulKubernetesCheckName, actual[1].Check.Name) + require.Equal(r, consulapi.HealthPassing, actual[1].Check.Status) + require.Equal(r, kubernetesSuccessReasonMsg, actual[1].Check.Output) + require.Equal(r, consulKubernetesCheckType, actual[1].Check.Type) + }) +} + // Test clusterIP with prefix. func TestServiceResource_clusterIPPrefix(t *testing.T) { t.Parallel() diff --git a/control-plane/go.mod b/control-plane/go.mod index d3c6106eac..8dd66de28d 100644 --- a/control-plane/go.mod +++ b/control-plane/go.mod @@ -9,13 +9,11 @@ require ( github.com/google/go-cmp v0.5.7 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/hashicorp/consul-k8s/control-plane/cni v0.0.0-20220831174802-b8af65262de8 - github.com/hashicorp/consul-server-connection-manager v0.0.0-20220922180412-01c5be1c636f github.com/hashicorp/consul/api v1.10.1-0.20221005170644-13da2c5fad69 github.com/hashicorp/consul/sdk v0.11.0 github.com/hashicorp/go-discover v0.0.0-20200812215701-c4b85f6ed31f github.com/hashicorp/go-hclog v1.2.2 github.com/hashicorp/go-multierror v1.1.1 - github.com/hashicorp/go-rootcerts v1.0.2 github.com/hashicorp/serf v0.10.1 github.com/kr/text v0.2.0 github.com/miekg/dns v1.1.41 diff --git a/control-plane/go.sum b/control-plane/go.sum index f281ffad23..54a0604111 100644 --- a/control-plane/go.sum +++ b/control-plane/go.sum @@ -344,8 +344,6 @@ github.com/hashicorp/consul-k8s/control-plane/cni v0.0.0-20220831174802-b8af6526 github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/api v1.10.1-0.20221005170644-13da2c5fad69 h1:IALuDSO0f6x0txq/tjUDF3sShyDMT8dmjn9af6Ik8BA= github.com/hashicorp/consul/api v1.10.1-0.20221005170644-13da2c5fad69/go.mod h1:T09kWtKqm8j1S9yTd1r0hVhfOyPrvLb0zb6dPKpNXxQ= -github.com/hashicorp/consul/proto-public v0.1.0 h1:O0LSmCqydZi363hsqc6n2v5sMz3usQMXZF6ziK3SzXU= -github.com/hashicorp/consul/proto-public v0.1.0/go.mod h1:vs2KkuWwtjkIgA5ezp4YKPzQp4GitV+q/+PvksrA92k= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/consul/sdk v0.11.0 h1:HRzj8YSCln2yGgCumN5CL8lYlD3gBurnervJRJAZyC4= github.com/hashicorp/consul/sdk v0.11.0/go.mod h1:yPkX5Q6CsxTFMjQQDJwzeNmUUF5NUGGbrDsv9wTb8cw=