Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watcher for agent pods to endpoints controller #457

Merged
merged 1 commit into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 97 additions & 1 deletion connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@ import (
"github.com/hashicorp/consul/api"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

type EndpointsController struct {
Expand Down Expand Up @@ -387,7 +393,11 @@ func (r *EndpointsController) Logger(name types.NamespacedName) logr.Logger {
func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Endpoints{}).
Complete(r)
Watches(
&source.Kind{Type: &corev1.Pod{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.requestsForRunningAgentPods)},
builder.WithPredicates(predicate.NewPredicateFuncs(r.filterAgentPods)),
).Complete(r)
Comment on lines +396 to +400
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woah this is fancy! Cool!

}

// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
Expand Down Expand Up @@ -421,6 +431,92 @@ func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool {
return false
}

// filterAgentPods receives meta and object information for Kubernetes resources that are being watched,
// which in this case are Pods. It only returns true if the Pod is a Consul Client Agent Pod. It reads the labels
// from the meta of the resource and uses the values of the "app" and "component" label to validate that
// the Pod is a Consul Client Agent.
func (r EndpointsController) filterAgentPods(meta metav1.Object, object runtime.Object) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thisisnotashwin These should be function pointers I think: r *EndpointsController

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for the late response. They don't seem to use any values from the r struct. this might be something I dont quite understand here but why would it need to be a function pointer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just because the other functions in this struct do use the struct pointer so these should be consistent. Or if they're not accessing anything on the struct they could be functions instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is a good point. i looked at the other method and i think it does make sense to use pointers to keep them consistent. plus i could add some logging here just for some added information in which case i can use the logger from the struct! good catch!

podLabels := meta.GetLabels()
app, ok := podLabels["app"]
if !ok {
return false
}
component, ok := podLabels["component"]
if !ok {
return false
}

release, ok := podLabels["release"]
if !ok {
return false
}

if app == "consul" && component == "client" && release == r.ReleaseName {
return true
}
return false
}

// requestsForRunningAgentPods creates a slice of requests for the endpoints controller.
// It enqueues a request for each endpoint that needs to be reconciled. It iterates through
// the list of endpoints and creates a request for those endpoints that have an address that
// are on the same node as the new Consul Agent pod. It receives a Pod Object which is a
// Consul Agent that has been filtered by filterAgentPods and only enqueues endpoints
// for client agent pods where the Ready condition is true.
func (r EndpointsController) requestsForRunningAgentPods(object handler.MapObject) []ctrl.Request {
var consulClientPod corev1.Pod
r.Log.Info("received update for consulClientPod", "podName", object.Meta.GetName())
err := r.Client.Get(r.Ctx, types.NamespacedName{Name: object.Meta.GetName(), Namespace: object.Meta.GetNamespace()}, &consulClientPod)
if k8serrors.IsNotFound(err) {
// Ignore if consulClientPod is not found.
return []ctrl.Request{}
}
if err != nil {
r.Log.Error(err, "failed to get consulClientPod", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not running, since
// we can't reconcile and register/deregister services against that agent.
if consulClientPod.Status.Phase != corev1.PodRunning {
r.Log.Info("ignoring consulClientPod because it's not running", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not yet ready, since
// we can't reconcile and register/deregister services against that agent.
for _, cond := range consulClientPod.Status.Conditions {
if cond.Type == corev1.PodReady && cond.Status != corev1.ConditionTrue {
// Ignore if consulClientPod is not ready.
r.Log.Info("ignoring consulClientPod because it's not ready", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
}

// Get the list of all endpoints.
var endpointsList corev1.EndpointsList
err = r.Client.List(r.Ctx, &endpointsList)
if err != nil {
r.Log.Error(err, "failed to list endpoints")
return []ctrl.Request{}
}

// Enqueue requests for endpoints that are on the same node
// as the client agent.
var requests []reconcile.Request
for _, ep := range endpointsList.Items {
for _, subset := range ep.Subsets {
allAddresses := subset.Addresses
allAddresses = append(allAddresses, subset.NotReadyAddresses...)
for _, address := range allAddresses {
// Only add requests for the address that is on the same node as the consul client pod.
if address.NodeName != nil && *address.NodeName == consulClientPod.Spec.NodeName {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Name: ep.Name, Namespace: ep.Namespace}})
}
}
}
}
return requests
}

// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[annotationStatus]; ok {
Expand Down
Loading