From 3773ed45dacac414be0c53f30f6d2db07018d642 Mon Sep 17 00:00:00 2001 From: Ashwin Venkatesh Date: Wed, 15 Jun 2022 13:31:57 -0400 Subject: [PATCH] Add watchers for Peering Token secrets for the peering controllers - When a Kubernetes secret that has a label indicating it is a peering token secret, the controllers watch those secrets and updated to those secrets re-enqueues the resource that is associated with that peering token secret. The status is used to determine the Peering Acceptor that is re-enqueued while the spec is used to determing the Peering Dialer that gets re-enqueued. This is because the acceptor is responsible for creating the secret and hence metaphorically owns the secret described in it's status. OTOH the dialer should respond to changes in the secret described in it's spec. This is only supported for secrets with a Kubernetes backend. --- control-plane/connect-inject/annotations.go | 4 + .../peering_acceptor_controller.go | 51 +++- .../peering_acceptor_controller_test.go | 276 ++++++++++++++++++ .../peering_dialer_controller.go | 48 ++- .../peering_dialer_controller_test.go | 274 +++++++++++++++++ 5 files changed, 651 insertions(+), 2 deletions(-) diff --git a/control-plane/connect-inject/annotations.go b/control-plane/connect-inject/annotations.go index f615637470..1c3c9420e4 100644 --- a/control-plane/connect-inject/annotations.go +++ b/control-plane/connect-inject/annotations.go @@ -146,6 +146,10 @@ const ( // registered with Consul. labelServiceIgnore = "consul.hashicorp.com/service-ignore" + // labelPeeringToken is a label that can be added to a secret to allow it to be watched + // by the peering controllers. + labelPeeringToken = "consul.hashicorp.com/peering-token" + // injected is used as the annotation value for annotationInjected. injected = "injected" diff --git a/control-plane/connect-inject/peering_acceptor_controller.go b/control-plane/connect-inject/peering_acceptor_controller.go index ca14b2f89c..8830546aed 100644 --- a/control-plane/connect-inject/peering_acceptor_controller.go +++ b/control-plane/connect-inject/peering_acceptor_controller.go @@ -14,8 +14,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) // PeeringAcceptorController reconciles a PeeringAcceptor object. @@ -318,7 +323,11 @@ func (r *PeeringAcceptorController) deleteK8sSecret(ctx context.Context, accepto func (r *PeeringAcceptorController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&consulv1alpha1.PeeringAcceptor{}). - Complete(r) + Watches( + &source.Kind{Type: &corev1.Secret{}}, + handler.EnqueueRequestsFromMapFunc(r.requestsForPeeringTokens), + builder.WithPredicates(predicate.NewPredicateFuncs(r.filterPeeringAcceptors)), + ).Complete(r) } // generateToken is a helper function that calls the Consul api to generate a token for the peer. @@ -344,12 +353,52 @@ func (r *PeeringAcceptorController) deletePeering(ctx context.Context, peerName return nil } +// requestsForPeeringTokens creates a slice of requests for the peering acceptor controller. +// It enqueues a request for each acceptor that needs to be reconciled. It iterates through +// the list of acceptors and creates a request for the acceptor that has the same secret as it's +// secretRef and that of the updated secret that is being watched. +// We compare it to the secret in the status as the resource has created the secret. +func (r *PeeringAcceptorController) requestsForPeeringTokens(object client.Object) []reconcile.Request { + r.Log.Info("received update for Peering Token Secret", "name", object.GetName(), "namespace", object.GetNamespace()) + + // Get the list of all acceptors. + var acceptorList consulv1alpha1.PeeringAcceptorList + if err := r.Client.List(r.Context, &acceptorList); err != nil { + r.Log.Error(err, "failed to list Peering Acceptors") + return []ctrl.Request{} + } + for _, acceptor := range acceptorList.Items { + if acceptor.SecretRef().Backend == "kubernetes" { + if acceptor.SecretRef().Name == object.GetName() && acceptor.Namespace == object.GetNamespace() { + return []ctrl.Request{{NamespacedName: types.NamespacedName{Namespace: acceptor.Namespace, Name: acceptor.Name}}} + } + } + } + return []ctrl.Request{} +} + +// filterPeeringAcceptors receives meta and object information for Kubernetes resources that are being watched, +// which in this case are Secrets. It only returns true if the Secret is a Peering Token Secret. It reads the labels +// from the meta of the resource and uses the values of the "consul.hashicorp.com/peering-token" label to validate that +// the Secret is a Peering Token Secret. +func (r *PeeringAcceptorController) filterPeeringAcceptors(object client.Object) bool { + secretLabels := object.GetLabels() + isPeeringToken, ok := secretLabels[labelPeeringToken] + if !ok { + return false + } + return isPeeringToken == "true" +} + // createSecret is a helper function that creates a corev1.Secret when provided inputs. func createSecret(name, namespace, key, value string) *corev1.Secret { secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + Labels: map[string]string{ + labelPeeringToken: "true", + }, }, Data: map[string][]byte{ key: []byte(value), diff --git a/control-plane/connect-inject/peering_acceptor_controller_test.go b/control-plane/connect-inject/peering_acceptor_controller_test.go index 65ccfba0ad..1f53bcc516 100644 --- a/control-plane/connect-inject/peering_acceptor_controller_test.go +++ b/control-plane/connect-inject/peering_acceptor_controller_test.go @@ -21,6 +21,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // TestReconcileCreateUpdatePeeringAcceptor creates a peering acceptor. @@ -396,6 +397,8 @@ func TestReconcileCreateUpdatePeeringAcceptor(t *testing.T) { require.NoError(t, err) expSecrets := tt.expectedK8sSecrets() require.Equal(t, expSecrets[0].Name, createdSecret.Name) + require.Contains(t, createdSecret.Labels, labelPeeringToken) + require.Equal(t, createdSecret.Labels[labelPeeringToken], "true") // This assertion needs to be on StringData rather than Data because in the fake K8s client the contents are // stored in StringData if that's how the secret was initialized in the fake client. In a real cluster, this // StringData is an input only field, and shouldn't be read from. @@ -943,3 +946,276 @@ func TestAcceptorUpdateStatusError(t *testing.T) { }) } } + +func TestAcceptor_FilterPeeringAcceptor(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + result bool + }{ + "returns true if label is set to true": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "true", + }, + }, + }, + result: true, + }, + "returns false if label is set to false": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "false", + }, + }, + }, + result: false, + }, + "returns false if label is set to a non-true value": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "foo", + }, + }, + }, + result: false, + }, + "returns false if label is not set": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + result: false, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + controller := PeeringAcceptorController{} + result := controller.filterPeeringAcceptors(tt.secret) + require.Equal(t, tt.result, result) + }) + } +} + +func TestAcceptor_RequestsForPeeringTokens(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + acceptors v1alpha1.PeeringAcceptorList + result []reconcile.Request + }{ + "secret matches existing acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + acceptors: v1alpha1.PeeringAcceptorList{ + Items: []v1alpha1.PeeringAcceptor{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering", + }, + }, + }, + }, + "does not match if backend is not kubernetes": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + acceptors: v1alpha1.PeeringAcceptorList{ + Items: []v1alpha1.PeeringAcceptor{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "vault", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{}, + }, + "only matches with the correct acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + acceptors: v1alpha1.PeeringAcceptorList{ + Items: []v1alpha1.PeeringAcceptor{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-1", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-2", + Namespace: "test-2", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-3", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test-2", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering-1", + }, + }, + }, + }, + "can match with zero acceptors": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + acceptors: v1alpha1.PeeringAcceptorList{ + Items: []v1alpha1.PeeringAcceptor{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-1", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "fest", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-2", + Namespace: "test-2", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-3", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test-2", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{}, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + s := scheme.Scheme + s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.PeeringAcceptor{}, &v1alpha1.PeeringAcceptorList{}) + fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(tt.secret, &tt.acceptors).Build() + controller := PeeringAcceptorController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + } + result := controller.requestsForPeeringTokens(tt.secret) + + require.Equal(t, tt.result, result) + }) + } +} diff --git a/control-plane/connect-inject/peering_dialer_controller.go b/control-plane/connect-inject/peering_dialer_controller.go index 5ca50c8442..1a62ae1b91 100644 --- a/control-plane/connect-inject/peering_dialer_controller.go +++ b/control-plane/connect-inject/peering_dialer_controller.go @@ -14,8 +14,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) // PeeringDialerController reconciles a PeeringDialer object. @@ -231,7 +236,11 @@ func (r *PeeringDialerController) getSecret(ctx context.Context, name string, na func (r *PeeringDialerController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&consulv1alpha1.PeeringDialer{}). - Complete(r) + Watches( + &source.Kind{Type: &corev1.Secret{}}, + handler.EnqueueRequestsFromMapFunc(r.requestsForPeeringTokens), + builder.WithPredicates(predicate.NewPredicateFuncs(r.filterPeeringDialers)), + ).Complete(r) } // establishPeering is a helper function that calls the Consul api to generate a token for the peer. @@ -257,3 +266,40 @@ func (r *PeeringDialerController) deletePeering(ctx context.Context, peerName st } return nil } + +// requestsForPeeringTokens creates a slice of requests for the peering dialer controller. +// It enqueues a request for each dialer that needs to be reconciled. It iterates through +// the list of dialers and creates a request for the dialer that has the same secret as it's +// secret and that of the updated secret that is being watched. +// We compare it to the secret in the spec as the resource is dependent on the secret. +func (r *PeeringDialerController) requestsForPeeringTokens(object client.Object) []reconcile.Request { + r.Log.Info("received update for Peering Token Secret", "name", object.GetName(), "namespace", object.GetNamespace()) + + // Get the list of all dialers. + var dialerList consulv1alpha1.PeeringDialerList + if err := r.Client.List(r.Context, &dialerList); err != nil { + r.Log.Error(err, "failed to list PeeringDialers") + return []ctrl.Request{} + } + for _, dialer := range dialerList.Items { + if dialer.Secret().Backend == "kubernetes" { + if dialer.Secret().Name == object.GetName() && dialer.Namespace == object.GetNamespace() { + return []ctrl.Request{{NamespacedName: types.NamespacedName{Namespace: dialer.Namespace, Name: dialer.Name}}} + } + } + } + return []ctrl.Request{} +} + +// filterPeeringAcceptors receives meta and object information for Kubernetes resources that are being watched, +// which in this case are Secrets. It only returns true if the Secret is a Peering Token Secret. It reads the labels +// from the meta of the resource and uses the values of the "consul.hashicorp.com/peering-token" label to validate that +// the Secret is a Peering Token Secret. +func (r *PeeringDialerController) filterPeeringDialers(object client.Object) bool { + secretLabels := object.GetLabels() + isPeeringToken, ok := secretLabels[labelPeeringToken] + if !ok { + return false + } + return isPeeringToken == "true" +} diff --git a/control-plane/connect-inject/peering_dialer_controller_test.go b/control-plane/connect-inject/peering_dialer_controller_test.go index 4b453fce94..4715840500 100644 --- a/control-plane/connect-inject/peering_dialer_controller_test.go +++ b/control-plane/connect-inject/peering_dialer_controller_test.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // TestReconcileCreateUpdatePeeringDialer creates a peering dialer. @@ -781,3 +782,276 @@ func TestDialerUpdateStatusError(t *testing.T) { }) } } + +func TestDialer_FilterPeeringDialers(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + result bool + }{ + "returns true if label is set to true": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "true", + }, + }, + }, + result: true, + }, + "returns false if label is set to false": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "false", + }, + }, + }, + result: false, + }, + "returns false if label is set to a non-true value": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "foo", + }, + }, + }, + result: false, + }, + "returns false if label is not set": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + result: false, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + controller := PeeringDialerController{} + result := controller.filterPeeringDialers(tt.secret) + require.Equal(t, tt.result, result) + }) + } +} + +func TestDialer_RequestsForPeeringTokens(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + dialers v1alpha1.PeeringDialerList + result []reconcile.Request + }{ + "secret matches existing acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + dialers: v1alpha1.PeeringDialerList{ + Items: []v1alpha1.PeeringDialer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering", + }, + }, + }, + }, + "does not match if backend is not kubernetes": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + dialers: v1alpha1.PeeringDialerList{ + Items: []v1alpha1.PeeringDialer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "vault", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{}, + }, + "only matches with the correct dialer": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + dialers: v1alpha1.PeeringDialerList{ + Items: []v1alpha1.PeeringDialer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-1", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-2", + Namespace: "test-2", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-3", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test-2", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering-1", + }, + }, + }, + }, + "can match with zero dialer": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + dialers: v1alpha1.PeeringDialerList{ + Items: []v1alpha1.PeeringDialer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-1", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "fest", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-2", + Namespace: "test-2", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-3", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test-2", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{}, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + s := scheme.Scheme + s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.PeeringDialer{}, &v1alpha1.PeeringDialerList{}) + fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(tt.secret, &tt.dialers).Build() + controller := PeeringDialerController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + } + result := controller.requestsForPeeringTokens(tt.secret) + + require.Equal(t, tt.result, result) + }) + } +}