diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 3e4b28ea20..fdfa7d3fd1 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -11,11 +11,17 @@ import ( "github.com/hashicorp/consul/api" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) type EndpointsController struct { @@ -387,7 +393,11 @@ func (r *EndpointsController) Logger(name types.NamespacedName) logr.Logger { func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&corev1.Endpoints{}). - Complete(r) + Watches( + &source.Kind{Type: &corev1.Pod{}}, + &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.requestsForRunningAgentPods)}, + builder.WithPredicates(predicate.NewPredicateFuncs(r.filterAgentPods)), + ).Complete(r) } // getConsulClient returns an *api.Client that points at the consul agent local to the pod. @@ -421,6 +431,92 @@ func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool { return false } +// filterAgentPods receives meta and object information for Kubernetes resources that are being watched, +// which in this case are Pods. It only returns true if the Pod is a Consul Client Agent Pod. It reads the labels +// from the meta of the resource and uses the values of the "app" and "component" label to validate that +// the Pod is a Consul Client Agent. +func (r EndpointsController) filterAgentPods(meta metav1.Object, object runtime.Object) bool { + podLabels := meta.GetLabels() + app, ok := podLabels["app"] + if !ok { + return false + } + component, ok := podLabels["component"] + if !ok { + return false + } + + release, ok := podLabels["release"] + if !ok { + return false + } + + if app == "consul" && component == "client" && release == r.ReleaseName { + return true + } + return false +} + +// requestsForRunningAgentPods creates a slice of requests for the endpoints controller. +// It enqueues a request for each endpoint that needs to be reconciled. It iterates through +// the list of endpoints and creates a request for those endpoints that have an address that +// are on the same node as the new Consul Agent pod. It receives a Pod Object which is a +// Consul Agent that has been filtered by filterAgentPods and only enqueues endpoints +// for client agent pods where the Ready condition is true. +func (r EndpointsController) requestsForRunningAgentPods(object handler.MapObject) []ctrl.Request { + var consulClientPod corev1.Pod + r.Log.Info("received update for consulClientPod", "podName", object.Meta.GetName()) + err := r.Client.Get(r.Ctx, types.NamespacedName{Name: object.Meta.GetName(), Namespace: object.Meta.GetNamespace()}, &consulClientPod) + if k8serrors.IsNotFound(err) { + // Ignore if consulClientPod is not found. + return []ctrl.Request{} + } + if err != nil { + r.Log.Error(err, "failed to get consulClientPod", "consulClientPod", consulClientPod.Name) + return []ctrl.Request{} + } + // We can ignore the agent pod if it's not running, since + // we can't reconcile and register/deregister services against that agent. + if consulClientPod.Status.Phase != corev1.PodRunning { + r.Log.Info("ignoring consulClientPod because it's not running", "consulClientPod", consulClientPod.Name) + return []ctrl.Request{} + } + // We can ignore the agent pod if it's not yet ready, since + // we can't reconcile and register/deregister services against that agent. + for _, cond := range consulClientPod.Status.Conditions { + if cond.Type == corev1.PodReady && cond.Status != corev1.ConditionTrue { + // Ignore if consulClientPod is not ready. + r.Log.Info("ignoring consulClientPod because it's not ready", "consulClientPod", consulClientPod.Name) + return []ctrl.Request{} + } + } + + // Get the list of all endpoints. + var endpointsList corev1.EndpointsList + err = r.Client.List(r.Ctx, &endpointsList) + if err != nil { + r.Log.Error(err, "failed to list endpoints") + return []ctrl.Request{} + } + + // Enqueue requests for endpoints that are on the same node + // as the client agent. + var requests []reconcile.Request + for _, ep := range endpointsList.Items { + for _, subset := range ep.Subsets { + allAddresses := subset.Addresses + allAddresses = append(allAddresses, subset.NotReadyAddresses...) + for _, address := range allAddresses { + // Only add requests for the address that is on the same node as the consul client pod. + if address.NodeName != nil && *address.NodeName == consulClientPod.Spec.NodeName { + requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Name: ep.Name, Namespace: ep.Namespace}}) + } + } + } + } + return requests +} + // hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected. func hasBeenInjected(pod corev1.Pod) bool { if anno, ok := pod.Annotations[annotationStatus]; ok { diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index ca319fbd57..68a48c5f11 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -1,6 +1,7 @@ package connectinject import ( + "context" "fmt" "strings" "testing" @@ -16,6 +17,7 @@ import ( "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/handler" ) func TestShouldIgnore(t *testing.T) { @@ -344,9 +346,9 @@ func TestReconcileCreateEndpoint(t *testing.T) { Namespace: "default", }, Subsets: []corev1.EndpointSubset{ - corev1.EndpointSubset{ + { Addresses: []corev1.EndpointAddress{ - corev1.EndpointAddress{ + { IP: "1.2.3.4", NodeName: &nodeName, TargetRef: &corev1.ObjectReference{ @@ -355,7 +357,7 @@ func TestReconcileCreateEndpoint(t *testing.T) { Namespace: "default", }, }, - corev1.EndpointAddress{ + { IP: "2.2.3.4", NodeName: &nodeName, TargetRef: &corev1.ObjectReference{ @@ -620,9 +622,9 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Namespace: "default", }, Subsets: []corev1.EndpointSubset{ - corev1.EndpointSubset{ + { Addresses: []corev1.EndpointAddress{ - corev1.EndpointAddress{ + { IP: "4.4.4.4", NodeName: &nodeName, TargetRef: &corev1.ObjectReference{ @@ -682,9 +684,9 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Namespace: "default", }, Subsets: []corev1.EndpointSubset{ - corev1.EndpointSubset{ + { Addresses: []corev1.EndpointAddress{ - corev1.EndpointAddress{ + { IP: "4.4.4.4", NodeName: &nodeName, TargetRef: &corev1.ObjectReference{ @@ -744,9 +746,9 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Namespace: "default", }, Subsets: []corev1.EndpointSubset{ - corev1.EndpointSubset{ + { Addresses: []corev1.EndpointAddress{ - corev1.EndpointAddress{ + { IP: "1.2.3.4", NodeName: &nodeName, TargetRef: &corev1.ObjectReference{ @@ -755,7 +757,7 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Namespace: "default", }, }, - corev1.EndpointAddress{ + { IP: "2.2.3.4", NodeName: &nodeName, TargetRef: &corev1.ObjectReference{ @@ -822,9 +824,9 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Namespace: "default", }, Subsets: []corev1.EndpointSubset{ - corev1.EndpointSubset{ + { Addresses: []corev1.EndpointAddress{ - corev1.EndpointAddress{ + { IP: "1.2.3.4", NodeName: &nodeName, TargetRef: &corev1.ObjectReference{ @@ -905,9 +907,9 @@ func TestReconcileUpdateEndpoint(t *testing.T) { Namespace: "default", }, Subsets: []corev1.EndpointSubset{ - corev1.EndpointSubset{ + { Addresses: []corev1.EndpointAddress{ - corev1.EndpointAddress{ + { IP: "1.2.3.4", NodeName: &nodeName, TargetRef: &corev1.ObjectReference{ @@ -1300,6 +1302,599 @@ func TestReconcileDeleteEndpoint(t *testing.T) { } } +func TestFilterAgentPods(t *testing.T) { + t.Parallel() + cases := map[string]struct { + meta metav1.Object + expected bool + }{ + "label[app]=consul label[component]=client label[release] consul": { + meta: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "consul", + "component": "client", + "release": "consul", + }, + }, + }, + expected: true, + }, + "no labels": { + meta: &corev1.Pod{}, + expected: false, + }, + "label[app] empty": { + meta: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "component": "client", + "release": "consul", + }, + }, + }, + expected: false, + }, + "label[component] empty": { + meta: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "consul", + "release": "consul", + }, + }, + }, + expected: false, + }, + "label[release] empty": { + meta: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "consul", + "component": "client", + }, + }, + }, + expected: false, + }, + "label[app]!=consul label[component]=client label[release]=consul": { + meta: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "not-consul", + "component": "client", + "release": "consul", + }, + }, + }, + expected: false, + }, + "label[component]!=client label[app]=consul label[release]=consul": { + meta: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "consul", + "component": "not-client", + "release": "consul", + }, + }, + }, + expected: false, + }, + "label[release]!=consul label[app]=consul label[component]=client": { + meta: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "consul", + "component": "client", + "release": "not-consul", + }, + }, + }, + expected: false, + }, + "label[app]!=consul label[component]!=client label[release]!=consul": { + meta: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "not-consul", + "component": "not-client", + "release": "not-consul", + }, + }, + }, + expected: false, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + controller := EndpointsController{ + ReleaseName: "consul", + } + + result := controller.filterAgentPods(test.meta, nil) + require.Equal(t, test.expected, result) + }) + } +} + +func TestRequestsForRunningAgentPods(t *testing.T) { + t.Parallel() + cases := map[string]struct { + agentPod *corev1.Pod + existingEndpoints []*corev1.Endpoints + expectedRequests []ctrl.Request + }{ + "pod=running, all endpoints need to be reconciled": { + agentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "consul-agent", + }, + Spec: corev1.PodSpec{ + NodeName: "node-foo", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + Phase: corev1.PodRunning, + }, + }, + existingEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-1", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + NotReadyAddresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-bar"), + }, + }, + }, + }, + }, + }, + expectedRequests: []ctrl.Request{ + { + NamespacedName: types.NamespacedName{ + Name: "endpoint-1", + }, + }, + }, + }, + "pod=running, endpoints with ready address need to be reconciled": { + agentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "consul-agent", + }, + Spec: corev1.PodSpec{ + NodeName: "node-foo", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + Phase: corev1.PodRunning, + }, + }, + existingEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-1", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + }, + }, + }, + }, + expectedRequests: []ctrl.Request{ + { + NamespacedName: types.NamespacedName{ + Name: "endpoint-1", + }, + }, + }, + }, + "pod=running, endpoints with not-ready address need to be reconciled": { + agentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "consul-agent", + }, + Spec: corev1.PodSpec{ + NodeName: "node-foo", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + Phase: corev1.PodRunning, + }, + }, + existingEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-1", + }, + Subsets: []corev1.EndpointSubset{ + { + NotReadyAddresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + }, + }, + }, + }, + expectedRequests: []ctrl.Request{ + { + NamespacedName: types.NamespacedName{ + Name: "endpoint-1", + }, + }, + }, + }, + "pod=running, some endpoints need to be reconciled": { + agentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "consul-agent", + }, + Spec: corev1.PodSpec{ + NodeName: "node-foo", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + Phase: corev1.PodRunning, + }, + }, + existingEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-1", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + NotReadyAddresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-bar"), + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-2", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-other"), + }, + }, + NotReadyAddresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-baz"), + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-3", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + NotReadyAddresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-baz"), + }, + }, + }, + }, + }, + }, + expectedRequests: []ctrl.Request{ + { + NamespacedName: types.NamespacedName{ + Name: "endpoint-1", + }, + }, + { + NamespacedName: types.NamespacedName{ + Name: "endpoint-3", + }, + }, + }, + }, + "pod=running, no endpoints need to be reconciled": { + agentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "consul-agent", + }, + Spec: corev1.PodSpec{ + NodeName: "node-foo", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + Phase: corev1.PodRunning, + }, + }, + existingEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-1", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-baz"), + }, + }, + NotReadyAddresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-bar"), + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-2", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-bar"), + }, + }, + NotReadyAddresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-baz"), + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-3", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-bar"), + }, + }, + NotReadyAddresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-baz"), + }, + }, + }, + }, + }, + }, + expectedRequests: []ctrl.Request{}, + }, + "pod not ready, no endpoints need to be reconciled": { + agentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "consul-agent", + }, + Spec: corev1.PodSpec{ + NodeName: "node-foo", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + Phase: corev1.PodRunning, + }, + }, + existingEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-1", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-3", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + }, + }, + }, + }, + expectedRequests: []ctrl.Request{}, + }, + "pod not running, no endpoints need to be reconciled": { + agentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "consul-agent", + }, + Spec: corev1.PodSpec{ + NodeName: "node-foo", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + Phase: corev1.PodUnknown, + }, + }, + existingEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-1", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-3", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + }, + }, + }, + }, + expectedRequests: []ctrl.Request{}, + }, + "pod is deleted, no endpoints need to be reconciled": { + agentPod: nil, + existingEndpoints: []*corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-1", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "endpoint-3", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + NodeName: toStringPtr("node-foo"), + }, + }, + }, + }, + }, + }, + expectedRequests: []ctrl.Request{}, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + logger := logrtest.TestLogger{T: t} + s := runtime.NewScheme() + s.AddKnownTypes(corev1.SchemeGroupVersion, &corev1.Pod{}, &corev1.Endpoints{}, &corev1.EndpointsList{}) + var objects []runtime.Object + if test.agentPod != nil { + objects = append(objects, test.agentPod) + } + for _, endpoint := range test.existingEndpoints { + objects = append(objects, endpoint) + } + + fakeClient := fake.NewFakeClientWithScheme(s, objects...) + + controller := &EndpointsController{ + Client: fakeClient, + Ctx: ctx, + Scheme: s, + Log: logger, + } + var requests []ctrl.Request + if test.agentPod != nil { + requests = controller.requestsForRunningAgentPods(handler.MapObject{Meta: test.agentPod}) + } else { + requests = controller.requestsForRunningAgentPods(handler.MapObject{Meta: minimal()}) + } + require.ElementsMatch(t, requests, test.expectedRequests) + }) + } +} + func createPod(name, ip string, inject bool) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1320,3 +1915,7 @@ func createPod(name, ip string, inject bool) *corev1.Pod { return pod } + +func toStringPtr(input string) *string { + return &input +}