fix(control-plane): acl tokens deleted while pods in graceful shutdown
DanStough committed Mar 13, 2024
1 parent 0718833 commit 4a7cfeb
Showing 6 changed files with 258 additions and 62 deletions.
4 changes: 4 additions & 0 deletions .changelog/3736.txt
@@ -0,0 +1,4 @@
```release-note:bug
control-plane: fix an issue where ACL token cleanup did not respect a pod's GracefulShutdownPeriodSeconds and
tokens were invalidated immediately when a pod entered the Terminating state.
```
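For context on the shape of the fix, here is a minimal sketch (not code from this commit) of the requeue pattern the endpoints controller now uses: instead of deregistering a terminating pod's service instances (and letting ACL token cleanup run) immediately, `Reconcile` reports the remaining shutdown grace period as `RequeueAfter` so controller-runtime replays the request once the pod has actually exited. The names `endpointsReconciler` and `deregisterWhatIsSafe` are illustrative, not from the repository.

```go
package endpoints

import (
	"context"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// endpointsReconciler is a stand-in for the real endpoints controller.
type endpointsReconciler struct{}

// Reconcile defers cleanup for terminating pods: rather than deregistering
// right away, it returns the longest remaining shutdown grace period as
// RequeueAfter so the request is replayed after the pods have exited.
func (r *endpointsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	requeueAfter := r.deregisterWhatIsSafe(ctx, req) // hypothetical helper
	return ctrl.Result{RequeueAfter: requeueAfter}, nil
}

// deregisterWhatIsSafe would deregister instances whose pods are gone and
// return a non-zero duration while any pod is still in graceful shutdown.
func (r *endpointsReconciler) deregisterWhatIsSafe(ctx context.Context, req ctrl.Request) time.Duration {
	_ = ctx
	_ = req
	return 30 * time.Second // placeholder value
}
```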
13 changes: 9 additions & 4 deletions acceptance/framework/consul/helm_cluster.go
@@ -26,16 +26,17 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil/retry"

"github.com/hashicorp/consul-k8s/acceptance/framework/config"
"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
"github.com/hashicorp/consul-k8s/acceptance/framework/helpers"
"github.com/hashicorp/consul-k8s/acceptance/framework/k8s"
"github.com/hashicorp/consul-k8s/acceptance/framework/logger"
"github.com/hashicorp/consul-k8s/acceptance/framework/portforward"
"github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil/retry"
)

// HelmCluster implements Cluster and uses Helm
@@ -752,6 +753,10 @@ func defaultValues() map[string]string {
// (false positive).
"dns.enabled": "false",

// Adjust the default value from 30s to 1s since we have several tests that verify tokens are cleaned up,
// and many of them are using the default retryer (7s max).
"connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "1",

// Enable trace logs for servers and clients.
"server.extraConfig": `"{\"log_level\": \"TRACE\"}"`,
"client.extraConfig": `"{\"log_level\": \"TRACE\"}"`,
14 changes: 9 additions & 5 deletions acceptance/framework/consul/helm_cluster_test.go
@@ -7,14 +7,15 @@ import (
"testing"

"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/hashicorp/consul-k8s/acceptance/framework/config"
"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/controller-runtime/pkg/client"
runtimefake "sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/hashicorp/consul-k8s/acceptance/framework/config"
"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
)

// Test that if TestConfig has values that need to be provided
@@ -33,7 +34,8 @@ func TestNewHelmCluster(t *testing.T) {
"global.image": "test-config-image",
"global.logLevel": "debug",
"server.replicas": "1",
"connectInject.transparentProxy.defaultEnabled": "false",
"connectInject.transparentProxy.defaultEnabled": "false",
"connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "1",
"dns.enabled": "false",
"server.extraConfig": `"{\"log_level\": \"TRACE\"}"`,
"client.extraConfig": `"{\"log_level\": \"TRACE\"}"`,
@@ -46,7 +48,8 @@
"global.logLevel": "debug",
"server.bootstrapExpect": "3",
"server.replicas": "3",
"connectInject.transparentProxy.defaultEnabled": "true",
"connectInject.transparentProxy.defaultEnabled": "true",
"connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "3",
"dns.enabled": "true",
"server.extraConfig": `"{\"foo\": \"bar\"}"`,
"client.extraConfig": `"{\"foo\": \"bar\"}"`,
@@ -57,7 +60,8 @@
"global.logLevel": "debug",
"server.bootstrapExpect": "3",
"server.replicas": "3",
"connectInject.transparentProxy.defaultEnabled": "true",
"connectInject.transparentProxy.defaultEnabled": "true",
"connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "3",
"dns.enabled": "true",
"server.extraConfig": `"{\"foo\": \"bar\"}"`,
"client.extraConfig": `"{\"foo\": \"bar\"}"`,
@@ -6,10 +6,12 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net"
"regexp"
"strconv"
"strings"
"time"

mapset "github.com/deckarep/golang-set"
"github.com/go-logr/logr"
@@ -25,6 +27,7 @@

"github.com/hashicorp/consul-k8s/control-plane/connect-inject/common"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/lifecycle"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics"
"github.com/hashicorp/consul-k8s/control-plane/consul"
"github.com/hashicorp/consul-k8s/control-plane/helper/parsetags"
@@ -94,6 +97,8 @@ type Controller struct {
// any created Consul namespaces to allow cross namespace service discovery.
// Only necessary if ACLs are enabled.
CrossNSACLPolicy string
// Lifecycle config sets graceful startup/shutdown defaults for pods.
LifecycleConfig lifecycle.Config
// ReleaseName is the Consul Helm installation release.
ReleaseName string
// ReleaseNamespace is the namespace where Consul is installed.
@@ -153,18 +158,14 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

err = r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints)
// endpointPods holds a set of all pods this endpoints object is currently pointing to.
// We use this later when we reconcile ACL tokens to decide whether an ACL token in Consul
// is for a pod that no longer exists.
endpointPods := mapset.NewSet()

// If the endpoints object has been deleted (and we get an IsNotFound
// error), we need to deregister all instances in Consul for that service.
if k8serrors.IsNotFound(err) {
// Deregister all instances in Consul for this service. The function deregisterService handles
// the case where the Consul service name is different from the Kubernetes service name.
err = r.deregisterService(apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{}, err
requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{RequeueAfter: requeueAfter}, err
} else if err != nil {
r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace)
return ctrl.Result{}, err
@@ -177,8 +178,8 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if isLabeledIgnore(serviceEndpoints.Labels) {
// We always deregister the service to handle the case where a user has registered the service, then added the label later.
r.Log.Info("ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace)
err = r.deregisterService(apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{}, err
requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{RequeueAfter: requeueAfter}, err
}

// endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
@@ -213,12 +214,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

if hasBeenInjected(pod) {
endpointPods.Add(address.TargetRef.Name)
if isConsulDataplaneSupported(pod) {
if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus); err != nil {
r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true
} else {
r.Log.Info("detected an update to pre-consul-dataplane service", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
nodeAgentClientCfg, err := r.consulClientCfgForNodeAgent(apiClient, pod, serverState)
@@ -241,11 +243,12 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

if isGateway(pod) {
endpointPods.Add(address.TargetRef.Name)
if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus); err != nil {
r.Log.Error(err, "failed to register gateway or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true
}
}
}
@@ -254,12 +257,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister
// from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during
// the registration codepath.
if err = r.deregisterService(apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil {
requeueAfter, err := r.deregisterService(ctx, apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap)
if err != nil {
r.Log.Error(err, "failed to deregister endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}

return ctrl.Result{}, errs
return ctrl.Result{RequeueAfter: requeueAfter}, errs
}

func (r *Controller) Logger(name types.NamespacedName) logr.Logger {
@@ -274,10 +278,7 @@ func (r *Controller) SetupWithManager(mgr ctrl.Manager) error {

// registerServicesAndHealthCheck creates Consul registrations for the service and proxy and registers them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true

func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error {
var managedByEndpointsController bool
if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue {
managedByEndpointsController = true
@@ -335,10 +336,7 @@ func parseLocality(node corev1.Node) *api.Locality {

// registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true

func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error {
var managedByEndpointsController bool
if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue {
managedByEndpointsController = true
@@ -934,40 +932,54 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string)
// The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister
// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
// has addresses, it will only deregister instances not in the map.
func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {
// If the pod backing a Consul service instance still exists and the graceful shutdown lifecycle mode is enabled, the instance
// will not be deregistered. Instead, its health check will be updated to Critical in order to drain incoming traffic and
// this function will return a requeueAfter duration. This can be used to requeue the event at the longest shutdown time
// interval to clean up these instances after they have exited.
func (r *Controller) deregisterService(
ctx context.Context,
apiClient *api.Client,
k8sSvcName string,
k8sSvcNamespace string,
endpointsAddressesMap map[string]bool) (time.Duration, error) {

// Get services matching metadata from Consul
serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
return err
return 0, err
}

var errs error
var requeueAfter time.Duration
for _, svc := range serviceInstances {
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
var serviceDeregistered bool
if endpointsAddressesMap != nil {
if _, ok := endpointsAddressesMap[svc.ServiceAddress]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ServiceID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID)
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}

if addressIsMissingFromEndpointsMap(svc.ServiceAddress, endpointsAddressesMap) {
// If graceful shutdown is enabled, continue to the next service instance and
// mark that an event requeue is needed. We should requeue at the longest time interval
// to prevent excessive re-queues. Also, updating the health status in Consul to Critical
// should prevent routing during gracefulShutdown.
podShutdownDuration, err := r.getGracefulShutdownAndUpdatePodCheck(ctx, apiClient, svc, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get pod shutdown duration", "svc", svc.ServiceName)
errs = multierror.Append(errs, err)
}
} else {

// set requeue response, then continue to the next service instance
if podShutdownDuration > requeueAfter {
requeueAfter = podShutdownDuration
}
if podShutdownDuration > 0 {
continue
}

// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
_, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ServiceID,
Namespace: svc.Namespace,
@@ -999,8 +1011,87 @@ func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvc
}
}

return errs
if requeueAfter > 0 {
r.Log.Info("re-queueing event for graceful shutdown", "name", k8sSvcName, "k8sNamespace", k8sSvcNamespace, "requeueAfter", requeueAfter)
}

return requeueAfter, errs
}

// getGracefulShutdownAndUpdatePodCheck checks if the pod is in the process of being terminated and if so, updates the
// health status of the service to critical. It returns the duration for which the pod should be re-queued (which is the pod's
// gracefulShutdownPeriod setting).
func (r *Controller) getGracefulShutdownAndUpdatePodCheck(ctx context.Context, apiClient *api.Client, svc *api.CatalogService, k8sNamespace string) (time.Duration, error) {
// Get the pod, and check if it is still running. We do this to defer ACL/node cleanup for pods that are
// in graceful termination
podName := svc.ServiceMeta[constants.MetaKeyPodName]
if podName == "" {
return 0, nil
}

var pod corev1.Pod
err := r.Client.Get(ctx, types.NamespacedName{Name: podName, Namespace: k8sNamespace}, &pod)
if k8serrors.IsNotFound(err) {
return 0, nil
}
if err != nil {
r.Log.Error(err, "failed to get terminating pod", "name", podName, "k8sNamespace", k8sNamespace)
return 0, fmt.Errorf("failed to get terminating pod %s/%s: %w", k8sNamespace, podName, err)
}

shutdownSeconds, err := r.getGracefulShutdownPeriodSecondsForPod(pod)
if err != nil {
r.Log.Error(err, "failed to get graceful shutdown period for pod", "name", pod, "k8sNamespace", k8sNamespace)
return 0, fmt.Errorf("failed to get graceful shutdown period for pod %s/%s: %w", k8sNamespace, podName, err)
}

if shutdownSeconds > 0 {
// Update the health status of the service to critical so that we can drain inbound traffic.
// We don't need to handle the proxy service since that will be reconciled looping through all the service instances.
serviceRegistration := &api.CatalogRegistration{
Node: common.ConsulNodeNameFromK8sNode(pod.Spec.NodeName),
Address: pod.Status.HostIP,
// Service is nil since we are patching the health status
Check: &api.AgentCheck{
CheckID: consulHealthCheckID(pod.Namespace, svc.ServiceID),
Name: constants.ConsulKubernetesCheckName,
Type: constants.ConsulKubernetesCheckType,
Status: api.HealthCritical,
ServiceID: svc.ServiceID,
Output: fmt.Sprintf("Pod \"%s/%s\" is terminating", pod.Namespace, podName),
Namespace: r.consulNamespace(pod.Namespace),
},
SkipNodeUpdate: true,
}

r.Log.Info("updating health status of service with Consul to critical in order to drain inbound traffic", "name", svc.ServiceName,
"id", svc.ServiceID, "pod", podName, "k8sNamespace", pod.Namespace)
_, err = apiClient.Catalog().Register(serviceRegistration, nil)
if err != nil {
r.Log.Error(err, "failed to update service health status to critical", "name", svc.ServiceName, "pod", podName)
return 0, fmt.Errorf("failed to update service health status for pod %s/%s to critical: %w", pod.Namespace, podName, err)
}

// Return the duration for which the pod should be re-queued. We add 20% to the shutdownSeconds to account for
// any potential delay in the pod being killed.
return time.Duration(shutdownSeconds+int(math.Ceil(float64(shutdownSeconds)*0.2))) * time.Second, nil
}
return 0, nil
}

// getGracefulShutdownPeriodSecondsForPod returns the graceful shutdown period for the pod. If one is not specified,
// either through the controller configuration or pod annotations, it returns 0.
func (r *Controller) getGracefulShutdownPeriodSecondsForPod(pod corev1.Pod) (int, error) {
enabled, err := r.LifecycleConfig.EnableProxyLifecycle(pod)
if err != nil {
return 0, fmt.Errorf("failed to get parse proxy lifecycle configuration for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}
// Check that SidecarProxyLifecycle is enabled.
if !enabled {
return 0, nil
}

return r.LifecycleConfig.ShutdownGracePeriodSeconds(pod)
}

// deregisterNode removes a node if it does not have any associated services attached to it.
@@ -1529,3 +1620,11 @@ func getMultiPortIdx(pod corev1.Pod, serviceEndpoints corev1.Endpoints) int {
}
return -1
}

func addressIsMissingFromEndpointsMap(address string, endpointsAddressesMap map[string]bool) bool {
if endpointsAddressesMap == nil {
return true
}
_, ok := endpointsAddressesMap[address]
return !ok
}
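To make the two small pieces of new logic above concrete, here is a self-contained sketch (helper names and example values are illustrative, not part of the commit) showing the nil-map semantics mirrored from `addressIsMissingFromEndpointsMap` and the roughly 20% headroom the controller adds to a pod's shutdown grace period before requeueing.

```go
// Standalone sketch; requeueAfterFor mirrors the controller's arithmetic and
// addressIsMissingFromEndpointsMap mirrors the helper added in the diff.
package main

import (
	"fmt"
	"math"
	"time"
)

// A nil endpoints map means "deregister every instance"; otherwise only
// addresses absent from the map are candidates for cleanup.
func addressIsMissingFromEndpointsMap(address string, endpointsAddressesMap map[string]bool) bool {
	if endpointsAddressesMap == nil {
		return true
	}
	_, ok := endpointsAddressesMap[address]
	return !ok
}

// Requeue after the grace period plus ~20% headroom for the kubelet to
// actually kill the pod.
func requeueAfterFor(shutdownSeconds int) time.Duration {
	return time.Duration(shutdownSeconds+int(math.Ceil(float64(shutdownSeconds)*0.2))) * time.Second
}

func main() {
	endpoints := map[string]bool{"10.0.0.1": true}
	fmt.Println(addressIsMissingFromEndpointsMap("10.0.0.2", endpoints)) // true: candidate for cleanup
	fmt.Println(addressIsMissingFromEndpointsMap("10.0.0.2", nil))       // true: nil map deregisters everything

	// A 30s grace period is requeued after 36s (30 + ceil(30*0.2)).
	fmt.Println(requeueAfterFor(30)) // 36s
}
```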