From ff2dd5e8de57457be479b59c292caf7cb6e1a545 Mon Sep 17 00:00:00 2001 From: Ashwin Venkatesh Date: Wed, 15 Jun 2022 13:25:21 -0400 Subject: [PATCH] Add watchers for Peering Token secrets for the peering controllers - When a Kubernetes secret that has a label indicating it is a peering token secret, the controllers watch those secrets and updated to those secrets re-enqueues the resource that is associated with that peering token secret. The status is used to determine the Peering Acceptor that is re-enqueued while the spec is used to determing the Peering Dialer that gets re-enqueued. This is because the acceptor is responsible for creating the secret and hence metaphorically owns the secret described in it's status. OTOH the dialer should respond to changes in the secret described in it's spec. This is only supported for secrets with a Kubernetes backend. --- control-plane/connect-inject/annotations.go | 4 + .../peering_acceptor_controller.go | 55 ++++- .../peering_acceptor_controller_test.go | 218 ++++++++++++++++++ .../peering_dialer_controller.go | 42 +++- .../peering_dialer_controller_test.go | 216 +++++++++++++++++ 5 files changed, 533 insertions(+), 2 deletions(-) diff --git a/control-plane/connect-inject/annotations.go b/control-plane/connect-inject/annotations.go index 1139642a49..389f83f8a5 100644 --- a/control-plane/connect-inject/annotations.go +++ b/control-plane/connect-inject/annotations.go @@ -143,6 +143,10 @@ const ( // registered with Consul. labelServiceIgnore = "consul.hashicorp.com/service-ignore" + // labelPeeringToken is a label that can be added to a secret to allow it to be watched + // by the peering controllers. + labelPeeringToken = "consul.hashicorp.com/peering-token" + // injected is used as the annotation value for annotationInjected. injected = "injected" diff --git a/control-plane/connect-inject/peering_acceptor_controller.go b/control-plane/connect-inject/peering_acceptor_controller.go index ca14b2f89c..cdef646fbb 100644 --- a/control-plane/connect-inject/peering_acceptor_controller.go +++ b/control-plane/connect-inject/peering_acceptor_controller.go @@ -14,8 +14,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) // PeeringAcceptorController reconciles a PeeringAcceptor object. @@ -318,7 +323,11 @@ func (r *PeeringAcceptorController) deleteK8sSecret(ctx context.Context, accepto func (r *PeeringAcceptorController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&consulv1alpha1.PeeringAcceptor{}). - Complete(r) + Watches( + &source.Kind{Type: &corev1.Secret{}}, + handler.EnqueueRequestsFromMapFunc(r.requestsForPeeringTokens), + builder.WithPredicates(predicate.NewPredicateFuncs(r.filterPeeringAcceptors)), + ).Complete(r) } // generateToken is a helper function that calls the Consul api to generate a token for the peer. @@ -344,12 +353,56 @@ func (r *PeeringAcceptorController) deletePeering(ctx context.Context, peerName return nil } +// requestsForPeeringTokens creates a slice of requests for the peering acceptor controller. +// It enqueues a request for each acceptor that needs to be reconciled. It iterates through +// the list of endpoints and creates a request for those endpoints that have an address that +// are on the same node as the new Consul Agent pod. It receives a Pod Object which is a +// Consul Agent that has been filtered by filterAgentPods and only enqueues endpoints +// for client agent pods where the Ready condition is true. +func (r *PeeringAcceptorController) requestsForPeeringTokens(object client.Object) []reconcile.Request { + r.Log.Info("received update for Peering Token Secret", "name", object.GetName()) + + // Get the list of all acceptors. + var acceptorList consulv1alpha1.PeeringAcceptorList + if err := r.Client.List(r.Context, &acceptorList); err != nil { + r.Log.Error(err, "failed to list endpoints") + return []ctrl.Request{} + } + for _, acceptor := range acceptorList.Items { + if acceptor.SecretRef().Backend == "kubernetes" { + if acceptor.SecretRef().Name == object.GetName() && acceptor.Namespace == object.GetNamespace() { + return []ctrl.Request{{NamespacedName: types.NamespacedName{Namespace: acceptor.Namespace, Name: acceptor.Name}}} + } + } + } + return []ctrl.Request{} +} + +// filterPeeringAcceptors receives meta and object information for Kubernetes resources that are being watched, +// which in this case are Secrets. It only returns true if the Secret is a Peering Token Secret. It reads the labels +// from the meta of the resource and uses the values of the "consul.hashicorp.com/peering-token" label to validate that +// the Secret is a Peering Token Secret. +func (r *PeeringAcceptorController) filterPeeringAcceptors(object client.Object) bool { + secretLabels := object.GetLabels() + isPeeringToken, ok := secretLabels[labelPeeringToken] + if !ok { + return false + } + if isPeeringToken == "true" { + return true + } + return false +} + // createSecret is a helper function that creates a corev1.Secret when provided inputs. func createSecret(name, namespace, key, value string) *corev1.Secret { secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + Labels: map[string]string{ + labelPeeringToken: "true", + }, }, Data: map[string][]byte{ key: []byte(value), diff --git a/control-plane/connect-inject/peering_acceptor_controller_test.go b/control-plane/connect-inject/peering_acceptor_controller_test.go index 65ccfba0ad..1861c96e48 100644 --- a/control-plane/connect-inject/peering_acceptor_controller_test.go +++ b/control-plane/connect-inject/peering_acceptor_controller_test.go @@ -21,6 +21,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // TestReconcileCreateUpdatePeeringAcceptor creates a peering acceptor. @@ -396,6 +397,8 @@ func TestReconcileCreateUpdatePeeringAcceptor(t *testing.T) { require.NoError(t, err) expSecrets := tt.expectedK8sSecrets() require.Equal(t, expSecrets[0].Name, createdSecret.Name) + require.Contains(t, createdSecret.Labels, labelPeeringToken) + require.Equal(t, createdSecret.Labels[labelPeeringToken], "true") // This assertion needs to be on StringData rather than Data because in the fake K8s client the contents are // stored in StringData if that's how the secret was initialized in the fake client. In a real cluster, this // StringData is an input only field, and shouldn't be read from. @@ -943,3 +946,218 @@ func TestAcceptorUpdateStatusError(t *testing.T) { }) } } + +func TestAcceptor_FilterPeeringAcceptor(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + result bool + }{ + "returns true if label is set to true": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "true", + }, + }, + }, + result: true, + }, + "returns false if label is set to false": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "false", + }, + }, + }, + result: false, + }, + "returns false if label is set to a non-true value": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "foo", + }, + }, + }, + result: false, + }, + "returns false if label is not set": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + result: false, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + controller := PeeringAcceptorController{} + result := controller.filterPeeringAcceptors(tt.secret) + require.Equal(t, tt.result, result) + }) + } +} + +func TestAcceptor_RequestsForPeeringTokens(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + acceptors v1alpha1.PeeringAcceptorList + result []reconcile.Request + }{ + "secret matches existing acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + acceptors: v1alpha1.PeeringAcceptorList{ + Items: []v1alpha1.PeeringAcceptor{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering", + }, + }, + }, + }, + "does not match if backend is not kubernetes": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + acceptors: v1alpha1.PeeringAcceptorList{ + Items: []v1alpha1.PeeringAcceptor{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "vault", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{}, + }, + "only matches with the correct acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + acceptors: v1alpha1.PeeringAcceptorList{ + Items: []v1alpha1.PeeringAcceptor{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-1", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-2", + Namespace: "test-2", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-3", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test-2", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering-1", + }, + }, + }, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + s := scheme.Scheme + s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.PeeringAcceptor{}, &v1alpha1.PeeringAcceptorList{}) + fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(tt.secret, &tt.acceptors).Build() + controller := PeeringAcceptorController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + } + result := controller.requestsForPeeringTokens(tt.secret) + + require.Equal(t, tt.result, result) + }) + } +} diff --git a/control-plane/connect-inject/peering_dialer_controller.go b/control-plane/connect-inject/peering_dialer_controller.go index 5ca50c8442..6383595f2b 100644 --- a/control-plane/connect-inject/peering_dialer_controller.go +++ b/control-plane/connect-inject/peering_dialer_controller.go @@ -14,8 +14,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) // PeeringDialerController reconciles a PeeringDialer object. @@ -231,7 +236,11 @@ func (r *PeeringDialerController) getSecret(ctx context.Context, name string, na func (r *PeeringDialerController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&consulv1alpha1.PeeringDialer{}). - Complete(r) + Watches( + &source.Kind{Type: &corev1.Secret{}}, + handler.EnqueueRequestsFromMapFunc(r.requestsForPeeringTokens), + builder.WithPredicates(predicate.NewPredicateFuncs(r.filterPeeringDialers)), + ).Complete(r) } // establishPeering is a helper function that calls the Consul api to generate a token for the peer. @@ -257,3 +266,34 @@ func (r *PeeringDialerController) deletePeering(ctx context.Context, peerName st } return nil } + +func (r *PeeringDialerController) requestsForPeeringTokens(object client.Object) []reconcile.Request { + r.Log.Info("received update for Peering Token Secret", "name", object.GetName()) + + // Get the list of all dialers. + var dialerList consulv1alpha1.PeeringDialerList + if err := r.Client.List(r.Context, &dialerList); err != nil { + r.Log.Error(err, "failed to list endpoints") + return []ctrl.Request{} + } + for _, dialer := range dialerList.Items { + if dialer.Secret().Backend == "kubernetes" { + if dialer.Secret().Name == object.GetName() && dialer.Namespace == object.GetNamespace() { + return []ctrl.Request{{NamespacedName: types.NamespacedName{Namespace: dialer.Namespace, Name: dialer.Name}}} + } + } + } + return []ctrl.Request{} +} + +func (r *PeeringDialerController) filterPeeringDialers(object client.Object) bool { + secretLabels := object.GetLabels() + isPeeringToken, ok := secretLabels[labelPeeringToken] + if !ok { + return false + } + if isPeeringToken == "true" { + return true + } + return false +} diff --git a/control-plane/connect-inject/peering_dialer_controller_test.go b/control-plane/connect-inject/peering_dialer_controller_test.go index 4b453fce94..85d484a0a1 100644 --- a/control-plane/connect-inject/peering_dialer_controller_test.go +++ b/control-plane/connect-inject/peering_dialer_controller_test.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // TestReconcileCreateUpdatePeeringDialer creates a peering dialer. @@ -781,3 +782,218 @@ func TestDialerUpdateStatusError(t *testing.T) { }) } } + +func TestDialer_FilterPeeringDialers(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + result bool + }{ + "returns true if label is set to true": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "true", + }, + }, + }, + result: true, + }, + "returns false if label is set to false": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "false", + }, + }, + }, + result: false, + }, + "returns false if label is set to a non-true value": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "foo", + }, + }, + }, + result: false, + }, + "returns false if label is not set": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + result: false, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + controller := PeeringDialerController{} + result := controller.filterPeeringDialers(tt.secret) + require.Equal(t, tt.result, result) + }) + } +} + +func TestDialer_RequestsForPeeringTokens(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + dialers v1alpha1.PeeringDialerList + result []reconcile.Request + }{ + "secret matches existing acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + dialers: v1alpha1.PeeringDialerList{ + Items: []v1alpha1.PeeringDialer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering", + }, + }, + }, + }, + "does not match if backend is not kubernetes": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + dialers: v1alpha1.PeeringDialerList{ + Items: []v1alpha1.PeeringDialer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "vault", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{}, + }, + "only matches with the correct acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + dialers: v1alpha1.PeeringDialerList{ + Items: []v1alpha1.PeeringDialer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-1", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-2", + Namespace: "test-2", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-3", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test-2", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering-1", + }, + }, + }, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + s := scheme.Scheme + s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.PeeringDialer{}, &v1alpha1.PeeringDialerList{}) + fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(tt.secret, &tt.dialers).Build() + controller := PeeringDialerController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + } + result := controller.requestsForPeeringTokens(tt.secret) + + require.Equal(t, tt.result, result) + }) + } +}