diff --git a/control-plane/connect-inject/annotations.go b/control-plane/connect-inject/annotations.go index f615637470..1c3c9420e4 100644 --- a/control-plane/connect-inject/annotations.go +++ b/control-plane/connect-inject/annotations.go @@ -146,6 +146,10 @@ const ( // registered with Consul. labelServiceIgnore = "consul.hashicorp.com/service-ignore" + // labelPeeringToken is a label that can be added to a secret to allow it to be watched + // by the peering controllers. + labelPeeringToken = "consul.hashicorp.com/peering-token" + // injected is used as the annotation value for annotationInjected. injected = "injected" diff --git a/control-plane/connect-inject/peering_acceptor_controller.go b/control-plane/connect-inject/peering_acceptor_controller.go index ca14b2f89c..cdef646fbb 100644 --- a/control-plane/connect-inject/peering_acceptor_controller.go +++ b/control-plane/connect-inject/peering_acceptor_controller.go @@ -14,8 +14,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) // PeeringAcceptorController reconciles a PeeringAcceptor object. @@ -318,7 +323,11 @@ func (r *PeeringAcceptorController) deleteK8sSecret(ctx context.Context, accepto func (r *PeeringAcceptorController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&consulv1alpha1.PeeringAcceptor{}). - Complete(r) + Watches( + &source.Kind{Type: &corev1.Secret{}}, + handler.EnqueueRequestsFromMapFunc(r.requestsForPeeringTokens), + builder.WithPredicates(predicate.NewPredicateFuncs(r.filterPeeringAcceptors)), + ).Complete(r) } // generateToken is a helper function that calls the Consul api to generate a token for the peer. @@ -344,12 +353,56 @@ func (r *PeeringAcceptorController) deletePeering(ctx context.Context, peerName return nil } +// requestsForPeeringTokens creates a slice of requests for the peering acceptor controller. +// It enqueues a request for each acceptor that needs to be reconciled. It iterates through +// the list of endpoints and creates a request for those endpoints that have an address that +// are on the same node as the new Consul Agent pod. It receives a Pod Object which is a +// Consul Agent that has been filtered by filterAgentPods and only enqueues endpoints +// for client agent pods where the Ready condition is true. +func (r *PeeringAcceptorController) requestsForPeeringTokens(object client.Object) []reconcile.Request { + r.Log.Info("received update for Peering Token Secret", "name", object.GetName()) + + // Get the list of all acceptors. + var acceptorList consulv1alpha1.PeeringAcceptorList + if err := r.Client.List(r.Context, &acceptorList); err != nil { + r.Log.Error(err, "failed to list endpoints") + return []ctrl.Request{} + } + for _, acceptor := range acceptorList.Items { + if acceptor.SecretRef().Backend == "kubernetes" { + if acceptor.SecretRef().Name == object.GetName() && acceptor.Namespace == object.GetNamespace() { + return []ctrl.Request{{NamespacedName: types.NamespacedName{Namespace: acceptor.Namespace, Name: acceptor.Name}}} + } + } + } + return []ctrl.Request{} +} + +// filterPeeringAcceptors receives meta and object information for Kubernetes resources that are being watched, +// which in this case are Secrets. It only returns true if the Secret is a Peering Token Secret. It reads the labels +// from the meta of the resource and uses the values of the "consul.hashicorp.com/peering-token" label to validate that +// the Secret is a Peering Token Secret. +func (r *PeeringAcceptorController) filterPeeringAcceptors(object client.Object) bool { + secretLabels := object.GetLabels() + isPeeringToken, ok := secretLabels[labelPeeringToken] + if !ok { + return false + } + if isPeeringToken == "true" { + return true + } + return false +} + // createSecret is a helper function that creates a corev1.Secret when provided inputs. func createSecret(name, namespace, key, value string) *corev1.Secret { secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + Labels: map[string]string{ + labelPeeringToken: "true", + }, }, Data: map[string][]byte{ key: []byte(value), diff --git a/control-plane/connect-inject/peering_acceptor_controller_test.go b/control-plane/connect-inject/peering_acceptor_controller_test.go index 65ccfba0ad..1861c96e48 100644 --- a/control-plane/connect-inject/peering_acceptor_controller_test.go +++ b/control-plane/connect-inject/peering_acceptor_controller_test.go @@ -21,6 +21,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // TestReconcileCreateUpdatePeeringAcceptor creates a peering acceptor. @@ -396,6 +397,8 @@ func TestReconcileCreateUpdatePeeringAcceptor(t *testing.T) { require.NoError(t, err) expSecrets := tt.expectedK8sSecrets() require.Equal(t, expSecrets[0].Name, createdSecret.Name) + require.Contains(t, createdSecret.Labels, labelPeeringToken) + require.Equal(t, createdSecret.Labels[labelPeeringToken], "true") // This assertion needs to be on StringData rather than Data because in the fake K8s client the contents are // stored in StringData if that's how the secret was initialized in the fake client. In a real cluster, this // StringData is an input only field, and shouldn't be read from. @@ -943,3 +946,218 @@ func TestAcceptorUpdateStatusError(t *testing.T) { }) } } + +func TestAcceptor_FilterPeeringAcceptor(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + result bool + }{ + "returns true if label is set to true": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "true", + }, + }, + }, + result: true, + }, + "returns false if label is set to false": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "false", + }, + }, + }, + result: false, + }, + "returns false if label is set to a non-true value": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "foo", + }, + }, + }, + result: false, + }, + "returns false if label is not set": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + result: false, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + controller := PeeringAcceptorController{} + result := controller.filterPeeringAcceptors(tt.secret) + require.Equal(t, tt.result, result) + }) + } +} + +func TestAcceptor_RequestsForPeeringTokens(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + acceptors v1alpha1.PeeringAcceptorList + result []reconcile.Request + }{ + "secret matches existing acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + acceptors: v1alpha1.PeeringAcceptorList{ + Items: []v1alpha1.PeeringAcceptor{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering", + }, + }, + }, + }, + "does not match if backend is not kubernetes": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + acceptors: v1alpha1.PeeringAcceptorList{ + Items: []v1alpha1.PeeringAcceptor{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "vault", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{}, + }, + "only matches with the correct acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + acceptors: v1alpha1.PeeringAcceptorList{ + Items: []v1alpha1.PeeringAcceptor{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-1", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-2", + Namespace: "test-2", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-3", + Namespace: "test", + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "test-2", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering-1", + }, + }, + }, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + s := scheme.Scheme + s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.PeeringAcceptor{}, &v1alpha1.PeeringAcceptorList{}) + fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(tt.secret, &tt.acceptors).Build() + controller := PeeringAcceptorController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + } + result := controller.requestsForPeeringTokens(tt.secret) + + require.Equal(t, tt.result, result) + }) + } +} diff --git a/control-plane/connect-inject/peering_dialer_controller.go b/control-plane/connect-inject/peering_dialer_controller.go index 5ca50c8442..6383595f2b 100644 --- a/control-plane/connect-inject/peering_dialer_controller.go +++ b/control-plane/connect-inject/peering_dialer_controller.go @@ -14,8 +14,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) // PeeringDialerController reconciles a PeeringDialer object. @@ -231,7 +236,11 @@ func (r *PeeringDialerController) getSecret(ctx context.Context, name string, na func (r *PeeringDialerController) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&consulv1alpha1.PeeringDialer{}). - Complete(r) + Watches( + &source.Kind{Type: &corev1.Secret{}}, + handler.EnqueueRequestsFromMapFunc(r.requestsForPeeringTokens), + builder.WithPredicates(predicate.NewPredicateFuncs(r.filterPeeringDialers)), + ).Complete(r) } // establishPeering is a helper function that calls the Consul api to generate a token for the peer. @@ -257,3 +266,34 @@ func (r *PeeringDialerController) deletePeering(ctx context.Context, peerName st } return nil } + +func (r *PeeringDialerController) requestsForPeeringTokens(object client.Object) []reconcile.Request { + r.Log.Info("received update for Peering Token Secret", "name", object.GetName()) + + // Get the list of all dialers. + var dialerList consulv1alpha1.PeeringDialerList + if err := r.Client.List(r.Context, &dialerList); err != nil { + r.Log.Error(err, "failed to list endpoints") + return []ctrl.Request{} + } + for _, dialer := range dialerList.Items { + if dialer.Secret().Backend == "kubernetes" { + if dialer.Secret().Name == object.GetName() && dialer.Namespace == object.GetNamespace() { + return []ctrl.Request{{NamespacedName: types.NamespacedName{Namespace: dialer.Namespace, Name: dialer.Name}}} + } + } + } + return []ctrl.Request{} +} + +func (r *PeeringDialerController) filterPeeringDialers(object client.Object) bool { + secretLabels := object.GetLabels() + isPeeringToken, ok := secretLabels[labelPeeringToken] + if !ok { + return false + } + if isPeeringToken == "true" { + return true + } + return false +} diff --git a/control-plane/connect-inject/peering_dialer_controller_test.go b/control-plane/connect-inject/peering_dialer_controller_test.go index 4b453fce94..85d484a0a1 100644 --- a/control-plane/connect-inject/peering_dialer_controller_test.go +++ b/control-plane/connect-inject/peering_dialer_controller_test.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // TestReconcileCreateUpdatePeeringDialer creates a peering dialer. @@ -781,3 +782,218 @@ func TestDialerUpdateStatusError(t *testing.T) { }) } } + +func TestDialer_FilterPeeringDialers(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + result bool + }{ + "returns true if label is set to true": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "true", + }, + }, + }, + result: true, + }, + "returns false if label is set to false": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "false", + }, + }, + }, + result: false, + }, + "returns false if label is set to a non-true value": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + Labels: map[string]string{ + labelPeeringToken: "foo", + }, + }, + }, + result: false, + }, + "returns false if label is not set": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + result: false, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + controller := PeeringDialerController{} + result := controller.filterPeeringDialers(tt.secret) + require.Equal(t, tt.result, result) + }) + } +} + +func TestDialer_RequestsForPeeringTokens(t *testing.T) { + t.Parallel() + cases := map[string]struct { + secret *corev1.Secret + dialers v1alpha1.PeeringDialerList + result []reconcile.Request + }{ + "secret matches existing acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + dialers: v1alpha1.PeeringDialerList{ + Items: []v1alpha1.PeeringDialer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering", + }, + }, + }, + }, + "does not match if backend is not kubernetes": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + dialers: v1alpha1.PeeringDialerList{ + Items: []v1alpha1.PeeringDialer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "vault", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{}, + }, + "only matches with the correct acceptor": { + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + dialers: v1alpha1.PeeringDialerList{ + Items: []v1alpha1.PeeringDialer{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-1", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-2", + Namespace: "test-2", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "peering-3", + Namespace: "test", + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "test-2", + Key: "test", + Backend: "kubernetes", + }, + }, + }, + }, + }, + }, + result: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "peering-1", + }, + }, + }, + }, + } + + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + s := scheme.Scheme + s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.PeeringDialer{}, &v1alpha1.PeeringDialerList{}) + fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(tt.secret, &tt.dialers).Build() + controller := PeeringDialerController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + } + result := controller.requestsForPeeringTokens(tt.secret) + + require.Equal(t, tt.result, result) + }) + } +}