Skip to content

Commit

Permalink
fix(control-plane): acl tokens deleted while pods in graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
DanStough committed Mar 12, 2024
1 parent 70b9756 commit 49fb715
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

mapset "github.com/deckarep/golang-set"
"github.com/go-logr/logr"
Expand All @@ -25,6 +26,7 @@ import (

"github.com/hashicorp/consul-k8s/control-plane/connect-inject/common"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/lifecycle"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics"
"github.com/hashicorp/consul-k8s/control-plane/consul"
"github.com/hashicorp/consul-k8s/control-plane/helper/parsetags"
Expand Down Expand Up @@ -94,6 +96,8 @@ type Controller struct {
// any created Consul namespaces to allow cross namespace service discovery.
// Only necessary if ACLs are enabled.
CrossNSACLPolicy string
// LifecycleConfig sets the graceful startup/shutdown defaults for pods.
LifecycleConfig lifecycle.Config
// ReleaseName is the Consul Helm installation release.
ReleaseName string
// ReleaseNamespace is the namespace where Consul is installed.
Expand Down Expand Up @@ -153,18 +157,14 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

err = r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints)
// endpointPods holds a set of all pods this endpoints object is currently pointing to.
// We use this later when we reconcile ACL tokens to decide whether an ACL token in Consul
// is for a pod that no longer exists.
endpointPods := mapset.NewSet()

// If the endpoints object has been deleted (and we get an IsNotFound
// error), we need to deregister all instances in Consul for that service.
if k8serrors.IsNotFound(err) {
// Deregister all instances in Consul for this service. The function deregisterService handles
// the case where the Consul service name is different from the Kubernetes service name.
err = r.deregisterService(apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{}, err
requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{RequeueAfter: requeueAfter}, err
} else if err != nil {
r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace)
return ctrl.Result{}, err
Expand All @@ -177,8 +177,8 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if isLabeledIgnore(serviceEndpoints.Labels) {
// We always deregister the service to handle the case where a user has registered the service, then added the label later.
r.Log.Info("ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace)
err = r.deregisterService(apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{}, err
requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{RequeueAfter: requeueAfter}, err
}

// endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
Expand Down Expand Up @@ -213,9 +213,11 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

if hasBeenInjected(pod) {
endpointPods.Add(address.TargetRef.Name)
if isConsulDataplaneSupported(pod) {
if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true

if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus); err != nil {
r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
Expand All @@ -241,8 +243,10 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

if isGateway(pod) {
endpointPods.Add(address.TargetRef.Name)
if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true

if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus); err != nil {
r.Log.Error(err, "failed to register gateway or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
Expand All @@ -254,12 +258,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister
// from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during
// the registration codepath.
if err = r.deregisterService(apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil {
requeueAfter, err := r.deregisterService(ctx, apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap)
if err != nil {
r.Log.Error(err, "failed to deregister endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}

return ctrl.Result{}, errs
return ctrl.Result{RequeueAfter: requeueAfter}, errs
}

func (r *Controller) Logger(name types.NamespacedName) logr.Logger {
Expand All @@ -274,10 +279,7 @@ func (r *Controller) SetupWithManager(mgr ctrl.Manager) error {

// registerServicesAndHealthCheck creates Consul registrations for the service and proxy and registers them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true

func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error {
var managedByEndpointsController bool
if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue {
managedByEndpointsController = true
Expand Down Expand Up @@ -335,10 +337,7 @@ func parseLocality(node corev1.Node) *api.Locality {

// registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true

func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error {
var managedByEndpointsController bool
if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue {
managedByEndpointsController = true
Expand Down Expand Up @@ -934,40 +933,50 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string)
// The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister
// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
// has addresses, it will only deregister instances not in the map.
func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {
func (r *Controller) deregisterService(
ctx context.Context,
apiClient *api.Client,
k8sSvcName string,
k8sSvcNamespace string,
endpointsAddressesMap map[string]bool) (time.Duration, error) {

// Get services matching metadata from Consul
serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
return err
return 0, err
}

var errs error
var requeueAfter time.Duration
for _, svc := range serviceInstances {
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
var serviceDeregistered bool
if endpointsAddressesMap != nil {
if _, ok := endpointsAddressesMap[svc.ServiceAddress]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ServiceID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID)
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}

if addressIsMissingFromEndpointsMap(svc.ServiceAddress, endpointsAddressesMap) {
// If graceful shutdown is enabled, continue to the next service instance and
// mark that an event requeue is needed. We should requeue at the longest time interval
// to prevent excessive re-queues. Also, updating the health status in Consul to Critical
// should prevent routing during gracefulShutdown.
podShutdownDuration, err := r.getGracefulShutdownAndUpdatePodCheck(ctx, apiClient, svc, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get pod shutdown duration", "svc", svc.ServiceName)
errs = multierror.Append(errs, err)
}
} else {

// set requeue response, then continue to the next service instance
if podShutdownDuration > requeueAfter {
requeueAfter = podShutdownDuration
}
if podShutdownDuration > 0 {
continue
}

// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
_, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ServiceID,
Namespace: svc.Namespace,
Expand Down Expand Up @@ -999,8 +1008,87 @@ func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvc
}
}

return errs
if requeueAfter > 0 {
r.Log.Info("re-queueing event", "requeueAfter", requeueAfter)
}

return requeueAfter, errs
}

// getGracefulShutdownAndUpdatePodCheck looks up the pod backing the given Consul service instance and, when the pod
// still exists and has a non-zero graceful shutdown period, marks the instance's Kubernetes health check as critical
// in Consul so that inbound traffic can drain.
//
// It returns how long the caller should wait before re-queueing the event: the pod's graceful shutdown period plus
// 20%. A zero duration with a nil error means there is nothing to wait for: the instance carries no pod name in its
// service meta, the pod is no longer found, or the graceful shutdown period is zero.
//
// NOTE(review): this function does not itself inspect the pod for a terminating state; it relies on being called
// only for service instances whose address is no longer present in the Endpoints object.
func (r *Controller) getGracefulShutdownAndUpdatePodCheck(ctx context.Context, apiClient *api.Client, svc *api.CatalogService, k8sNamespace string) (time.Duration, error) {
	// Without a pod name in the service metadata there is no pod to look up, so there is nothing to defer.
	podName := svc.ServiceMeta[constants.MetaKeyPodName]
	if podName == "" {
		return 0, nil
	}

	// Fetch the pod so that ACL/node cleanup can be deferred while it is still around.
	var pod corev1.Pod
	if err := r.Client.Get(ctx, types.NamespacedName{Name: podName, Namespace: k8sNamespace}, &pod); err != nil {
		// A pod that is already gone needs no grace period; the caller can deregister right away.
		if k8serrors.IsNotFound(err) {
			return 0, nil
		}
		r.Log.Error(err, "failed to get terminating pod", "name", podName, "k8sNamespace", k8sNamespace)
		return 0, fmt.Errorf("failed to get terminating pod %s/%s: %w", k8sNamespace, podName, err)
	}

	shutdownSeconds, err := r.getGracefulShutdownPeriodSecondsForPod(pod)
	if err != nil {
		// Log the pod's name rather than the whole Pod object to keep the log entry readable.
		r.Log.Error(err, "failed to get graceful shutdown period for pod", "name", podName, "k8sNamespace", k8sNamespace)
		return 0, fmt.Errorf("failed to get graceful shutdown period for pod %s/%s: %w", k8sNamespace, podName, err)
	}

	// No graceful shutdown period applies to this pod, so no requeue is needed.
	if shutdownSeconds <= 0 {
		return 0, nil
	}

	// Update the health status of the service to critical so that we can drain inbound traffic.
	// We don't need to handle the proxy service since that will be reconciled looping through all the service instances.
	serviceRegistration := &api.CatalogRegistration{
		Node:    common.ConsulNodeNameFromK8sNode(pod.Spec.NodeName),
		Address: pod.Status.HostIP,
		// Service is nil since we are patching the health status
		Check: &api.AgentCheck{
			CheckID:   consulHealthCheckID(pod.Namespace, svc.ServiceID),
			Name:      constants.ConsulKubernetesCheckName,
			Type:      constants.ConsulKubernetesCheckType,
			Status:    api.HealthCritical,
			ServiceID: svc.ServiceID,
			Output:    fmt.Sprintf("Pod \"%s/%s\" is terminating", pod.Namespace, podName),
			Namespace: r.consulNamespace(pod.Namespace),
		},
		SkipNodeUpdate: true,
	}

	r.Log.Info("updating health status of service with Consul", "name", svc.ServiceName,
		"id", svc.ServiceID, "pod", podName, "k8sNamespace", pod.Namespace)
	if _, err = apiClient.Catalog().Register(serviceRegistration, nil); err != nil {
		r.Log.Error(err, "failed to update service health status", "name", svc.ServiceName, "pod", podName)
		return 0, fmt.Errorf("failed to update service health status for pod %s/%s: %w", pod.Namespace, podName, err)
	}

	// Return the duration after which the event should be re-queued. We add 20% to shutdownSeconds to account for
	// any potential delay in the pod being killed.
	return time.Duration(shutdownSeconds+int(float64(shutdownSeconds)*0.2)) * time.Second, nil
}

// getGracefulShutdownPeriodSecondsForPod resolves the graceful shutdown period, in seconds, that applies to the pod.
// It first asks the controller's LifecycleConfig whether proxy lifecycle management is enabled for the pod; when it
// is not, the result is 0. Otherwise the value comes from LifecycleConfig.ShutdownGracePeriodSeconds. An error is
// returned when the enablement lookup fails, or is passed through from the grace period lookup.
func (r *Controller) getGracefulShutdownPeriodSecondsForPod(pod corev1.Pod) (int, error) {
	lifecycleEnabled, lookupErr := r.LifecycleConfig.EnableProxyLifecycle(pod)

	switch {
	case lookupErr != nil:
		return 0, fmt.Errorf("failed to get parse proxy lifecycle configuration for pod %s/%s: %w", pod.Namespace, pod.Name, lookupErr)
	case !lifecycleEnabled:
		// Proxy lifecycle management is off for this pod, so no grace period applies.
		return 0, nil
	}

	return r.LifecycleConfig.ShutdownGracePeriodSeconds(pod)
}

// deregisterNode removes a node if it does not have any associated services attached to it.
Expand Down Expand Up @@ -1529,3 +1617,11 @@ func getMultiPortIdx(pod corev1.Pod, serviceEndpoints corev1.Endpoints) int {
}
return -1
}

// addressIsMissingFromEndpointsMap reports whether address has no entry in endpointsAddressesMap.
// A nil map holds no addresses, so every address is reported as missing. Only the presence of the key
// matters; the bool stored under it is ignored.
func addressIsMissingFromEndpointsMap(address string, endpointsAddressesMap map[string]bool) bool {
	// Reading from a nil map is safe in Go and always reports the key as absent, which gives the
	// "nil map means everything is missing" behavior without a separate nil check.
	_, present := endpointsAddressesMap[address]
	return !present
}
Loading

0 comments on commit 49fb715

Please sign in to comment.