From 4a7cfeb036c461912e0db15caabb90dab25615a8 Mon Sep 17 00:00:00 2001
From: DanStough
Date: Tue, 12 Mar 2024 16:36:46 -0400
Subject: [PATCH] fix(control-plane): acl tokens deleted while pods in graceful shutdown

---
 .changelog/3736.txt                           |   4 +
 acceptance/framework/consul/helm_cluster.go   |  13 +-
 .../framework/consul/helm_cluster_test.go     |  14 +-
 .../endpoints/endpoints_controller.go         | 185 ++++++++++++++----
 .../endpoints/endpoints_controller_test.go    | 103 +++++++++-
 .../inject-connect/v1controllers.go           |   1 +
 6 files changed, 258 insertions(+), 62 deletions(-)
 create mode 100644 .changelog/3736.txt

diff --git a/.changelog/3736.txt b/.changelog/3736.txt
new file mode 100644
index 0000000000..1b86f858f4
--- /dev/null
+++ b/.changelog/3736.txt
@@ -0,0 +1,4 @@
+```release-note:bug
+control-plane: fix an issue where ACL token cleanup did not respect a pod's ShutdownGracePeriodSeconds and
+tokens were invalidated immediately when a pod entered the Terminating state.
+```
\ No newline at end of file
diff --git a/acceptance/framework/consul/helm_cluster.go b/acceptance/framework/consul/helm_cluster.go
index 638e0e7c51..55239eec03 100644
--- a/acceptance/framework/consul/helm_cluster.go
+++ b/acceptance/framework/consul/helm_cluster.go
@@ -26,16 +26,17 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
 
+	"github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1"
+	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/proto-public/pbresource"
+	"github.com/hashicorp/consul/sdk/testutil/retry"
+
 	"github.com/hashicorp/consul-k8s/acceptance/framework/config"
 	"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
 	"github.com/hashicorp/consul-k8s/acceptance/framework/helpers"
 	"github.com/hashicorp/consul-k8s/acceptance/framework/k8s"
 	"github.com/hashicorp/consul-k8s/acceptance/framework/logger"
 	"github.com/hashicorp/consul-k8s/acceptance/framework/portforward"
-	"github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1"
-	"github.com/hashicorp/consul/api"
-	"github.com/hashicorp/consul/proto-public/pbresource"
-	"github.com/hashicorp/consul/sdk/testutil/retry"
 )
 
 // HelmCluster implements Cluster and uses Helm
@@ -752,6 +753,10 @@ func defaultValues() map[string]string {
 		// (false positive).
 		"dns.enabled": "false",
 
+		// Adjust the default value from 30s to 1s since we have several tests that verify tokens are cleaned up,
+		// and many of them are using the default retryer (7s max).
+		"connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "1",
+
 		// Enable trace logs for servers and clients.
"server.extraConfig": `"{\"log_level\": \"TRACE\"}"`, "client.extraConfig": `"{\"log_level\": \"TRACE\"}"`, diff --git a/acceptance/framework/consul/helm_cluster_test.go b/acceptance/framework/consul/helm_cluster_test.go index 3544718c9e..1d2b744bea 100644 --- a/acceptance/framework/consul/helm_cluster_test.go +++ b/acceptance/framework/consul/helm_cluster_test.go @@ -7,14 +7,15 @@ import ( "testing" "github.com/gruntwork-io/terratest/modules/k8s" - "github.com/hashicorp/consul-k8s/acceptance/framework/config" - "github.com/hashicorp/consul-k8s/acceptance/framework/environment" "github.com/hashicorp/consul/sdk/testutil" "github.com/stretchr/testify/require" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "sigs.k8s.io/controller-runtime/pkg/client" runtimefake "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/hashicorp/consul-k8s/acceptance/framework/config" + "github.com/hashicorp/consul-k8s/acceptance/framework/environment" ) // Test that if TestConfig has values that need to be provided @@ -33,7 +34,8 @@ func TestNewHelmCluster(t *testing.T) { "global.image": "test-config-image", "global.logLevel": "debug", "server.replicas": "1", - "connectInject.transparentProxy.defaultEnabled": "false", + "connectInject.transparentProxy.defaultEnabled": "false", + "connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "1", "dns.enabled": "false", "server.extraConfig": `"{\"log_level\": \"TRACE\"}"`, "client.extraConfig": `"{\"log_level\": \"TRACE\"}"`, @@ -46,7 +48,8 @@ func TestNewHelmCluster(t *testing.T) { "global.logLevel": "debug", "server.bootstrapExpect": "3", "server.replicas": "3", - "connectInject.transparentProxy.defaultEnabled": "true", + "connectInject.transparentProxy.defaultEnabled": "true", + "connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "3", "dns.enabled": "true", "server.extraConfig": `"{\"foo\": \"bar\"}"`, "client.extraConfig": `"{\"foo\": \"bar\"}"`, @@ -57,7 +60,8 @@ func TestNewHelmCluster(t *testing.T) { "global.logLevel": "debug", "server.bootstrapExpect": "3", "server.replicas": "3", - "connectInject.transparentProxy.defaultEnabled": "true", + "connectInject.transparentProxy.defaultEnabled": "true", + "connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "3", "dns.enabled": "true", "server.extraConfig": `"{\"foo\": \"bar\"}"`, "client.extraConfig": `"{\"foo\": \"bar\"}"`, diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go index b0b8bea054..0094f40c76 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go @@ -6,10 +6,12 @@ import ( "context" "encoding/json" "fmt" + "math" "net" "regexp" "strconv" "strings" + "time" mapset "github.com/deckarep/golang-set" "github.com/go-logr/logr" @@ -25,6 +27,7 @@ import ( "github.com/hashicorp/consul-k8s/control-plane/connect-inject/common" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/lifecycle" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics" "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul-k8s/control-plane/helper/parsetags" @@ -94,6 +97,8 @@ type Controller struct { // any created Consul namespaces to allow cross namespace service discovery. 
 	// Only necessary if ACLs are enabled.
 	CrossNSACLPolicy string
+	// LifecycleConfig sets graceful startup/shutdown defaults for pods.
+	LifecycleConfig lifecycle.Config
 	// ReleaseName is the Consul Helm installation release.
 	ReleaseName string
 	// ReleaseNamespace is the namespace where Consul is installed.
@@ -153,18 +158,14 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	}
 
 	err = r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints)
-	// endpointPods holds a set of all pods this endpoints object is currently pointing to.
-	// We use this later when we reconcile ACL tokens to decide whether an ACL token in Consul
-	// is for a pod that no longer exists.
-	endpointPods := mapset.NewSet()
 
 	// If the endpoints object has been deleted (and we get an IsNotFound
 	// error), we need to deregister all instances in Consul for that service.
 	if k8serrors.IsNotFound(err) {
 		// Deregister all instances in Consul for this service. The function deregisterService handles
 		// the case where the Consul service name is different from the Kubernetes service name.
-		err = r.deregisterService(apiClient, req.Name, req.Namespace, nil)
-		return ctrl.Result{}, err
+		requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil)
+		return ctrl.Result{RequeueAfter: requeueAfter}, err
 	} else if err != nil {
 		r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace)
 		return ctrl.Result{}, err
@@ -177,8 +178,8 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	if isLabeledIgnore(serviceEndpoints.Labels) {
 		// We always deregister the service to handle the case where a user has registered the service, then added the label later.
 		r.Log.Info("ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace)
-		err = r.deregisterService(apiClient, req.Name, req.Namespace, nil)
-		return ctrl.Result{}, err
+		requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil)
+		return ctrl.Result{RequeueAfter: requeueAfter}, err
 	}
 
 	// endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
@@ -213,12 +214,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 			}
 
 			if hasBeenInjected(pod) {
-				endpointPods.Add(address.TargetRef.Name)
 				if isConsulDataplaneSupported(pod) {
-					if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
+					if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus); err != nil {
 						r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
 						errs = multierror.Append(errs, err)
 					}
+					// Build the endpointAddressMap up for deregistering service instances later.
+					endpointAddressMap[pod.Status.PodIP] = true
 				} else {
 					r.Log.Info("detected an update to pre-consul-dataplane service", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
 					nodeAgentClientCfg, err := r.consulClientCfgForNodeAgent(apiClient, pod, serverState)
@@ -241,11 +243,12 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 			}
 
 			if isGateway(pod) {
-				endpointPods.Add(address.TargetRef.Name)
-				if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
+				if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus); err != nil {
 					r.Log.Error(err, "failed to register gateway or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
 					errs = multierror.Append(errs, err)
 				}
+				// Build the endpointAddressMap up for deregistering service instances later.
+				endpointAddressMap[pod.Status.PodIP] = true
 			}
 		}
 	}
@@ -254,12 +257,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	// Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister
 	// from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during
 	// the registration codepath.
-	if err = r.deregisterService(apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil {
+	requeueAfter, err := r.deregisterService(ctx, apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap)
+	if err != nil {
 		r.Log.Error(err, "failed to deregister endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
 		errs = multierror.Append(errs, err)
 	}
 
-	return ctrl.Result{}, errs
+	return ctrl.Result{RequeueAfter: requeueAfter}, errs
 }
 
 func (r *Controller) Logger(name types.NamespacedName) logr.Logger {
@@ -274,10 +278,7 @@ func (r *Controller) SetupWithManager(mgr ctrl.Manager) error {
 
 // registerServicesAndHealthCheck creates Consul registrations for the service and proxy and registers them with Consul.
 // It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
-func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
-	// Build the endpointAddressMap up for deregistering service instances later.
-	endpointAddressMap[pod.Status.PodIP] = true
-
+func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error {
 	var managedByEndpointsController bool
 	if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue {
 		managedByEndpointsController = true
@@ -335,10 +336,7 @@ func parseLocality(node corev1.Node) *api.Locality {
 
 // registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul.
 // It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
-func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
-	// Build the endpointAddressMap up for deregistering service instances later.
-	endpointAddressMap[pod.Status.PodIP] = true
-
+func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error {
 	var managedByEndpointsController bool
 	if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue {
 		managedByEndpointsController = true
@@ -934,40 +932,54 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string)
 // The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister
 // them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
 // has addresses, it will only deregister instances not in the map.
-func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {
+// If the pod backing a Consul service instance still exists and the graceful shutdown lifecycle mode is enabled, the instance
+// will not be deregistered. Instead, its health check will be updated to Critical in order to drain incoming traffic, and
+// this function will return a requeueAfter duration. This can be used to requeue the event at the longest shutdown time
+// interval to clean up these instances after they have exited.
+func (r *Controller) deregisterService(
+	ctx context.Context,
+	apiClient *api.Client,
+	k8sSvcName string,
+	k8sSvcNamespace string,
+	endpointsAddressesMap map[string]bool) (time.Duration, error) {
+	// Get services matching metadata from Consul.
 	serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace)
 	if err != nil {
 		r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
-		return err
+		return 0, err
 	}
 
 	var errs error
+	var requeueAfter time.Duration
 	for _, svc := range serviceInstances {
 		// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
 		// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
 		// every service instance.
 		var serviceDeregistered bool
-		if endpointsAddressesMap != nil {
-			if _, ok := endpointsAddressesMap[svc.ServiceAddress]; !ok {
-				// If the service address is not in the Endpoints addresses, deregister it.
-				r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
-				_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
-					Node:      svc.Node,
-					ServiceID: svc.ServiceID,
-					Namespace: svc.Namespace,
-				}, nil)
-				if err != nil {
-					// Do not exit right away as there might be other services that need to be deregistered.
-					r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID)
-					errs = multierror.Append(errs, err)
-				} else {
-					serviceDeregistered = true
-				}
+
+		if addressIsMissingFromEndpointsMap(svc.ServiceAddress, endpointsAddressesMap) {
+			// If graceful shutdown is enabled, continue to the next service instance and
+			// mark that an event requeue is needed. We should requeue at the longest time interval
+			// to prevent excessive re-queues. Also, updating the health status in Consul to Critical
+			// should prevent routing during graceful shutdown.
+			podShutdownDuration, err := r.getGracefulShutdownAndUpdatePodCheck(ctx, apiClient, svc, k8sSvcNamespace)
+			if err != nil {
+				r.Log.Error(err, "failed to get pod shutdown duration", "svc", svc.ServiceName)
+				errs = multierror.Append(errs, err)
 			}
-		} else {
+
+			// Set the requeue response, then continue to the next service instance.
+			if podShutdownDuration > requeueAfter {
+				requeueAfter = podShutdownDuration
+			}
+			if podShutdownDuration > 0 {
+				continue
+			}
+
+			// If the service address is not in the Endpoints addresses, deregister it.
 			r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
-			_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
+			_, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{
 				Node:      svc.Node,
 				ServiceID: svc.ServiceID,
 				Namespace: svc.Namespace,
@@ -999,8 +1011,87 @@ func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvc
 		}
 	}
 
-	return errs
+	if requeueAfter > 0 {
+		r.Log.Info("re-queueing event for graceful shutdown", "name", k8sSvcName, "k8sNamespace", k8sSvcNamespace, "requeueAfter", requeueAfter)
+	}
+
+	return requeueAfter, errs
+}
+
+// getGracefulShutdownAndUpdatePodCheck checks whether the pod is in the process of being terminated and, if so, updates the
+// health status of the service to critical. It returns the duration for which the pod should be re-queued (which is the pod's
+// graceful shutdown period setting).
+func (r *Controller) getGracefulShutdownAndUpdatePodCheck(ctx context.Context, apiClient *api.Client, svc *api.CatalogService, k8sNamespace string) (time.Duration, error) {
+	// Get the pod, and check if it is still running. We do this to defer ACL/node cleanup for pods that are
+	// in graceful termination.
+	podName := svc.ServiceMeta[constants.MetaKeyPodName]
+	if podName == "" {
+		return 0, nil
+	}
+
+	var pod corev1.Pod
+	err := r.Client.Get(ctx, types.NamespacedName{Name: podName, Namespace: k8sNamespace}, &pod)
+	if k8serrors.IsNotFound(err) {
+		return 0, nil
+	}
+	if err != nil {
+		r.Log.Error(err, "failed to get terminating pod", "name", podName, "k8sNamespace", k8sNamespace)
+		return 0, fmt.Errorf("failed to get terminating pod %s/%s: %w", k8sNamespace, podName, err)
+	}
+
+	shutdownSeconds, err := r.getGracefulShutdownPeriodSecondsForPod(pod)
+	if err != nil {
+		r.Log.Error(err, "failed to get graceful shutdown period for pod", "name", podName, "k8sNamespace", k8sNamespace)
+		return 0, fmt.Errorf("failed to get graceful shutdown period for pod %s/%s: %w", k8sNamespace, podName, err)
+	}
+
+	if shutdownSeconds > 0 {
+		// Update the health status of the service to critical so that we can drain inbound traffic.
+		// We don't need to handle the proxy service since that will be reconciled by looping through all the service instances.
+		serviceRegistration := &api.CatalogRegistration{
+			Node:    common.ConsulNodeNameFromK8sNode(pod.Spec.NodeName),
+			Address: pod.Status.HostIP,
+			// Service is nil since we are only patching the health status.
+			Check: &api.AgentCheck{
+				CheckID:   consulHealthCheckID(pod.Namespace, svc.ServiceID),
+				Name:      constants.ConsulKubernetesCheckName,
+				Type:      constants.ConsulKubernetesCheckType,
+				Status:    api.HealthCritical,
+				ServiceID: svc.ServiceID,
+				Output:    fmt.Sprintf("Pod \"%s/%s\" is terminating", pod.Namespace, podName),
+				Namespace: r.consulNamespace(pod.Namespace),
+			},
+			SkipNodeUpdate: true,
+		}
+
+		r.Log.Info("updating health status of service in Consul to critical in order to drain inbound traffic", "name", svc.ServiceName,
+			"id", svc.ServiceID, "pod", podName, "k8sNamespace", pod.Namespace)
+		_, err = apiClient.Catalog().Register(serviceRegistration, nil)
+		if err != nil {
+			r.Log.Error(err, "failed to update service health status to critical", "name", svc.ServiceName, "pod", podName)
+			return 0, fmt.Errorf("failed to update service health status for pod %s/%s to critical: %w", pod.Namespace, podName, err)
+		}
+
+		// Return the duration for which the pod should be re-queued. We add 20% to shutdownSeconds to account for
+		// any potential delay in the pod being killed.
+		return time.Duration(shutdownSeconds+int(math.Ceil(float64(shutdownSeconds)*0.2))) * time.Second, nil
+	}
+	return 0, nil
+}
+
+// getGracefulShutdownPeriodSecondsForPod returns the graceful shutdown period for the pod. If one is not specified,
+// either through the controller configuration or pod annotations, it returns 0.
+func (r *Controller) getGracefulShutdownPeriodSecondsForPod(pod corev1.Pod) (int, error) {
+	enabled, err := r.LifecycleConfig.EnableProxyLifecycle(pod)
+	if err != nil {
+		return 0, fmt.Errorf("failed to parse proxy lifecycle configuration for pod %s/%s: %w", pod.Namespace, pod.Name, err)
+	}
+	// Check that SidecarProxyLifecycle is enabled.
+	if !enabled {
+		return 0, nil
+	}
+	return r.LifecycleConfig.ShutdownGracePeriodSeconds(pod)
 }
 
 // deregisterNode removes a node if it does not have any associated services attached to it.
@@ -1529,3 +1620,11 @@ func getMultiPortIdx(pod corev1.Pod, serviceEndpoints corev1.Endpoints) int {
 	}
 	return -1
 }
+
+func addressIsMissingFromEndpointsMap(address string, endpointsAddressesMap map[string]bool) bool {
+	if endpointsAddressesMap == nil {
+		return true
+	}
+	_, ok := endpointsAddressesMap[address]
+	return !ok
+}
diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go
index 4527ec84b8..7e4ea0d753 100644
--- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go
+++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"strings"
 	"testing"
+	"time"
 
 	mapset "github.com/deckarep/golang-set"
 	logrtest "github.com/go-logr/logr/testr"
@@ -3903,12 +3904,16 @@ func TestReconcileUpdateEndpoint_LegacyService(t *testing.T) {
 func TestReconcileDeleteEndpoint(t *testing.T) {
 	t.Parallel()
 	cases := []struct {
-		name                      string
-		consulSvcName             string
-		consulPodUid              string
-		expectServicesToBeDeleted bool
-		initialConsulSvcs         []*api.AgentService
-		enableACLs                bool
+		name                       string
+		consulSvcName              string
+		pod                        *corev1.Pod // If this is present, a pod will be created in the fake kube client
+		consulPodUid               string
+		expectServicesToBeDeleted  bool
+		expectServicesToBeCritical bool
+		initialConsulSvcs          []*api.AgentService
+		enableACLs                 bool
+		expectTokens               bool
+		requeueAfter               time.Duration
 	}{
 		{
 			name:          "Legacy service: does not delete",
@@ -4030,6 +4035,66 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
 			},
 			enableACLs: true,
 		},
+		{
+			name:          "When graceful shutdown is enabled with ACLs, tokens should not be deleted",
+			consulSvcName: "service-deleted",
+			pod: &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "pod1",
+					Namespace: "default",
+					UID:       "123",
+					Annotations: map[string]string{
+						constants.AnnotationEnableSidecarProxyLifecycle:                     "true",
+						constants.AnnotationSidecarProxyLifecycleShutdownGracePeriodSeconds: "5",
+					},
+				},
+				Spec: corev1.PodSpec{
+					NodeName: nodeName,
+					// We don't need any other fields for this test
+				},
+			},
+			consulPodUid:               "123",
+			expectServicesToBeDeleted:  false,
+			expectServicesToBeCritical: true,
+			initialConsulSvcs: []*api.AgentService{
+				{
+					ID:      "pod1-service-deleted",
+					Service: "service-deleted",
+					Port:    80,
+					Address: "1.2.3.4",
+					Meta: map[string]string{
+						metaKeyKubeServiceName:   "service-deleted",
+						constants.MetaKeyKubeNS:  "default",
+						metaKeyManagedBy:         constants.ManagedByValue,
+						metaKeySyntheticNode:     "true",
+						constants.MetaKeyPodName: "pod1",
+						constants.MetaKeyPodUID:  "123",
+					},
+				},
+				{
+					Kind:    api.ServiceKindConnectProxy,
+					ID:      "pod1-service-deleted-sidecar-proxy",
+					Service: "service-deleted-sidecar-proxy",
+					Port:    20000,
+					Address: "1.2.3.4",
+					Proxy: &api.AgentServiceConnectProxyConfig{
+						DestinationServiceName: "service-deleted",
+						DestinationServiceID:   "pod1-service-deleted",
+					},
+					Meta: map[string]string{
+						metaKeyKubeServiceName:   "service-deleted",
+						constants.MetaKeyKubeNS:  "default",
+						metaKeyManagedBy:         constants.ManagedByValue,
+						metaKeySyntheticNode:     "true",
+						constants.MetaKeyPodName: "pod1",
+						constants.MetaKeyPodUID:  "123",
+					},
+				},
+			},
+			requeueAfter: time.Duration(6) * time.Second,
+			expectTokens: true,
+			enableACLs:   true,
+		},
 		{
 			name:          "Mesh Gateway",
 			consulSvcName: "service-deleted",
@@ -4209,10 +4274,16 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
 	for _, tt := range cases {
 		t.Run(tt.name, func(t *testing.T) {
 			// Add the default namespace.
-			ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}
-			node := corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
+			ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}
+			node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
+			objs := []runtime.Object{ns, node}
+
+			if tt.pod != nil {
+				objs = append(objs, tt.pod)
+			}
+
 			// Create fake k8s client.
-			fakeClient := fake.NewClientBuilder().WithRuntimeObjects(&ns, &node).Build()
+			fakeClient := fake.NewClientBuilder().WithRuntimeObjects(objs...).Build()
 
 			// Create test consulServer server
 			adminToken := "123e4567-e89b-12d3-a456-426614174000"
@@ -4285,6 +4356,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
 			})
 			require.NoError(t, err)
 			require.False(t, resp.Requeue)
+			require.Equal(t, tt.requeueAfter, resp.RequeueAfter)
 
 			// After reconciliation, Consul should not have any instances of service-deleted
 			serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil)
@@ -4300,10 +4372,21 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
 				require.NotEmpty(t, serviceInstances)
 			}
 
-			if tt.enableACLs {
+			if tt.expectServicesToBeCritical {
+				checks, _, err := consulClient.Health().Checks(tt.consulSvcName, nil)
+				require.NoError(t, err)
+				require.Equal(t, api.HealthCritical, checks.AggregatedStatus())
+			}
+
+			if tt.enableACLs && !tt.expectTokens {
 				_, _, err = consulClient.ACL().TokenRead(token.AccessorID, nil)
+				require.Error(t, err)
 				require.Contains(t, err.Error(), "ACL not found")
 			}
+			if tt.expectTokens {
+				_, _, err = consulClient.ACL().TokenRead(token.AccessorID, nil)
+				require.NoError(t, err)
+			}
 		})
 	}
 }
diff --git a/control-plane/subcommand/inject-connect/v1controllers.go b/control-plane/subcommand/inject-connect/v1controllers.go
index 3dfa99bc48..e923f6da9d 100644
--- a/control-plane/subcommand/inject-connect/v1controllers.go
+++ b/control-plane/subcommand/inject-connect/v1controllers.go
@@ -65,6 +65,7 @@ func (c *Command) configureV1Controllers(ctx context.Context, mgr manager.Manage
 		EnableNSMirroring:          c.flagEnableK8SNSMirroring,
 		NSMirroringPrefix:          c.flagK8SNSMirroringPrefix,
 		CrossNSACLPolicy:           c.flagCrossNamespaceACLPolicy,
+		LifecycleConfig:            lifecycleConfig,
 		EnableTransparentProxy:     c.flagDefaultEnableTransparentProxy,
 		EnableWANFederation:        c.flagEnableFederation,
 		TProxyOverwriteProbes:      c.flagTransparentProxyDefaultOverwriteProbes,
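
A note on the requeue interval computed in `getGracefulShutdownAndUpdatePodCheck`: the controller pads the pod's shutdown grace period by 20% (rounded up) before requeueing, which is why the test case with a 5-second grace period expects a 6-second `RequeueAfter`, and why the Helm chart's 30-second default would requeue after 36 seconds. A minimal standalone sketch of that arithmetic (the `requeueAfter` helper here is illustrative, not part of the patch):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// requeueAfter mirrors the padding logic in the patch: the pod's shutdown
// grace period plus 20%, rounded up, to account for any potential delay
// in the pod being killed.
func requeueAfter(shutdownSeconds int) time.Duration {
	return time.Duration(shutdownSeconds+int(math.Ceil(float64(shutdownSeconds)*0.2))) * time.Second
}

func main() {
	fmt.Println(requeueAfter(5))  // 6s  -- matches the test expectation above
	fmt.Println(requeueAfter(30)) // 36s -- for the chart's previous 30s default
}
```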
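The new `addressIsMissingFromEndpointsMap` helper preserves the behavior of the old `endpointsAddressesMap != nil` branching: a nil map means "treat every instance as missing" (used when the Endpoints object is deleted or carries the service-ignore label), while a populated map only flags instances whose address no longer backs an endpoint. A quick illustration with hypothetical addresses (not taken from the patch):

```go
package main

import "fmt"

// Copied from the patch: reports whether an instance's address should be
// considered for deregistration given the current Endpoints addresses.
func addressIsMissingFromEndpointsMap(address string, endpointsAddressesMap map[string]bool) bool {
	if endpointsAddressesMap == nil {
		return true
	}
	_, ok := endpointsAddressesMap[address]
	return !ok
}

func main() {
	live := map[string]bool{"10.0.0.1": true}

	fmt.Println(addressIsMissingFromEndpointsMap("10.0.0.1", nil))  // true: nil map deregisters everything
	fmt.Println(addressIsMissingFromEndpointsMap("10.0.0.1", live)) // false: address still backs an endpoint
	fmt.Println(addressIsMissingFromEndpointsMap("10.0.0.2", live)) // true: stale instance, candidate for cleanup
}
```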
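For readers less familiar with controller-runtime: returning `ctrl.Result{RequeueAfter: d}` with a nil error schedules another reconcile of the same object after `d`, which is how terminating instances get cleaned up once their grace period (plus padding) has elapsed. A minimal, self-contained sketch of the pattern, using a hypothetical `gracefulReconciler` rather than the patch's controller:

```go
package main

import (
	"context"
	"fmt"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// gracefulReconciler sketches the pattern the endpoints controller now uses:
// when instances are still draining, schedule another pass instead of finishing.
type gracefulReconciler struct{}

func (r *gracefulReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Hypothetical: pretend one instance still has 6s of graceful shutdown left.
	requeueAfter := 6 * time.Second

	if requeueAfter > 0 {
		// Returning a nil error with RequeueAfter re-enqueues req after the delay;
		// the next pass deregisters instances whose pods have exited. Note that
		// controller-runtime ignores the Result when a non-nil error is returned
		// and falls back to its rate-limited requeue instead.
		return ctrl.Result{RequeueAfter: requeueAfter}, nil
	}
	return ctrl.Result{}, nil
}

func main() {
	r := &gracefulReconciler{}
	res, err := r.Reconcile(context.Background(), ctrl.Request{})
	fmt.Println(res.RequeueAfter, err) // 6s <nil>
}
```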