From b986ce9291997148878d827bb6a41ec6aea18e17 Mon Sep 17 00:00:00 2001 From: Iryna Shustava Date: Tue, 3 Aug 2021 18:47:56 -0600 Subject: [PATCH] connect: don't get pods from the API to determine if ACL should be deleted (#580) Endpoints controller may have stale cache for pod objects since it's only watching the endpoints objects. As a result when we try to get a pod from cache, it may be stale. For the case of deleting ACL tokens, we can't tolerate stale cache reads since we need to know for sure if the pod exists to determine if an ACL token for that pod should be deleted. This commit changes it to instead use the pod info stored in the endpoints addresses as TargetRef. --- connect-inject/endpoints_controller.go | 25 +++++++++++-------- .../endpoints_controller_ent_test.go | 8 ------ connect-inject/endpoints_controller_test.go | 5 ---- subcommand/connect-init/command.go | 1 - 4 files changed, 15 insertions(+), 24 deletions(-) diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 693b0a25a8de..4816ee5d4e54 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -124,12 +124,17 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( err := r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints) + // endpointPods holds a set of all pods this endpoints object is currently pointing to. + // We use this later when we reconcile ACL tokens to decide whether an ACL token in Consul + // is for a pod that no longer exists. + endpointPods := mapset.NewSet() + // If the endpoints object has been deleted (and we get an IsNotFound // error), we need to deregister all instances in Consul for that service. if k8serrors.IsNotFound(err) { // Deregister all instances in Consul for this service. The function deregisterServiceOnAllAgents handles // the case where the Consul service name is different from the Kubernetes service name. - if err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil); err != nil { + if err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil, endpointPods); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil @@ -158,6 +163,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( for address, healthStatus := range allAddresses { if address.TargetRef != nil && address.TargetRef.Kind == "Pod" { + endpointPods.Add(address.TargetRef.Name) if err := r.registerServicesAndHealthCheck(ctx, serviceEndpoints, address, healthStatus, endpointAddressMap); err != nil { r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) @@ -169,7 +175,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( // Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister // from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during // the registration codepath. - if err = r.deregisterServiceOnAllAgents(ctx, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil { + if err = r.deregisterServiceOnAllAgents(ctx, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap, endpointPods); err != nil { r.Log.Error(err, "failed to deregister endpoints on all agents", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) } @@ -673,7 +679,7 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string) // The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister // them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map // has addresses, it will only deregister instances not in the map. -func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error { +func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool, endpointPods mapset.Set) error { // Get all agents by getting pods with label component=client, app=consul and release= agents := corev1.PodList{} listOptions := client.ListOptions{ @@ -727,7 +733,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, if r.AuthMethod != "" { r.Log.Info("reconciling ACL tokens for service", "svc", serviceRegistration.Service) - err = r.reconcileACLTokensForService(client, serviceRegistration.Service, k8sSvcNamespace) + err = r.reconcileACLTokensForService(client, serviceRegistration.Service, k8sSvcNamespace, endpointPods) if err != nil { r.Log.Error(err, "failed to reconcile ACL tokens for service", "svc", serviceRegistration.Service) return err @@ -740,8 +746,9 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, // reconcileACLTokensForService finds the ACL tokens that belongs to the service and deletes it from Consul. // It will only check for ACL tokens that have been created with the auth method this controller -// has been configured with. -func (r *EndpointsController) reconcileACLTokensForService(client *api.Client, serviceName, k8sNS string) error { +// has been configured with and will only delete tokens for pods that aren't in endpointPods +// (endpointPods is a set of pods that the endpoints object is pointing to). +func (r *EndpointsController) reconcileACLTokensForService(client *api.Client, serviceName, k8sNS string, endpointPods mapset.Set) error { tokens, _, err := client.ACL().TokenList(nil) if err != nil { return fmt.Errorf("failed to get a list of tokens from Consul: %s", err) @@ -760,9 +767,9 @@ func (r *EndpointsController) reconcileACLTokensForService(client *api.Client, s } podName := strings.TrimPrefix(tokenMeta[TokenMetaPodNameKey], k8sNS+"/") - err = r.Client.Get(r.Context, types.NamespacedName{Name: podName, Namespace: k8sNS}, &corev1.Pod{}) + // If we can't find token's pod, delete it. - if err != nil && k8serrors.IsNotFound(err) { + if !endpointPods.Contains(podName) { r.Log.Info("deleting ACL token for pod", "name", podName) _, err = client.ACL().TokenDelete(token.AccessorID, nil) if err != nil { @@ -896,13 +903,11 @@ func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool { // Ignores deny list. if denySet.Contains(namespace) { - fmt.Printf("%+v\n", denySet.ToSlice()...) return true } // Ignores if not in allow list or allow list is not *. if !allowSet.Contains("*") && !allowSet.Contains(namespace) { - fmt.Printf("%+v\n", allowSet.ToSlice()...) return true } diff --git a/connect-inject/endpoints_controller_ent_test.go b/connect-inject/endpoints_controller_ent_test.go index 1bfd7a009ff5..211968913879 100644 --- a/connect-inject/endpoints_controller_ent_test.go +++ b/connect-inject/endpoints_controller_ent_test.go @@ -304,10 +304,6 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) { if setup.expectedAgentHealthChecks != nil { for i := range setup.expectedConsulSvcInstances { filter := fmt.Sprintf("CheckID == `%s`", setup.expectedAgentHealthChecks[i].CheckID) - newChecks, _ := consulClient.Agent().Checks() - for key, value := range newChecks { - fmt.Printf("%s:%v\n", key, value) - } check, err := consulClient.Agent().ChecksWithFilter(filter) require.NoError(t, err) require.EqualValues(t, 1, len(check)) @@ -1306,10 +1302,6 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { if tt.expectedAgentHealthChecks != nil { for i := range tt.expectedConsulSvcInstances { filter := fmt.Sprintf("CheckID == `%s`", tt.expectedAgentHealthChecks[i].CheckID) - newChecks, _ := consulClient.Agent().Checks() - for key, value := range newChecks { - fmt.Printf("%s:%v\n", key, value) - } check, err := consulClient.Agent().ChecksWithFilter(filter) require.NoError(t, err) require.EqualValues(t, 1, len(check)) diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index 0afeb2aada17..e91da8bd58c8 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -2,7 +2,6 @@ package connectinject import ( "context" - "encoding/json" "fmt" "strings" "testing" @@ -4593,10 +4592,6 @@ func TestCreateServiceRegistrations_withTransparentProxy(t *testing.T) { pod.Spec.Containers = c.podContainers } - marshalledPod, err := json.Marshal(pod) - fmt.Println(string(marshalledPod)) - require.NoError(t, err) - // We set these annotations explicitly as these are set by the handler and we // need these values to determine which port to use for the service registration. pod.Annotations[annotationPort] = "tcp" diff --git a/subcommand/connect-init/command.go b/subcommand/connect-init/command.go index d35d6e78a260..a5fcf880b60a 100644 --- a/subcommand/connect-init/command.go +++ b/subcommand/connect-init/command.go @@ -104,7 +104,6 @@ func (c *Command) Run(args []string) int { return 1 } if c.flagACLAuthMethod != "" && c.flagServiceAccountName == "" { - fmt.Println(c.flagServiceAccountName) c.UI.Error("-service-account-name must be set when ACLs are enabled") return 1 }