Skip to content

Commit

Permalink
Add watcher for agent pods to endpoints controller
Browse files Browse the repository at this point in the history
- This watcher watches for Consul Agent pods to be in a running phase
  and the condition ready to be true and then reconcile all endpoints
that have a ready/not-ready address that share a node name with that of
the consul agent pod.
  • Loading branch information
Ashwin Venkatesh committed Mar 23, 2021
1 parent c2c32fb commit b75098e
Show file tree
Hide file tree
Showing 2 changed files with 710 additions and 15 deletions.
98 changes: 97 additions & 1 deletion connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@ import (
"github.com/hashicorp/consul/api"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

type EndpointsController struct {
Expand Down Expand Up @@ -387,7 +393,11 @@ func (r *EndpointsController) Logger(name types.NamespacedName) logr.Logger {
func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Endpoints{}).
Complete(r)
Watches(
&source.Kind{Type: &corev1.Pod{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.requestsForRunningAgentPods)},
builder.WithPredicates(predicate.NewPredicateFuncs(r.filterAgentPods)),
).Complete(r)
}

// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
Expand Down Expand Up @@ -421,6 +431,92 @@ func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool {
return false
}

// filterAgentPods receives meta and object information for Kubernetes resources that are being watched,
// which in this case are Pods. It only returns true if the Pod is a Consul Client Agent Pod. It reads the labels
// from the meta of the resource and uses the values of the "app" and "component" label to validate that
// the Pod is a Consul Client Agent.
func (r EndpointsController) filterAgentPods(meta metav1.Object, object runtime.Object) bool {
podLabels := meta.GetLabels()
app, ok := podLabels["app"]
if !ok {
return false
}
component, ok := podLabels["component"]
if !ok {
return false
}

release, ok := podLabels["release"]
if !ok {
return false
}

if app == "consul" && component == "client" && release == r.ReleaseName {
return true
}
return false
}

// requestsForRunningAgentPods creates a slice of requests for the endpoints controller.
// It enqueues a request for each endpoint that needs to be reconciled. It iterates through
// the list of endpoints and creates a request for those endpoints that have an address that
// are on the same node as the new Consul Agent pod. It receives a Pod Object which is a
// Consul Agent that has been filtered by filterAgentPods and only enqueues endpoints
// for client agent pods where the Ready condition is true.
func (r EndpointsController) requestsForRunningAgentPods(object handler.MapObject) []ctrl.Request {
var consulClientPod corev1.Pod
r.Log.Info("received update for consulClientPod", "podName", object.Meta.GetName())
err := r.Client.Get(r.Ctx, types.NamespacedName{Name: object.Meta.GetName(), Namespace: object.Meta.GetNamespace()}, &consulClientPod)
if k8serrors.IsNotFound(err) {
// Ignore if consulClientPod is not found.
return []ctrl.Request{}
}
if err != nil {
r.Log.Error(err, "failed to get consulClientPod", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not running, since
// we can't reconcile and register/deregister services against that agent.
if consulClientPod.Status.Phase != corev1.PodRunning {
r.Log.Info("ignoring consulClientPod because it's not running", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not yet ready, since
// we can't reconcile and register/deregister services against that agent.
for _, cond := range consulClientPod.Status.Conditions {
if cond.Type == corev1.PodReady && cond.Status != corev1.ConditionTrue {
// Ignore if consulClientPod is not ready.
r.Log.Info("ignoring consulClientPod because it's not ready", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
}

// Get the list of all endpoints.
var endpointsList corev1.EndpointsList
err = r.Client.List(r.Ctx, &endpointsList)
if err != nil {
r.Log.Error(err, "failed to list endpoints")
return []ctrl.Request{}
}

// Enqueue requests for endpoints that are on the same node
// as the client agent.
var requests []reconcile.Request
for _, ep := range endpointsList.Items {
for _, subset := range ep.Subsets {
allAddresses := subset.Addresses
allAddresses = append(allAddresses, subset.NotReadyAddresses...)
for _, address := range allAddresses {
// Only add requests for the address that is on the same node as the consul client pod.
if address.NodeName != nil && *address.NodeName == consulClientPod.Spec.NodeName {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Name: ep.Name, Namespace: ep.Namespace}})
}
}
}
}
return requests
}

// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[annotationStatus]; ok {
Expand Down
Loading

0 comments on commit b75098e

Please sign in to comment.