From 49fb715997785be5e35399ed6ca21c15023a31c4 Mon Sep 17 00:00:00 2001 From: DanStough Date: Tue, 12 Mar 2024 16:36:46 -0400 Subject: [PATCH] fix(control-plane): acl tokens deleted while pods in graceful shutdown --- .../endpoints/endpoints_controller.go | 182 +++++++++++++----- .../endpoints/endpoints_controller_test.go | 103 +++++++++- .../inject-connect/v1controllers.go | 1 + 3 files changed, 233 insertions(+), 53 deletions(-) diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go index b0b8bea054..b8d1dd6cf9 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go @@ -10,6 +10,7 @@ import ( "regexp" "strconv" "strings" + "time" mapset "github.com/deckarep/golang-set" "github.com/go-logr/logr" @@ -25,6 +26,7 @@ import ( "github.com/hashicorp/consul-k8s/control-plane/connect-inject/common" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/lifecycle" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics" "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul-k8s/control-plane/helper/parsetags" @@ -94,6 +96,8 @@ type Controller struct { // any created Consul namespaces to allow cross namespace service discovery. // Only necessary if ACLs are enabled. CrossNSACLPolicy string + // Lifecycle config set graceful startup/shutdown defaults for pods. + LifecycleConfig lifecycle.Config // ReleaseName is the Consul Helm installation release. ReleaseName string // ReleaseNamespace is the namespace where Consul is installed. @@ -153,18 +157,14 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } err = r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints) - // endpointPods holds a set of all pods this endpoints object is currently pointing to. - // We use this later when we reconcile ACL tokens to decide whether an ACL token in Consul - // is for a pod that no longer exists. - endpointPods := mapset.NewSet() // If the endpoints object has been deleted (and we get an IsNotFound // error), we need to deregister all instances in Consul for that service. if k8serrors.IsNotFound(err) { // Deregister all instances in Consul for this service. The function deregisterService handles // the case where the Consul service name is different from the Kubernetes service name. - err = r.deregisterService(apiClient, req.Name, req.Namespace, nil) - return ctrl.Result{}, err + requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil) + return ctrl.Result{RequeueAfter: requeueAfter}, err } else if err != nil { r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace) return ctrl.Result{}, err @@ -177,8 +177,8 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if isLabeledIgnore(serviceEndpoints.Labels) { // We always deregister the service to handle the case where a user has registered the service, then added the label later. r.Log.Info("ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace) - err = r.deregisterService(apiClient, req.Name, req.Namespace, nil) - return ctrl.Result{}, err + requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil) + return ctrl.Result{RequeueAfter: requeueAfter}, err } // endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare @@ -213,9 +213,11 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } if hasBeenInjected(pod) { - endpointPods.Add(address.TargetRef.Name) if isConsulDataplaneSupported(pod) { - if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil { + // Build the endpointAddressMap up for deregistering service instances later. + endpointAddressMap[pod.Status.PodIP] = true + + if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus); err != nil { r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) } @@ -241,8 +243,10 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } if isGateway(pod) { - endpointPods.Add(address.TargetRef.Name) - if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil { + // Build the endpointAddressMap up for deregistering service instances later. + endpointAddressMap[pod.Status.PodIP] = true + + if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus); err != nil { r.Log.Error(err, "failed to register gateway or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) } @@ -254,12 +258,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister // from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during // the registration codepath. - if err = r.deregisterService(apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil { + requeueAfter, err := r.deregisterService(ctx, apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap) + if err != nil { r.Log.Error(err, "failed to deregister endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) } - return ctrl.Result{}, errs + return ctrl.Result{RequeueAfter: requeueAfter}, errs } func (r *Controller) Logger(name types.NamespacedName) logr.Logger { @@ -274,10 +279,7 @@ func (r *Controller) SetupWithManager(mgr ctrl.Manager) error { // registerServicesAndHealthCheck creates Consul registrations for the service and proxy and registers them with Consul. // It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready. -func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error { - // Build the endpointAddressMap up for deregistering service instances later. - endpointAddressMap[pod.Status.PodIP] = true - +func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error { var managedByEndpointsController bool if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue { managedByEndpointsController = true @@ -335,10 +337,7 @@ func parseLocality(node corev1.Node) *api.Locality { // registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul. // It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready. -func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error { - // Build the endpointAddressMap up for deregistering service instances later. - endpointAddressMap[pod.Status.PodIP] = true - +func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error { var managedByEndpointsController bool if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue { managedByEndpointsController = true @@ -934,40 +933,50 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string) // The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister // them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map // has addresses, it will only deregister instances not in the map. -func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error { +func (r *Controller) deregisterService( + ctx context.Context, + apiClient *api.Client, + k8sSvcName string, + k8sSvcNamespace string, + endpointsAddressesMap map[string]bool) (time.Duration, error) { + // Get services matching metadata from Consul serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace) if err != nil { r.Log.Error(err, "failed to get service instances", "name", k8sSvcName) - return err + return 0, err } var errs error + var requeueAfter time.Duration for _, svc := range serviceInstances { // We need to get services matching "k8s-service-name" and "k8s-namespace" metadata. // If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister // every service instance. var serviceDeregistered bool - if endpointsAddressesMap != nil { - if _, ok := endpointsAddressesMap[svc.ServiceAddress]; !ok { - // If the service address is not in the Endpoints addresses, deregister it. - r.Log.Info("deregistering service from consul", "svc", svc.ServiceID) - _, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{ - Node: svc.Node, - ServiceID: svc.ServiceID, - Namespace: svc.Namespace, - }, nil) - if err != nil { - // Do not exit right away as there might be other services that need to be deregistered. - r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID) - errs = multierror.Append(errs, err) - } else { - serviceDeregistered = true - } + + if addressIsMissingFromEndpointsMap(svc.ServiceAddress, endpointsAddressesMap) { + // If graceful shutdown is enabled, continue to the next service instance and + // mark that an event requeue is needed. We should requeue at the longest time interval + // to prevent excessive re-queues. Also, updating the health status in Consul to Critical + // should prevent routing during gracefulShutdown. + podShutdownDuration, err := r.getGracefulShutdownAndUpdatePodCheck(ctx, apiClient, svc, k8sSvcNamespace) + if err != nil { + r.Log.Error(err, "failed to get pod shutdown duration", "svc", svc.ServiceName) + errs = multierror.Append(errs, err) } - } else { + + // set requeue response, then continue to the next service instance + if podShutdownDuration > requeueAfter { + requeueAfter = podShutdownDuration + } + if podShutdownDuration > 0 { + continue + } + + // If the service address is not in the Endpoints addresses, deregister it. r.Log.Info("deregistering service from consul", "svc", svc.ServiceID) - _, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{ + _, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{ Node: svc.Node, ServiceID: svc.ServiceID, Namespace: svc.Namespace, @@ -999,8 +1008,87 @@ func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvc } } - return errs + if requeueAfter > 0 { + r.Log.Info("re-queueing event", "requeueAfter", requeueAfter) + } + return requeueAfter, errs +} + +// getGracefulShutdownAndUpdatePodCheck checks if the pod is in the process of being terminated and if so, updates the +// health status of the service to critical. It returns the duration for which the pod should be re-queued (which is the pods +// gracefulShutdownPeriod setting). +func (r *Controller) getGracefulShutdownAndUpdatePodCheck(ctx context.Context, apiClient *api.Client, svc *api.CatalogService, k8sNamespace string) (time.Duration, error) { + // Get the pod, and check if it is still running. We do this to defer ACL/node cleanup for pods that are + // in graceful termination + podName := svc.ServiceMeta[constants.MetaKeyPodName] + if podName == "" { + return 0, nil + } + + var pod corev1.Pod + err := r.Client.Get(ctx, types.NamespacedName{Name: podName, Namespace: k8sNamespace}, &pod) + if err != nil && !k8serrors.IsNotFound(err) { + r.Log.Error(err, "failed to get terminating pod", "name", podName, "k8sNamespace", k8sNamespace) + return 0, fmt.Errorf("failed to get terminating pod %s/%s: %w", k8sNamespace, podName, err) + } + if k8serrors.IsNotFound(err) { + return 0, nil + } + + shutdownSeconds, err := r.getGracefulShutdownPeriodSecondsForPod(pod) + if err != nil { + r.Log.Error(err, "failed to get graceful shutdown period for pod", "name", pod, "k8sNamespace", k8sNamespace) + return 0, fmt.Errorf("failed to get graceful shutdown period for pod %s/%s: %w", k8sNamespace, podName, err) + } + + if shutdownSeconds > 0 { + // Update the health status of the service to critical so that we can drain inbound traffic. + // We don't need to handle the proxy service since that will be reconciled looping through all the service instances. + serviceRegistration := &api.CatalogRegistration{ + Node: common.ConsulNodeNameFromK8sNode(pod.Spec.NodeName), + Address: pod.Status.HostIP, + // Service is nil since we are patching the health status + Check: &api.AgentCheck{ + CheckID: consulHealthCheckID(pod.Namespace, svc.ServiceID), + Name: constants.ConsulKubernetesCheckName, + Type: constants.ConsulKubernetesCheckType, + Status: api.HealthCritical, + ServiceID: svc.ServiceID, + Output: fmt.Sprintf("Pod \"%s/%s\" is terminating", pod.Namespace, podName), + Namespace: r.consulNamespace(pod.Namespace), + }, + SkipNodeUpdate: true, + } + + r.Log.Info("updating health status of service with Consul", "name", svc.ServiceName, + "id", svc.ServiceID, "pod", podName, "k8sNamespace", pod.Namespace) + _, err = apiClient.Catalog().Register(serviceRegistration, nil) + if err != nil { + r.Log.Error(err, "failed to update service health status", "name", svc.ServiceName, "pod", podName) + return 0, fmt.Errorf("failed to update service health status for pod %s/%s: %w", pod.Namespace, podName, err) + } + + // Return the duration for which the pod should be re-queued. We add 20% to the shutdownSeconds to account for + // any potential delay in the pod killed. + return time.Duration(shutdownSeconds+int(float64(shutdownSeconds)*0.2)) * time.Second, nil + } + return 0, nil +} + +// getGracefulShutdownPeriodSecondsForPod returns the graceful shutdown period for the pod. If one is not specified, +// either through the controller configuration or pod annotations, it returns 0. +func (r *Controller) getGracefulShutdownPeriodSecondsForPod(pod corev1.Pod) (int, error) { + enabled, err := r.LifecycleConfig.EnableProxyLifecycle(pod) + if err != nil { + return 0, fmt.Errorf("failed to get parse proxy lifecycle configuration for pod %s/%s: %w", pod.Namespace, pod.Name, err) + } + // Check that SidecarProxyLifecycle is enabled. + if !enabled { + return 0, nil + } + + return r.LifecycleConfig.ShutdownGracePeriodSeconds(pod) } // deregisterNode removes a node if it does not have any associated services attached to it. @@ -1529,3 +1617,11 @@ func getMultiPortIdx(pod corev1.Pod, serviceEndpoints corev1.Endpoints) int { } return -1 } + +func addressIsMissingFromEndpointsMap(address string, endpointsAddressesMap map[string]bool) bool { + if endpointsAddressesMap == nil { + return true + } + _, ok := endpointsAddressesMap[address] + return !ok +} diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go index 4527ec84b8..7e4ea0d753 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go @@ -8,6 +8,7 @@ import ( "fmt" "strings" "testing" + "time" mapset "github.com/deckarep/golang-set" logrtest "github.com/go-logr/logr/testr" @@ -3903,12 +3904,16 @@ func TestReconcileUpdateEndpoint_LegacyService(t *testing.T) { func TestReconcileDeleteEndpoint(t *testing.T) { t.Parallel() cases := []struct { - name string - consulSvcName string - consulPodUid string - expectServicesToBeDeleted bool - initialConsulSvcs []*api.AgentService - enableACLs bool + name string + consulSvcName string + pod *corev1.Pod // If this is present, a pod will be created in the fake kube client + consulPodUid string + expectServicesToBeDeleted bool + expectServicesToBeCritical bool + initialConsulSvcs []*api.AgentService + enableACLs bool + expectTokens bool + requeueAfter time.Duration }{ { name: "Legacy service: does not delete", @@ -4030,6 +4035,66 @@ func TestReconcileDeleteEndpoint(t *testing.T) { }, enableACLs: true, }, + { + name: "When graceful shutdown is enabled with ACLs, tokens should not be deleted", + consulSvcName: "service-deleted", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "123", + Annotations: map[string]string{ + constants.AnnotationEnableSidecarProxyLifecycle: "true", + constants.AnnotationSidecarProxyLifecycleShutdownGracePeriodSeconds: "5", + }, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + // We don't need any other fields for this test + }, + }, + consulPodUid: "123", + expectServicesToBeDeleted: false, + expectServicesToBeCritical: true, + initialConsulSvcs: []*api.AgentService{ + { + ID: "pod1-service-deleted", + Service: "service-deleted", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{ + metaKeyKubeServiceName: "service-deleted", + constants.MetaKeyKubeNS: "default", + metaKeyManagedBy: constants.ManagedByValue, + metaKeySyntheticNode: "true", + constants.MetaKeyPodName: "pod1", + constants.MetaKeyPodUID: "123", + }, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-deleted-sidecar-proxy", + Service: "service-deleted-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-deleted", + DestinationServiceID: "pod1-service-deleted", + }, + Meta: map[string]string{ + metaKeyKubeServiceName: "service-deleted", + constants.MetaKeyKubeNS: "default", + metaKeyManagedBy: constants.ManagedByValue, + metaKeySyntheticNode: "true", + constants.MetaKeyPodName: "pod1", + constants.MetaKeyPodUID: "123", + }, + }, + }, + requeueAfter: time.Duration(6) * time.Second, + expectTokens: true, + enableACLs: true, + }, { name: "Mesh Gateway", consulSvcName: "service-deleted", @@ -4209,10 +4274,16 @@ func TestReconcileDeleteEndpoint(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { // Add the default namespace. - ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} - node := corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} + node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + objs := []runtime.Object{ns, node} + + if tt.pod != nil { + objs = append(objs, tt.pod) + } + // Create fake k8s client. - fakeClient := fake.NewClientBuilder().WithRuntimeObjects(&ns, &node).Build() + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(objs...).Build() // Create test consulServer server adminToken := "123e4567-e89b-12d3-a456-426614174000" @@ -4285,6 +4356,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) { }) require.NoError(t, err) require.False(t, resp.Requeue) + require.Equal(t, tt.requeueAfter, resp.RequeueAfter) // After reconciliation, Consul should not have any instances of service-deleted serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil) @@ -4300,10 +4372,21 @@ func TestReconcileDeleteEndpoint(t *testing.T) { require.NotEmpty(t, serviceInstances) } - if tt.enableACLs { + if tt.expectServicesToBeCritical { + checks, _, err := consulClient.Health().Checks(tt.consulSvcName, nil) + require.NoError(t, err) + require.Equal(t, api.HealthCritical, checks.AggregatedStatus()) + } + + if tt.enableACLs && !tt.expectTokens { _, _, err = consulClient.ACL().TokenRead(token.AccessorID, nil) + require.Error(t, err) require.Contains(t, err.Error(), "ACL not found") } + if tt.expectTokens { + _, _, err = consulClient.ACL().TokenRead(token.AccessorID, nil) + require.NoError(t, err) + } }) } } diff --git a/control-plane/subcommand/inject-connect/v1controllers.go b/control-plane/subcommand/inject-connect/v1controllers.go index 3dfa99bc48..e923f6da9d 100644 --- a/control-plane/subcommand/inject-connect/v1controllers.go +++ b/control-plane/subcommand/inject-connect/v1controllers.go @@ -65,6 +65,7 @@ func (c *Command) configureV1Controllers(ctx context.Context, mgr manager.Manage EnableNSMirroring: c.flagEnableK8SNSMirroring, NSMirroringPrefix: c.flagK8SNSMirroringPrefix, CrossNSACLPolicy: c.flagCrossNamespaceACLPolicy, + LifecycleConfig: lifecycleConfig, EnableTransparentProxy: c.flagDefaultEnableTransparentProxy, EnableWANFederation: c.flagEnableFederation, TProxyOverwriteProbes: c.flagTransparentProxyDefaultOverwriteProbes,