Skip to content

Commit

Permalink
connect: don't get pods from the API to determine if ACL should be deleted (hashicorp#580)
Browse files Browse the repository at this point in the history

The endpoints controller may have a stale cache for pod objects since it only
watches the endpoints objects. As a result, when we try to get a pod
from the cache, it may be stale. For the case of deleting ACL tokens,
we can't tolerate stale cache reads since we need to know for sure
whether the pod exists to determine if an ACL token for that pod should be deleted.
This commit changes the controller to instead use the pod info stored in the endpoints addresses
as TargetRef.
  • Loading branch information
ishustava authored Aug 4, 2021
1 parent b87f00c commit b986ce9
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 24 deletions.
25 changes: 15 additions & 10 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,17 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (

err := r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints)

// endpointPods holds a set of all pods this endpoints object is currently pointing to.
// We use this later when we reconcile ACL tokens to decide whether an ACL token in Consul
// is for a pod that no longer exists.
endpointPods := mapset.NewSet()

// If the endpoints object has been deleted (and we get an IsNotFound
// error), we need to deregister all instances in Consul for that service.
if k8serrors.IsNotFound(err) {
// Deregister all instances in Consul for this service. The function deregisterServiceOnAllAgents handles
// the case where the Consul service name is different from the Kubernetes service name.
if err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil); err != nil {
if err = r.deregisterServiceOnAllAgents(ctx, req.Name, req.Namespace, nil, endpointPods); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
Expand Down Expand Up @@ -158,6 +163,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (

for address, healthStatus := range allAddresses {
if address.TargetRef != nil && address.TargetRef.Kind == "Pod" {
endpointPods.Add(address.TargetRef.Name)
if err := r.registerServicesAndHealthCheck(ctx, serviceEndpoints, address, healthStatus, endpointAddressMap); err != nil {
r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
Expand All @@ -169,7 +175,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
// Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister
// from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during
// the registration codepath.
if err = r.deregisterServiceOnAllAgents(ctx, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil {
if err = r.deregisterServiceOnAllAgents(ctx, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap, endpointPods); err != nil {
r.Log.Error(err, "failed to deregister endpoints on all agents", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
Expand Down Expand Up @@ -673,7 +679,7 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string)
// The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister
// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
// has addresses, it will only deregister instances not in the map.
func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {
func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool, endpointPods mapset.Set) error {
// Get all agents by getting pods with label component=client, app=consul and release=<ReleaseName>
agents := corev1.PodList{}
listOptions := client.ListOptions{
Expand Down Expand Up @@ -727,7 +733,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,

if r.AuthMethod != "" {
r.Log.Info("reconciling ACL tokens for service", "svc", serviceRegistration.Service)
err = r.reconcileACLTokensForService(client, serviceRegistration.Service, k8sSvcNamespace)
err = r.reconcileACLTokensForService(client, serviceRegistration.Service, k8sSvcNamespace, endpointPods)
if err != nil {
r.Log.Error(err, "failed to reconcile ACL tokens for service", "svc", serviceRegistration.Service)
return err
Expand All @@ -740,8 +746,9 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,

// reconcileACLTokensForService finds the ACL tokens that belong to the service and deletes them from Consul.
// It will only check for ACL tokens that have been created with the auth method this controller
// has been configured with.
func (r *EndpointsController) reconcileACLTokensForService(client *api.Client, serviceName, k8sNS string) error {
// has been configured with and will only delete tokens for pods that aren't in endpointPods
// (endpointPods is a set of pods that the endpoints object is pointing to).
func (r *EndpointsController) reconcileACLTokensForService(client *api.Client, serviceName, k8sNS string, endpointPods mapset.Set) error {
tokens, _, err := client.ACL().TokenList(nil)
if err != nil {
return fmt.Errorf("failed to get a list of tokens from Consul: %s", err)
Expand All @@ -760,9 +767,9 @@ func (r *EndpointsController) reconcileACLTokensForService(client *api.Client, s
}

podName := strings.TrimPrefix(tokenMeta[TokenMetaPodNameKey], k8sNS+"/")
err = r.Client.Get(r.Context, types.NamespacedName{Name: podName, Namespace: k8sNS}, &corev1.Pod{})

// If we can't find token's pod, delete it.
if err != nil && k8serrors.IsNotFound(err) {
if !endpointPods.Contains(podName) {
r.Log.Info("deleting ACL token for pod", "name", podName)
_, err = client.ACL().TokenDelete(token.AccessorID, nil)
if err != nil {
Expand Down Expand Up @@ -896,13 +903,11 @@ func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool {

// Ignores deny list.
if denySet.Contains(namespace) {
fmt.Printf("%+v\n", denySet.ToSlice()...)
return true
}

// Ignores if not in allow list or allow list is not *.
if !allowSet.Contains("*") && !allowSet.Contains(namespace) {
fmt.Printf("%+v\n", allowSet.ToSlice()...)
return true
}

Expand Down
8 changes: 0 additions & 8 deletions connect-inject/endpoints_controller_ent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,6 @@ func TestReconcileCreateEndpointWithNamespaces(t *testing.T) {
if setup.expectedAgentHealthChecks != nil {
for i := range setup.expectedConsulSvcInstances {
filter := fmt.Sprintf("CheckID == `%s`", setup.expectedAgentHealthChecks[i].CheckID)
newChecks, _ := consulClient.Agent().Checks()
for key, value := range newChecks {
fmt.Printf("%s:%v\n", key, value)
}
check, err := consulClient.Agent().ChecksWithFilter(filter)
require.NoError(t, err)
require.EqualValues(t, 1, len(check))
Expand Down Expand Up @@ -1306,10 +1302,6 @@ func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) {
if tt.expectedAgentHealthChecks != nil {
for i := range tt.expectedConsulSvcInstances {
filter := fmt.Sprintf("CheckID == `%s`", tt.expectedAgentHealthChecks[i].CheckID)
newChecks, _ := consulClient.Agent().Checks()
for key, value := range newChecks {
fmt.Printf("%s:%v\n", key, value)
}
check, err := consulClient.Agent().ChecksWithFilter(filter)
require.NoError(t, err)
require.EqualValues(t, 1, len(check))
Expand Down
5 changes: 0 additions & 5 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package connectinject

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -4593,10 +4592,6 @@ func TestCreateServiceRegistrations_withTransparentProxy(t *testing.T) {
pod.Spec.Containers = c.podContainers
}

marshalledPod, err := json.Marshal(pod)
fmt.Println(string(marshalledPod))
require.NoError(t, err)

// We set these annotations explicitly as these are set by the handler and we
// need these values to determine which port to use for the service registration.
pod.Annotations[annotationPort] = "tcp"
Expand Down
1 change: 0 additions & 1 deletion subcommand/connect-init/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func (c *Command) Run(args []string) int {
return 1
}
if c.flagACLAuthMethod != "" && c.flagServiceAccountName == "" {
fmt.Println(c.flagServiceAccountName)
c.UI.Error("-service-account-name must be set when ACLs are enabled")
return 1
}
Expand Down

0 comments on commit b986ce9

Please sign in to comment.